Skip to content

Commit

Permalink
loop primitive & batch mdq example
Browse files Browse the repository at this point in the history
  • Loading branch information
leifj committed Dec 9, 2019
1 parent 382c53d commit 359224b
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 89 deletions.
39 changes: 39 additions & 0 deletions examples/batch-mdq-loop.fd
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
- when sign:
- finalize:
cacheDuration: PT5H
validUntil: P10D
- sign:
key: sign.key
cert: sign.crt
- when batch:
- load:
- https://mds.swamid.se/md/swamid-registered.xml
- http://md.incommon.org/InCommon/InCommon-metadata.xml
- http://fed.openathens.net/oafed/metadata
- http://edugain.cdn.samlbits.net
- select:
- each:
- log_entity:
- fork:
- drop_xsi_type:
- finalize:
cacheDuration: PT5H
validUntil: P10D
- sign:
key: sign.key
cert: sign.crt
- publish:
output: /tmp/mdq/entities
hash_link: true
urlencode_filenames: true
update_store: false
- break
- fork:
- discojson
- publish:
output: /tmp/mdq/entities
hash_link: true
raw: true
ext: .json
urlencode_filenames: true
update_store: false
16 changes: 0 additions & 16 deletions examples/batch-mdq.fd

This file was deleted.

167 changes: 94 additions & 73 deletions src/pyff/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,78 +55,48 @@ def dump(req, *opts):
print("<EntitiesDescriptor xmlns=\"{}\"/>".format(NS['md']))


@pipe(name="mdq")
def _mdq_t(req, *opts):
@pipe(name="each")
def _each(req, *opts):
"""
Emit an mdq tree
loop over the entities in a selection
:param req:
:param opts:
:return: None
:param req:
:param opts:
:return: None
**Examples**
**Examples**
.. code-block:: yaml
.. code-block:: yaml
- mdq
output: "somedir"
- each:
- ...statements...
Each time around the loop a request is processed with the id set to the entity_id of the object. The
statements nested under the each statement are processed once for each entity in the working tree.
"""
opts = dict(list(zip(opts[::2], [opts[1::2]])))
opts.setdefault('via', [])
base_dir = req.args.get('output', None)

if req.t is None:
raise PipeException("Your pipeline is missing a select statement.")

if opts['via'] is not None:
opts['via'] = [PipelineCallback(pipe, req, store=req.md.store) for pipe in opts['via']]

def _xml(t):
return dumptree(t)

def _json(t):
res = discojson_t(t, icon_store=req.md.icon_store)
res.sort(key=operator.itemgetter('title'))

return json.dumps(res)

st = req.t
for cb in opts['via']:
st = cb(st)

safe_write("{}/index.json".format(base_dir), _json(st), mkdirs=True)
safe_write("{}/index.xml".format(base_dir), _xml(st), mkdirs=True)

def _p(e):
for cb in opts['via']:
e = cb(e)
entity_id = e.get('entityID')
hid = hash_id(entity_id, 'sha1', True)
j = _json(e)
x = _xml(e)

json_fn = "{}/{}.json".format(base_dir, quote_plus(entity_id))
xml_fn = "{}/{}.xml".format(base_dir, quote_plus(entity_id))
json_id_fn = "{}/{}.json".format(base_dir, quote_plus(hid))
xml_id_fn = "{}/{}.xml".format(base_dir, quote_plus(hid))
safe_write(json_fn, j, mkdirs=True)
safe_write(xml_fn, x, mkdirs=True)

if os.path.exists(json_id_fn):
os.unlink(json_id_fn)
os.symlink(json_fn, "{}/{}.json".format(base_dir, quote_plus(hid)))
if os.path.exists(xml_id_fn):
os.unlink(xml_id_fn)
os.symlink(xml_fn, "{}/{}.xml".format(base_dir, quote_plus(hid)))
return entity_id
ip = Plumbing(pipeline=req.args, pid="{}.each[{}]".format(req.plumbing.pid, entity_id))
ireq = Plumbing.Request(ip, req.md, t=e, scheduler=req.scheduler)
ireq.set_id(entity_id)
return ip.iprocess(ireq)

from multiprocessing.pool import ThreadPool
pool = ThreadPool()
result = pool.map(_p, iter_entities(req.t))
log.info("wrote an mdq store with {} entities".format(len(result)))
result = pool.map(_p, iter_entities(req.t), chunksize=10)
log.info("processed {} entities".format(len(result)))


@pipe(name="log_entity")
def _log_entity(req, *opts):
    """
    Log the id of the request currently being processed.

    :param req: the request; ``req.id`` (typically the entity_id) is logged at info level
    :param opts: ignored
    :return: ``req.t``, passed through untouched

    **Examples**

    .. code-block:: yaml

        - each:
            - log_entity:
    """
    request_id = req.id
    log.info(request_id)
    return req.t


@pipe(name="print")
Expand All @@ -153,6 +123,7 @@ def _print_t(req, *opts):
else:
print(req.t)


@pipe
def end(req, *opts):
"""
Expand Down Expand Up @@ -250,6 +221,7 @@ def fork(req, *opts):

ip = Plumbing(pipeline=req.args, pid="%s.fork" % req.plumbing.pid)
ireq = Plumbing.Request(ip, req.md, t=nt, scheduler=req.scheduler)
ireq.set_id(req.id)
ip.iprocess(ireq)

if req.t is not None and ireq.t is not None and len(root(ireq.t)) > 0:
Expand Down Expand Up @@ -440,25 +412,62 @@ def publish(req, *opts):
.. code-block:: yaml
- publish: /tmp/idp.xml
The full set of options with their corresponding defaults:
.. code-block:: yaml
- publish:
output: output
raw: false
urlencode_filenames: false
hash_link: false
update_store: true
ext: .xml
If output is an existing directory, publish will write the working tree to a filename in the directory
based on the @entityID or @Name attribute. Unless 'raw' is set to true the working tree will be serialized
to a string before writing. If true, 'hash_link' will generate a symlink based on the hash id (sha1) for
compatibility with MDQ. Unless false, 'update_store' will cause the current store to be updated with
the published artifact. Setting 'ext' allows control over the file extension.
"""

if req.t is None:
raise PipeException("Empty document submitted for publication")

if req.args is None:
raise PipeException("publish must specify output")
raise PipeException("Publish must at least specify output")

try:
validate_document(req.t)
except DocumentInvalid as ex:
log.error(ex.error_log)
raise PipeException("XML schema validation failed")
if type(req.args) is not dict:
req.args = dict(output=req.args[0])

for t in ('raw', 'update_store', 'hash_link', 'urlencode_filenames'):
if t in req.args and type(req.args[t]) is not bool:
req.args[t] = strtobool("{}".format(req.args[t]))

req.args.setdefault('ext', '.xml')
req.args.setdefault('output_file', 'output')
req.args.setdefault('raw', False)
req.args.setdefault('update_store', True)
req.args.setdefault('hash_link', False)
req.args.setdefault('urlencode_filenames', False)

output_file = req.args.get("output", None)

if not req.args.get('raw'):
try:
validate_document(req.t)
except DocumentInvalid as ex:
log.error(ex.error_log)
raise PipeException("XML schema validation failed")

def _nop(x):
return x

enc = _nop
if req.args.get('urlencode_filenames'):
enc = quote_plus

output_file = None
if type(req.args) is dict:
output_file = req.args.get("output", None)
else:
output_file = req.args[0]
if output_file is not None:
output_file = output_file.strip()
resource_name = output_file
Expand All @@ -467,13 +476,25 @@ def publish(req, *opts):
output_file = m.group(1)
resource_name = m.group(2)
out = output_file
if os.path.isdir(output_file):
out = "{}.xml".format(os.path.join(output_file, req.id))
data = req.t
if not req.args.get('raw'):
data = dumptree(req.t)

data = dumptree(req.t)
if os.path.isdir(output_file):
file_name = "{}{}".format(enc(req.id), req.args.get('ext'))
out = os.path.join(output_file, file_name)
safe_write(out, data, mkdirs=True)
if req.args.get('hash_link'):
link_name = "{}{}".format(enc(hash_id(req.id)), req.args.get('ext'))
link_path = os.path.join(output_file, link_name)
if os.path.exists(link_path):
os.unlink(link_path)
os.symlink(file_name, link_path)
else:
safe_write(out, data, mkdirs=True)

safe_write(out, data)
req.store.update(req.t, tid=resource_name) # TODO maybe this is not the right thing to do anymore
if req.args.get('update_store'):
req.store.update(req.t, tid=resource_name) # TODO maybe this is not the right thing to do anymore
return req.t


Expand Down
14 changes: 14 additions & 0 deletions src/pyff/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def __init__(self, pl, md, t=None, name=None, args=None, state=None, store=None,
self.plumbing = pl
self.md = md
self.t = t
self._id = None
self.name = name
self.args = args
self.state = state
Expand All @@ -213,6 +214,19 @@ def __init__(self, pl, md, t=None, name=None, args=None, state=None, store=None,
self.raise_exceptions = raise_exceptions
self.exception = None

@property
def id(self):
    """
    Identifier of this request.

    Returns None whenever the working tree ``self.t`` is None, even if an id was
    stored earlier. Otherwise returns the stored ``self._id``; when nothing is
    stored yet, the id is looked up on the working tree, trying the ``entityID``
    attribute first and ``Name`` second, and the result is cached in ``self._id``.

    :return: the request id, or None if there is no working tree or no id was found
    """
    if self.t is None:
        return None
    # Try each candidate attribute in order until an id is known.
    for attribute in ('entityID', 'Name'):
        if self._id is not None:
            break
        self._id = self.t.get(attribute)
    return self._id

def set_id(self, _id):
    """
    Store an explicit id for this request.

    The stored value is what the ``id`` property returns while a working tree
    is present, instead of an id looked up on the tree.

    :param _id: the id to store (e.g. an entity_id)
    :return: None
    """
    self._id = _id

@property
def store(self):
if self._store:
Expand Down

0 comments on commit 359224b

Please sign in to comment.