
post-processing

leifj committed Sep 23, 2013
1 parent f7c55b5 commit 64fb9ccdc635fdd74e391ffb3ecdbb33db92a454
Showing with 112 additions and 53 deletions.
  1. +9 −3 examples/mdx.fd
  2. +40 −20 src/pyff/mdrepo.py
  3. +61 −29 src/pyff/pipes/builtins.py
  4. +2 −1 src/pyff/utils.py
diff --git a/examples/mdx.fd b/examples/mdx.fd
@@ -1,7 +1,13 @@
+- when normalize:
+    - xslt:
+        stylesheet: tidy.xml
+- when edugain:
+    - xslt:
+        stylesheet: tidy.xml
 - when update:
     - load:
-        - examples/links.xrd
-        - http://mds.edugain.org
+        - examples/links.xrd via normalize
+        - http://mds.edugain.org via edugain
     - select
     - fork and merge:
         - select: "!md:EntityDescriptor[md:Extensions[mdrpi:RegistrationInfo[@registrationAuthority='http://www.swamid.se/']]]"
@@ -49,5 +55,5 @@
 - when accept application/json:
     - xslt:
         stylesheet: discojson.xsl
-    - emit application/json:
+    - emit application/json
     - break
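
In effect, each `via` suffix names a `when` entry point that the freshly loaded document is pushed through before it is imported, so both sources above get the tidy.xml transform applied first. A minimal standalone sketch of that post step, assuming lxml and illustrative file paths (pyff resolves the real stylesheet through its xslt pipe):

    from lxml import etree

    # Sketch only: apply a tidy stylesheet to a parsed metadata document,
    # which is what "via normalize" arranges through PipelineCallback below.
    transform = etree.XSLT(etree.parse("xslt/tidy.xml"))  # path is an assumption
    t = etree.parse("metadata.xml")                       # stand-in document
    t = transform(t)  # the transformed tree is what enters the repository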
diff --git a/src/pyff/mdrepo.py b/src/pyff/mdrepo.py
@@ -279,7 +279,7 @@ def set_entity_attributes(self, e, d, nf=NF_URI):
                 a.append(velt)
         #log.debug(etree.tostring(a))

-    def fetch_metadata(self, resources, qsize=5, timeout=120, stats=None, xrd=None):
+    def fetch_metadata(self, resources, qsize=5, timeout=120, stats=None, xrd=None, validate=False):
        """Fetch a series of metadata URLs and optionally verify signatures.

        :param resources: A list of triples (url,cert-or-fingerprint,id)
@@ -304,9 +304,9 @@ def fetch_metadata(self, resources, qsize=5, timeout=120, stats=None, xrd=None):
         def producer(q, resources, cache=self.metadata_cache_enabled):
             print resources
-            for url, verify, id, tries in resources:
+            for url, verify, id, tries, post in resources:
                 log.debug("starting fetcher for '%s'" % url)
-                thread = URLFetch(url, verify, id, enable_cache=cache, tries=tries)
+                thread = URLFetch(url, verify, id, enable_cache=cache, tries=tries, post=post)
                 thread.start()
                 q.put(thread, True)
@@ -348,7 +348,11 @@ def consumer(q, njobs, stats, next_jobs=None, resolved=None):
                 if thread.resp is not None:
                     info['Status'] = thread.resp.status
-                t = self.parse_metadata(StringIO(xml), key=thread.verify, base_url=thread.url)
+                t = self.parse_metadata(StringIO(xml),
+                                        key=thread.verify,
+                                        base_url=thread.url,
+                                        validate=validate,
+                                        post=thread.post)
                 if t is None:
                     self.fire(type=EVENT_IMPORT_FAIL, url=thread.url)
                     raise MetadataException("no valid metadata found at '%s'" % thread.url)
@@ -366,7 +370,7 @@ def consumer(q, njobs, stats, next_jobs=None, resolved=None):
                     if len(fingerprints) > 0:
                         fp = fingerprints[0]
                     log.debug("fingerprint: %s" % fp)
-                    next_jobs.append((url, fp, url, 0))
+                    next_jobs.append((url, fp, url, 0, thread.post))
                 elif relt.tag in ('{%s}EntityDescriptor' % NS['md'], '{%s}EntitiesDescriptor' % NS['md']):
                     cacheDuration = self.default_cache_duration
@@ -408,7 +412,7 @@ def consumer(q, njobs, stats, next_jobs=None, resolved=None):
             if info is not None:
                 stats[thread.url] = info

-        resources = [(url, verify, rid, 0) for url, verify, rid in resources]
+        resources = [(url, verify, rid, 0, post) for url, verify, rid, post in resources]
         resolved = set()
         cache = True
         while len(resources) > 0:
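
Note the shape change here: callers now hand fetch_metadata four-field tuples (url, verify, id, post) and the retry counter is padded in by this comprehension, while the ":param resources: A list of triples" line in the docstring above is left stale. A hedged usage sketch, with md standing in for an MDRepository instance and post_cb for any post callable:

    # Assumed names: md is an MDRepository, post_cb is a post callable (or None).
    resources = [
        ("http://mds.edugain.org", None, "edugain", post_cb),  # (url, verify, id, post)
    ]
    md.fetch_metadata(resources, qsize=5, timeout=120, validate=True)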
@@ -432,27 +436,43 @@ def consumer(q, njobs, stats, next_jobs=None, resolved=None):
             with open(xrd, "w") as fd:
                 fd.write(template("trust.xrd").render(links=resolved))

-    def parse_metadata(self, fn, key=None, base_url=None, fail_on_error=False, filter_invalid=True):
+    def parse_metadata(self,
+                       fn,
+                       key=None,
+                       base_url=None,
+                       fail_on_error=False,
+                       filter_invalid=True,
+                       validate=True,
+                       post=None):
         """Parse a piece of XML and split it up into EntityDescriptor elements. Each such element
         is stored in the MDRepository instance.

         :param fn: a file-like object containing SAML metadata
         :param key: a certificate (file) or a SHA1 fingerprint to use for signature verification
         :param base_url: use this base url to resolve relative URLs for XInclude processing
         :param fail_on_error: (default: False)
         :param filter_invalid: (default: True) remove invalid EntityDescriptor elements rather than raise an error
+        :param validate: (default: True) set to False to turn off all XML schema validation
+        :param post: a callable that will be called to modify the parse-tree before any validation (but after XInclude processing)
         """
         try:
             t = etree.parse(fn, base_url=base_url, parser=etree.XMLParser(resolve_entities=False))
             t.xinclude()
-            if filter_invalid:
-                for e in t.findall('{%s}EntityDescriptor' % NS['md']):
-                    if not schema().validate(e):
-                        error = _e(schema().error_log, m=base_url)
-                        log.debug("removing '%s': schema validation failed (%s)" % (e.get('entityID'), error))
-                        e.getparent().remove(e)
-                        self.fire(type=EVENT_DROP_ENTITY, url=base_url, entityID=e.get('entityID'), error=error)
-            else:
-                # Having removed the invalid entities this should now never happen...
-                schema().assertValid(t)
+            if post is not None:
+                t = post(t)
+            if validate:
+                if filter_invalid:
+                    for e in t.findall('{%s}EntityDescriptor' % NS['md']):
+                        if not schema().validate(e):
+                            error = _e(schema().error_log, m=base_url)
+                            log.debug("removing '%s': schema validation failed (%s)" % (e.get('entityID'), error))
+                            e.getparent().remove(e)
+                            self.fire(type=EVENT_DROP_ENTITY, url=base_url, entityID=e.get('entityID'), error=error)
+                else:
+                    # Having removed the invalid entities this should now never happen...
+                    schema().assertValid(t)
         except DocumentInvalid, ex:
             traceback.print_exc()
             log.debug("schema validation failed on '%s': %s" % (base_url, _e(ex.error_log, m=base_url)))
@@ -469,7 +489,7 @@ def parse_metadata(self, fn, key=None, base_url=None, fail_on_error=False, filter_invalid=True):
                 refs = xmlsec.verified(t, key)
                 if len(refs) != 1:
                     raise MetadataException("XML metadata contains %d signatures - exactly 1 is required" % len(refs))
-                t = refs[0] # prevent wrapping attacks
+                t = refs[0]  # prevent wrapping attacks
         except Exception, ex:
             tb = traceback.format_exc()
             print tb
@@ -529,7 +549,7 @@ def entities(self, t=None):
         else:
             return t.findall(".//{%s}EntityDescriptor" % NS['md'])

-    def load_dir(self, directory, ext=".xml", url=None):
+    def load_dir(self, directory, ext=".xml", url=None, validate=False, post=None):
         """
         :param directory: A directory to walk.
         :param ext: Include files with this extension (default .xml)
@@ -551,7 +571,7 @@ def load_dir(self, directory, ext=".xml", url=None):
                 if nm.endswith(ext):
                     fn = os.path.join(top, nm)
                     try:
-                        t = self.parse_metadata(fn, fail_on_error=True)
+                        t = self.parse_metadata(fn, fail_on_error=True, validate=validate, post=post)
                         entities.extend(self.entities(t))  # local metadata is assumed to be ok
                     except Exception, ex:
                         log.error(ex)
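
A hedged usage sketch of the widened signature (directory path illustrative): load_dir keeps validate=False as its default, consistent with the "local metadata is assumed to be ok" comment above, but a post callback can still be threaded through to parse_metadata:

    md.load_dir("/var/lib/metadata", ext=".xml", url="local",
                validate=False, post=log_entities)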
diff --git a/src/pyff/pipes/builtins.py b/src/pyff/pipes/builtins.py
@@ -335,56 +335,88 @@ def local(req, *opts):

 @deprecated
 def _fetch(req, *opts):
-    return load(req,*opts)
+    return load(req, *opts)


 def load(req, *opts):
     """
     General-purpose resource fetcher.

-    :param opts:
     :param req: The request
-    :param opts: Options: [qsize <5>] [timeout <30>] [xrd <output xrd file>]
+    :param opts: Options: [qsize <5>] [timeout <30>] [validate <True*|False>] [xrd <output xrd file>]
     :return: None

     Supports both remote and local resources. Fetching remote resources is done in parallel using threads.
     """
+    opts = dict(zip(opts[::2], opts[1::2]))
+    opts.setdefault('timeout', 30)
+    opts.setdefault('qsize', 5)
+    opts.setdefault('xrd', None)
+    opts.setdefault('validate', True)
+    stats = dict()
+    opts.setdefault('stats', stats)
+
+    class PipelineCallback():
+        """
+        A delayed pipeline callback used as a post for parse_metadata
+        """
+        def __init__(self, entry_point, req, stats):
+            self.entry_point = entry_point
+            self.plumbing = Plumbing(req.plumbing, "%s-via-%s" % (req.plumbing.id, entry_point))
+            self.req = req
+            self.stats = stats
+
+        def __call__(self, *args, **kwargs):
+            t = args[0]
+            if not t:
+                raise ValueError("PipelineCallback must be called with a parse-tree argument")
+            return self.plumbing.process(self.req.md, state={self.entry_point: True, 'stats': self.stats}, t=t)
+
     remote = []
     for x in req.args:
         x = x.strip()
-        log.debug("load %s" % x)
-        m = re.match(FILESPEC_REGEX, x)
-        rid = None
-        if m:
-            x = m.group(1)
-            rid = m.group(2)
+        log.debug("load parsing '%s'" % x)
         r = x.split()
-        assert len(r) in [1, 2], PipeException("Usage: load: resource [as url] [verification]")
-        verify = None
-        url = r[0]
-        if len(r) == 2:
-            verify = r[1]
+        assert len(r) in range(1, 7), PipeException("Usage: load: resource [as url] [[verify] verification] [via pipeline]")
+        url = r.pop(0)
+        params = dict()
+
+        while len(r) > 0:
+            elt = r.pop(0)
+            if elt in ("as", "verify", "via"):
+                if len(r) > 0:
+                    params[elt] = r.pop(0)
+                else:
+                    raise PipeException("Usage: load: resource [as url] [[verify] verification] [via pipeline]")
+            else:
+                params['verify'] = elt
+
+        for elt in ("as", "verify", "via"):
+            params.setdefault(elt, None)
+
+        post = None
+        if params['via'] is not None:
+            post = PipelineCallback(params['via'], req, stats)
+            print post
+
         if "://" in url:
-            log.debug("remote %s %s %s" % (url, verify, rid))
-            remote.append((url, verify, rid))
+            log.debug("load %s verify %s as %s via %s" % (url, params['verify'], params['as'], params['via']))
+            remote.append((url, params['verify'], params['as'], post))
         elif os.path.exists(url):
             if os.path.isdir(url):
-                log.debug("local directory %s %s %s" % (url, verify, rid))
-                req.md.load_dir(url, url=rid)
+                log.debug("directory %s verify %s as %s via %s" % (url, params['verify'], params['as'], params['via']))
+                req.md.load_dir(url, url=params['as'], validate=opts['validate'], post=post)
             elif os.path.isfile(url):
-                log.debug("local file %s %s %s" % (url, verify, rid))
-                remote.append(("file://%s" % url, verify, rid))
+                log.debug("file %s verify %s as %s via %s" % (url, params['verify'], params['as'], params['via']))
+                remote.append(("file://%s" % url, params['verify'], params['as'], post))
             else:
-                log.error("Unknown file type for load: %s" % r[0])
+                log.error("Unknown file type for load: '%s'" % url)
         else:
-            log.error("Don't know how to load '%s' as %s verified by %s" % (url, rid, verify))
+            log.error("Don't know how to load '%s' as %s verify %s via %s" %
+                      (url, params['as'], params['verify'], params['via']))

-    opts = dict(zip(opts[::2], opts[1::2]))
-    opts.setdefault('timeout', 30)
-    opts.setdefault('qsize', 5)
-    opts.setdefault('xrd', None)
-    stats = dict()
-    opts.setdefault('stats', stats)
+    print remote
     req.md.fetch_metadata(remote, **opts)
     req.state['stats']['Metadata URLs'] = stats
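
For reference, a hedged trace of the new argument grammar: a pipeline entry such as load: http://mds.edugain.org edugain.crt as edugain via normalize (certificate name illustrative) comes out of the parsing loop above as:

    url = "http://mds.edugain.org"
    params = {"verify": "edugain.crt",  # a bare token falls through to 'verify'
              "as": "edugain",          # replaces the URL as the resource id
              "via": "normalize"}       # becomes a PipelineCallback passed as post

The PipelineCallback then re-enters the plumbing at the named "when normalize" entry point with the freshly parsed tree, which is what the examples/mdx.fd change at the top of this commit relies on.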
@@ -689,7 +721,7 @@ def validate(req, *opts):
     Validate the working document

     :param req: The request
-    :param opts: Options - the template name
+    :param opts: Not used
     :return: The unmodified tree

     Generate an exception unless the working tree validates. Validation is done automatically during publication and
diff --git a/src/pyff/utils.py b/src/pyff/utils.py
@@ -214,7 +214,7 @@ def template(name):

 class URLFetch(threading.Thread):

-    def __init__(self, url, verify, id=None, enable_cache=False, tries=0):
+    def __init__(self, url, verify, id=None, enable_cache=False, tries=0, post=None):
         self.url = url.strip()
         self.verify = verify
         self.id = id
@@ -230,6 +230,7 @@ def __init__(self, url, verify, id=None, enable_cache=False, tries=0):
         self.start_time = 0
         self.end_time = 0
         self.tries = tries
+        self.post = post

         if self.id is None:
             self.id = self.url
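
A construction sketch matching the producer in mdrepo.py above (URL and callback illustrative): the fetcher simply carries post along so the consumer can forward it to parse_metadata:

    thread = URLFetch("http://mds.edugain.org", verify=None, id="edugain",
                      enable_cache=True, tries=0, post=log_entities)
    thread.start()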
