diff --git a/stix/base.py b/stix/base.py index f36c1b22..341c3166 100644 --- a/stix/base.py +++ b/stix/base.py @@ -1,14 +1,46 @@ # Copyright (c) 2014, The MITRE Corporation. All rights reserved. # See LICENSE.txt for complete terms. +# builtin import itertools import collections +import StringIO import json -from StringIO import StringIO + +# external from lxml import etree +# internal import stix.bindings as bindings +# lazy imports +cybox = None +cybox_common = None +utils = None +nsparser = None + +__LAZY_MODS_LOADED = False + + +def _load_lazy_mods(): + global cybox, cybox_common, utils, nsparser + global __LAZY_MODS_LOADED + + if __LAZY_MODS_LOADED: + return + + if not cybox: + import cybox + if not cybox_common: + import cybox.common as cybox_common + if not utils: + import stix.utils as utils + if not nsparser: + import stix.utils.nsparser as nsparser + + __LAZY_MODS_LOADED = True + + def _override(*args, **kwargs): raise NotImplementedError() @@ -77,14 +109,11 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True, :class:`Entity` instance. Default character encoding is ``utf-8``. """ - from stix.utils.nsparser import ( - NamespaceParser, NamespaceInfo, DEFAULT_STIX_NAMESPACES - ) - - parser = NamespaceParser() + _load_lazy_mods() + parser = nsparser.NamespaceParser() if auto_namespace: - ns_info = NamespaceInfo() + ns_info = nsparser.NamespaceInfo() else: ns_info = None @@ -97,16 +126,16 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True, ) if auto_namespace: - ns_info.finalize() + ns_info.finalize(ns_dict=ns_dict, schemaloc_dict=schemaloc_dict) obj_ns_dict = ns_info.finalized_namespaces else: - ns_info = NamespaceInfo() + ns_info = nsparser.NamespaceInfo() ns_info.finalized_namespaces = ns_dict or {} ns_info.finalized_schemalocs = schemaloc_dict or {} obj_ns_dict = dict( itertools.chain( ns_dict.iteritems(), - DEFAULT_STIX_NAMESPACES.iteritems() + nsparser.DEFAULT_STIX_NAMESPACES.iteritems() ) ) @@ -123,7 +152,7 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True, namespace_def = namespace_def.replace('\n\t', ' ') with bindings.save_encoding(encoding): - sio = StringIO() + sio = StringIO.StringIO() obj.export( sio.write, # output buffer 0, # output level @@ -213,11 +242,10 @@ def dict_from_object(cls, entity_obj): return cls.from_obj(entity_obj).to_dict() def walk(self): - from cybox import Entity as cyboxEntity - from cybox.common import ObjectProperties + _load_lazy_mods() - yieldable = (Entity, cyboxEntity) - skip = {ObjectProperties : '_parent'} + yieldable = (Entity, cybox.cyboxEntity) + skip = {cybox_common.ObjectProperties : '_parent'} def can_skip(obj, field): for klass, prop in skip.iteritems(): diff --git a/stix/utils/nsparser.py b/stix/utils/nsparser.py index 3c75c71f..d706fe65 100644 --- a/stix/utils/nsparser.py +++ b/stix/utils/nsparser.py @@ -24,15 +24,13 @@ def update(self, ns_info): self.input_schemalocs.update(ns_info.input_schemalocs) def finalize(self, ns_dict=None, schemaloc_dict=None): - if not ns_dict: - ns_dict = {} - - if not schemaloc_dict: - schemaloc_dict = {} + ns_dict = dict(ns_dict.iteritems()) if ns_dict else {} + schemaloc_dict = dict(schemaloc_dict.iteritems()) if schemaloc_dict else {} id_ns = get_id_namespace() id_ns_alias = get_id_namespace_alias() + # Baseline namespaces: these appear in every document d_ns = { 'http://www.w3.org/2001/XMLSchema-instance': 'xsi', 'http://stix.mitre.org/stix-1': 'stix', @@ -44,10 +42,16 @@ def finalize(self, ns_dict=None, schemaloc_dict=None): id_ns: id_ns_alias } + # Iterate over the namespaces collected during a parse of the package. + # If a namespace is not a STIX/CybOX/MAEC/XML namespace, include + # the namespace->alias mapping. for ns, alias in self.input_namespaces.iteritems(): if ns not in DEFAULT_STIX_NAMESPACES: d_ns[ns] = alias + # Iterate over the 'collected' namespaces which were found on every + # python-stix|cybox|maec object in this package. If it has an alias + # defined, use it. Otherwise, look up the alias in our default dicts. for ns, alias in self.namespaces.iteritems(): if alias: d_ns[ns] = alias @@ -55,21 +59,32 @@ def finalize(self, ns_dict=None, schemaloc_dict=None): default_alias = DEFAULT_STIX_NAMESPACES[ns] d_ns[ns] = default_alias - d_ns.update(ns_dict) + # Update the input dictionary with our processed input/collected + # namespaces. This will overwrite any of the ns_dict namespace mappings + # with those expected/defined by the APIs and bindings. + # + # This will be our finalized_namespaces value. + ns_dict.update(d_ns) # Attempts to resolve issues where our samples use # 'http://example.com/' for our example namespace but python-stix uses # 'http://example.com' by removing the former. examples = ( - ('http://example.com/' in d_ns), - ('http://example.com' in d_ns) + ('http://example.com/' in ns_dict), + ('http://example.com' in ns_dict) ) + # If we found both example namespaces, remove the one with a slash + # at the end, because our default ID namespace doesn't have a slash. if all(examples): - del d_ns['http://example.com/'] + del ns_dict['http://example.com/'] + + # Attempt to identify duplicate namespace aliases. This will render + # an invalid XML document. Raise a Python warning if duplicates are + # found. aliases = {} - for ns, alias in d_ns.iteritems(): + for ns, alias in ns_dict.iteritems(): if alias not in aliases: aliases[alias] = ns else: @@ -79,11 +94,23 @@ def finalize(self, ns_dict=None, schemaloc_dict=None): message = message.format(alias, ns, aliases[alias]) warnings.warn(message) - d_sl = dict(self.input_schemalocs.items()) + + # Build our schemalocation dictionary. + # + # Initialize it from values found in the parsed, input schemalocations + # (if there are any) and the schemaloc_dict parameter values (if there + # are any). + # + # If there is a schemalocation found in both the parsed schemalocs and + # the schema_loc dict, use the schemaloc_dict value. + d_sl = {} + for ns, loc in self.input_schemalocs.iteritems(): + d_sl[ns] = loc if ns not in schemaloc_dict else schemaloc_dict[ns] # Iterate over input/discovered namespaces for document and attempt - # to map them to schemalocations. Warn if unable to map ns to schemaloc. - for ns, _ in d_ns.iteritems(): + # to map them to schemalocations. Warn if the namespace should have a + # schemalocation and we can't find it anywhere. + for ns, _ in ns_dict.iteritems(): if ns in DEFAULT_STIX_SCHEMALOCATIONS: schemalocation = DEFAULT_STIX_SCHEMALOCATIONS[ns] d_sl[ns] = schemalocation @@ -102,9 +129,14 @@ def finalize(self, ns_dict=None, schemaloc_dict=None): "Unable to map namespace '%s' to schemaLocation" % ns ) - d_sl.update(schemaloc_dict) - self.finalized_schemalocs = d_sl - self.finalized_namespaces = d_ns + # Update our schemalocation dictionary with the schemalocs found in + # the 'collect' phase. This will overwrite input schemalocs and + # schemaloc_dict entries if there are collisions. + schemaloc_dict.update(d_sl) + + # Set the finalized attributes + self.finalized_namespaces = ns_dict + self.finalized_schemalocs = schemaloc_dict def collect(self, entity): # Traverse the MRO so we can collect _namespace attributes on Entity