Skip to content

Commit

Permalink
Merge pull request #238 from STIXProject/namespaces
Browse files Browse the repository at this point in the history
Namespaces
  • Loading branch information
gtback committed Feb 25, 2015
2 parents 2c19267 + 3d56f0b commit 0ef159f
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 31 deletions.
58 changes: 43 additions & 15 deletions stix/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,46 @@
# Copyright (c) 2015, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

# builtin
import itertools
import collections
import StringIO
import json
from StringIO import StringIO

# external
from lxml import etree

# internal
import stix.bindings as bindings

# lazy imports
cybox = None
cybox_common = None
utils = None
nsparser = None

__LAZY_MODS_LOADED = False


def _load_lazy_mods():
global cybox, cybox_common, utils, nsparser
global __LAZY_MODS_LOADED

if __LAZY_MODS_LOADED:
return

if not cybox:
import cybox
if not cybox_common:
import cybox.common as cybox_common
if not utils:
import stix.utils as utils
if not nsparser:
import stix.utils.nsparser as nsparser

__LAZY_MODS_LOADED = True


def _override(*args, **kwargs):
raise NotImplementedError()

Expand Down Expand Up @@ -77,14 +109,11 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
:class:`Entity` instance. Default character encoding is ``utf-8``.
"""
from stix.utils.nsparser import (
NamespaceParser, NamespaceInfo, DEFAULT_STIX_NAMESPACES
)

parser = NamespaceParser()
_load_lazy_mods()
parser = nsparser.NamespaceParser()

if auto_namespace:
ns_info = NamespaceInfo()
ns_info = nsparser.NamespaceInfo()
else:
ns_info = None

Expand All @@ -97,16 +126,16 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
)

if auto_namespace:
ns_info.finalize()
ns_info.finalize(ns_dict=ns_dict, schemaloc_dict=schemaloc_dict)
obj_ns_dict = ns_info.finalized_namespaces
else:
ns_info = NamespaceInfo()
ns_info = nsparser.NamespaceInfo()
ns_info.finalized_namespaces = ns_dict or {}
ns_info.finalized_schemalocs = schemaloc_dict or {}
obj_ns_dict = dict(
itertools.chain(
ns_dict.iteritems(),
DEFAULT_STIX_NAMESPACES.iteritems()
nsparser.DEFAULT_STIX_NAMESPACES.iteritems()
)
)

Expand All @@ -123,7 +152,7 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
namespace_def = namespace_def.replace('\n\t', ' ')

with bindings.save_encoding(encoding):
sio = StringIO()
sio = StringIO.StringIO()
obj.export(
sio.write, # output buffer
0, # output level
Expand Down Expand Up @@ -213,11 +242,10 @@ def dict_from_object(cls, entity_obj):
return cls.from_obj(entity_obj).to_dict()

def walk(self):
from cybox import Entity as cyboxEntity
from cybox.common import ObjectProperties
_load_lazy_mods()

yieldable = (Entity, cyboxEntity)
skip = {ObjectProperties : '_parent'}
yieldable = (Entity, cybox.cyboxEntity)
skip = {cybox_common.ObjectProperties : '_parent'}

def can_skip(obj, field):
for klass, prop in skip.iteritems():
Expand Down
64 changes: 48 additions & 16 deletions stix/utils/nsparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ def update(self, ns_info):
self.input_schemalocs.update(ns_info.input_schemalocs)

def finalize(self, ns_dict=None, schemaloc_dict=None):
if not ns_dict:
ns_dict = {}

if not schemaloc_dict:
schemaloc_dict = {}
ns_dict = dict(ns_dict.iteritems()) if ns_dict else {}
schemaloc_dict = dict(schemaloc_dict.iteritems()) if schemaloc_dict else {}

id_ns = get_id_namespace()
id_ns_alias = get_id_namespace_alias()

# Baseline namespaces: these appear in every document
d_ns = {
'http://www.w3.org/2001/XMLSchema-instance': 'xsi',
'http://stix.mitre.org/stix-1': 'stix',
Expand All @@ -44,32 +42,49 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
id_ns: id_ns_alias
}

# Iterate over the namespaces collected during a parse of the package.
# If a namespace is not a STIX/CybOX/MAEC/XML namespace, include
# the namespace->alias mapping.
for ns, alias in self.input_namespaces.iteritems():
if ns not in DEFAULT_STIX_NAMESPACES:
d_ns[ns] = alias

# Iterate over the 'collected' namespaces which were found on every
# python-stix|cybox|maec object in this package. If it has an alias
# defined, use it. Otherwise, look up the alias in our default dicts.
for ns, alias in self.namespaces.iteritems():
if alias:
d_ns[ns] = alias
else:
default_alias = DEFAULT_STIX_NAMESPACES[ns]
d_ns[ns] = default_alias

d_ns.update(ns_dict)
# Update the input dictionary with our processed input/collected
# namespaces. This will overwrite any of the ns_dict namespace mappings
# with those expected/defined by the APIs and bindings.
#
# This will be our finalized_namespaces value.
ns_dict.update(d_ns)

# Attempts to resolve issues where our samples use
# 'http://example.com/' for our example namespace but python-stix uses
# 'http://example.com' by removing the former.
examples = (
('http://example.com/' in d_ns),
('http://example.com' in d_ns)
('http://example.com/' in ns_dict),
('http://example.com' in ns_dict)
)

# If we found both example namespaces, remove the one with a slash
# at the end, because our default ID namespace doesn't have a slash.
if all(examples):
del d_ns['http://example.com/']
del ns_dict['http://example.com/']


# Attempt to identify duplicate namespace aliases. This will render
# an invalid XML document. Raise a Python warning if duplicates are
# found.
aliases = {}
for ns, alias in d_ns.iteritems():
for ns, alias in ns_dict.iteritems():
if alias not in aliases:
aliases[alias] = ns
else:
Expand All @@ -79,11 +94,23 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
message = message.format(alias, ns, aliases[alias])
warnings.warn(message)

d_sl = dict(self.input_schemalocs.items())

# Build our schemalocation dictionary.
#
# Initialize it from values found in the parsed, input schemalocations
# (if there are any) and the schemaloc_dict parameter values (if there
# are any).
#
# If there is a schemalocation found in both the parsed schemalocs and
# the schema_loc dict, use the schemaloc_dict value.
d_sl = {}
for ns, loc in self.input_schemalocs.iteritems():
d_sl[ns] = loc if ns not in schemaloc_dict else schemaloc_dict[ns]

# Iterate over input/discovered namespaces for document and attempt
# to map them to schemalocations. Warn if unable to map ns to schemaloc.
for ns, _ in d_ns.iteritems():
# to map them to schemalocations. Warn if the namespace should have a
# schemalocation and we can't find it anywhere.
for ns, _ in ns_dict.iteritems():
if ns in DEFAULT_STIX_SCHEMALOCATIONS:
schemalocation = DEFAULT_STIX_SCHEMALOCATIONS[ns]
d_sl[ns] = schemalocation
Expand All @@ -102,9 +129,14 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
"Unable to map namespace '%s' to schemaLocation" % ns
)

d_sl.update(schemaloc_dict)
self.finalized_schemalocs = d_sl
self.finalized_namespaces = d_ns
# Update our schemalocation dictionary with the schemalocs found in
# the 'collect' phase. This will overwrite input schemalocs and
# schemaloc_dict entries if there are collisions.
schemaloc_dict.update(d_sl)

# Set the finalized attributes
self.finalized_namespaces = ns_dict
self.finalized_schemalocs = schemaloc_dict

def collect(self, entity):
# Traverse the MRO so we can collect _namespace attributes on Entity
Expand Down

0 comments on commit 0ef159f

Please sign in to comment.