Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions stix/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,46 @@
# Copyright (c) 2014, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

# builtin
import itertools
import collections
import StringIO
import json
from StringIO import StringIO

# external
from lxml import etree

# internal
import stix.bindings as bindings

# lazy imports
cybox = None
cybox_common = None
utils = None
nsparser = None

__LAZY_MODS_LOADED = False


def _load_lazy_mods():
global cybox, cybox_common, utils, nsparser
global __LAZY_MODS_LOADED

if __LAZY_MODS_LOADED:
return

if not cybox:
import cybox
if not cybox_common:
import cybox.common as cybox_common
if not utils:
import stix.utils as utils
if not nsparser:
import stix.utils.nsparser as nsparser

__LAZY_MODS_LOADED = True


def _override(*args, **kwargs):
raise NotImplementedError()

Expand Down Expand Up @@ -77,14 +109,11 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
:class:`Entity` instance. Default character encoding is ``utf-8``.

"""
from stix.utils.nsparser import (
NamespaceParser, NamespaceInfo, DEFAULT_STIX_NAMESPACES
)

parser = NamespaceParser()
_load_lazy_mods()
parser = nsparser.NamespaceParser()

if auto_namespace:
ns_info = NamespaceInfo()
ns_info = nsparser.NamespaceInfo()
else:
ns_info = None

Expand All @@ -97,16 +126,16 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
)

if auto_namespace:
ns_info.finalize()
ns_info.finalize(ns_dict=ns_dict, schemaloc_dict=schemaloc_dict)
obj_ns_dict = ns_info.finalized_namespaces
else:
ns_info = NamespaceInfo()
ns_info = nsparser.NamespaceInfo()
ns_info.finalized_namespaces = ns_dict or {}
ns_info.finalized_schemalocs = schemaloc_dict or {}
obj_ns_dict = dict(
itertools.chain(
ns_dict.iteritems(),
DEFAULT_STIX_NAMESPACES.iteritems()
nsparser.DEFAULT_STIX_NAMESPACES.iteritems()
)
)

Expand All @@ -123,7 +152,7 @@ def to_xml(self, include_namespaces=True, include_schemalocs=True,
namespace_def = namespace_def.replace('\n\t', ' ')

with bindings.save_encoding(encoding):
sio = StringIO()
sio = StringIO.StringIO()
obj.export(
sio.write, # output buffer
0, # output level
Expand Down Expand Up @@ -213,11 +242,10 @@ def dict_from_object(cls, entity_obj):
return cls.from_obj(entity_obj).to_dict()

def walk(self):
from cybox import Entity as cyboxEntity
from cybox.common import ObjectProperties
_load_lazy_mods()

yieldable = (Entity, cyboxEntity)
skip = {ObjectProperties : '_parent'}
yieldable = (Entity, cybox.cyboxEntity)
skip = {cybox_common.ObjectProperties : '_parent'}

def can_skip(obj, field):
for klass, prop in skip.iteritems():
Expand Down
64 changes: 48 additions & 16 deletions stix/utils/nsparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ def update(self, ns_info):
self.input_schemalocs.update(ns_info.input_schemalocs)

def finalize(self, ns_dict=None, schemaloc_dict=None):
if not ns_dict:
ns_dict = {}

if not schemaloc_dict:
schemaloc_dict = {}
ns_dict = dict(ns_dict.iteritems()) if ns_dict else {}
schemaloc_dict = dict(schemaloc_dict.iteritems()) if schemaloc_dict else {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally find the previous style more intuitive, even though it's a couple more lines. I'm ok with this, though.


id_ns = get_id_namespace()
id_ns_alias = get_id_namespace_alias()

# Baseline namespaces: these appear in every document
d_ns = {
'http://www.w3.org/2001/XMLSchema-instance': 'xsi',
'http://stix.mitre.org/stix-1': 'stix',
Expand All @@ -44,32 +42,49 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
id_ns: id_ns_alias
}

# Iterate over the namespaces collected during a parse of the package.
# If a namespace is not a STIX/CybOX/MAEC/XML namespace, include
# the namespace->alias mapping.
for ns, alias in self.input_namespaces.iteritems():
if ns not in DEFAULT_STIX_NAMESPACES:
d_ns[ns] = alias

# Iterate over the 'collected' namespaces which were found on every
# python-stix|cybox|maec object in this package. If it has an alias
# defined, use it. Otherwise, look up the alias in our default dicts.
for ns, alias in self.namespaces.iteritems():
if alias:
d_ns[ns] = alias
else:
default_alias = DEFAULT_STIX_NAMESPACES[ns]
d_ns[ns] = default_alias

d_ns.update(ns_dict)
# Update the input dictionary with our processed input/collected
# namespaces. This will overwrite any of the ns_dict namespace mappings
# with those expected/defined by the APIs and bindings.
#
# This will be our finalized_namespaces value.
ns_dict.update(d_ns)

# Attempts to resolve issues where our samples use
# 'http://example.com/' for our example namespace but python-stix uses
# 'http://example.com' by removing the former.
examples = (
('http://example.com/' in d_ns),
('http://example.com' in d_ns)
('http://example.com/' in ns_dict),
('http://example.com' in ns_dict)
)

# If we found both example namespaces, remove the one with a slash
# at the end, because our default ID namespace doesn't have a slash.
if all(examples):
del d_ns['http://example.com/']
del ns_dict['http://example.com/']


# Attempt to identify duplicate namespace aliases. This will render
# an invalid XML document. Raise a Python warning if duplicates are
# found.
aliases = {}
for ns, alias in d_ns.iteritems():
for ns, alias in ns_dict.iteritems():
if alias not in aliases:
aliases[alias] = ns
else:
Expand All @@ -79,11 +94,23 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
message = message.format(alias, ns, aliases[alias])
warnings.warn(message)

d_sl = dict(self.input_schemalocs.items())

# Build our schemalocation dictionary.
#
# Initialize it from values found in the parsed, input schemalocations
# (if there are any) and the schemaloc_dict parameter values (if there
# are any).
#
# If there is a schemalocation found in both the parsed schemalocs and
# the schema_loc dict, use the schemaloc_dict value.
d_sl = {}
for ns, loc in self.input_schemalocs.iteritems():
d_sl[ns] = loc if ns not in schemaloc_dict else schemaloc_dict[ns]

# Iterate over input/discovered namespaces for document and attempt
# to map them to schemalocations. Warn if unable to map ns to schemaloc.
for ns, _ in d_ns.iteritems():
# to map them to schemalocations. Warn if the namespace should have a
# schemalocation and we can't find it anywhere.
for ns, _ in ns_dict.iteritems():
if ns in DEFAULT_STIX_SCHEMALOCATIONS:
schemalocation = DEFAULT_STIX_SCHEMALOCATIONS[ns]
d_sl[ns] = schemalocation
Expand All @@ -102,9 +129,14 @@ def finalize(self, ns_dict=None, schemaloc_dict=None):
"Unable to map namespace '%s' to schemaLocation" % ns
)

d_sl.update(schemaloc_dict)
self.finalized_schemalocs = d_sl
self.finalized_namespaces = d_ns
# Update our schemalocation dictionary with the schemalocs found in
# the 'collect' phase. This will overwrite input schemalocs and
# schemaloc_dict entries if there are collisions.
schemaloc_dict.update(d_sl)

# Set the finalized attributes
self.finalized_namespaces = ns_dict
self.finalized_schemalocs = schemaloc_dict

def collect(self, entity):
# Traverse the MRO so we can collect _namespace attributes on Entity
Expand Down