Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions stix/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ def to_xml(self, include_namespaces=True, include_schemalocs=False,

"""

from .utils import nsparser
from mixbox.entities import NamespaceCollector

if (not auto_namespace) and (not ns_dict):
raise Exception(
"Auto-namespacing was disabled but ns_dict was empty "
"or missing."
)

ns_info = nsparser.NamespaceInfo()
ns_info = NamespaceCollector()

obj = self.to_obj(ns_info=ns_info if auto_namespace else None)

Expand Down
5 changes: 3 additions & 2 deletions stix/test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

import cybox.utils
from mixbox.binding_utils import ExternalEncoding
from mixbox.entities import NamespaceCollector

from stix.utils import NamespaceInfo, silence_warnings
from stix.utils import silence_warnings


@contextlib.contextmanager
Expand Down Expand Up @@ -97,7 +98,7 @@ def round_trip(o, output=False, list_=False):
o2 = klass.from_dict(d2)

# 5. Entity -> Bindings Object
ns_info = NamespaceInfo()
ns_info = NamespaceCollector()
xobj = o2.to_obj(ns_info=ns_info)

try:
Expand Down
63 changes: 1 addition & 62 deletions stix/test/utils/nsparser_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,11 @@

# internal
import mixbox.namespaces
import stix
from stix.core import STIXPackage
from stix.utils import nsparser, silence_warnings


NSMAP = {
"test:a": "a",
"test:b": "b",
"test:c": "c"
}


SCHEMALOCS = {
"test:a": "/dev/null",
"test:b": "/dev/null",
"test:c": "/dev/null"
}


class A(stix.Entity):
_namespace = nsparser.NS_STIX_OBJECT.name
_XSI_TYPE = "a:AType"


class B(A):
_namespace = nsparser.NS_STIXCOMMON_OBJECT.name
_XSI_TYPE = "b:BType"


class C(B):
_namespace = nsparser.NS_INDICATOR_OBJECT.name
_XSI_TYPE = "c:CType"
from stix.utils import silence_warnings


class NamespaceInfoTests(unittest.TestCase):
def test_nsinfo_collect(self):
"""Tests that the NamespaceInfo.collect() method correctly ascends the MRO
of input objects.

"""
nsinfo = nsparser.NamespaceInfo()

# Collect classes
nsinfo.collect(C())

# Parse collected classes
nsinfo._parse_collected_classes()

self.assertEqual(len(nsinfo._collected_namespaces), 3) # noqa

def test_namespace_collect(self):
"""Test that NamespaceInfo correctly pulls namespaces from all classes
in an objects MRO.

"""
nsinfo = nsparser.NamespaceInfo()

# Collect classes
nsinfo.collect(C())

# finalize the namespace dictionary
nsinfo.finalize(ns_dict=NSMAP, schemaloc_dict=SCHEMALOCS)
namespaces = nsinfo.binding_namespaces.keys()

self.assertTrue(all(ns in namespaces for ns in NSMAP.iterkeys()))


@silence_warnings
def test_user_provided_ns(self):
Expand Down
1 change: 1 addition & 0 deletions stix/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import contextlib
import functools
import keyword
import warnings

import lxml.etree

Expand Down
248 changes: 0 additions & 248 deletions stix/utils/nsparser.py
Original file line number Diff line number Diff line change
@@ -1,256 +1,8 @@
# Copyright (c) 2015, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

import itertools
import warnings

from mixbox import idgen
from mixbox.entities import Entity
import mixbox.namespaces

# internal
import stix


class NamespaceInfo(object):

def __init__(self):
# Namespaces that are "collected" from the Python objects during
# serialization. This will be a (mixbox) NamespaceSet.
self._collected_namespaces = None

# Namespaces and schemalocations that are attached to STIX/CybOX
# entities when parsed from an external source.
self._input_namespaces = {}
self._input_schemalocs = {}

# A list of classes that have been visited/seen during the namespace
# collection process. This speeds up the collect() method.
self._collected_classes = set()

# Namespaces and schemalocations that will appear in the output
# XML document.
self.finalized_schemalocs = None

# Namespace dictionary that gets passed to the bindings.
self.binding_namespaces = None

def update(self, ns_info):
self._collected_namespaces.update(ns_info._collected_namespaces) # noqa
self._input_namespaces.update(ns_info._input_namespaces) # noqa
self._input_schemalocs.update(ns_info._input_schemalocs) # noqa

def _parse_collected_classes(self):
collected = self._collected_classes
entity_klasses = (stix.Entity, Entity)

# Generator which yields all stix.Entity and mixbox.Entity subclasses
# that were collected.
entity_subclasses = (
klass for klass in collected if issubclass(klass, entity_klasses)
)

alias_to_ns_uri = {}
no_alias_ns_uris = []
for klass in entity_subclasses:
# Prevents exception being raised if/when
# collections.MutableSequence or another base class appears in the
# MRO.
ns = getattr(klass, "_namespace", None)
if not ns:
continue

# cybox.objects.* ObjectProperties derivations have an _XSI_NS
# class-level attribute which holds the namespace alias to be
# used for its namespace.
alias = getattr(klass, "_XSI_NS", None)
if alias:
alias_to_ns_uri[alias] = ns
continue

# Many stix/cybox entity classes have an _XSI_TYPE attribute that
# contains a `prefix:namespace` formatted QNAME for the
# associated xsi:type.
xsi_type = getattr(klass, "_XSI_TYPE", None)
if not xsi_type:
no_alias_ns_uris.append(ns)
continue

# Attempt to split the xsi:type attribute value into the ns alias
# and the typename.
typeinfo = xsi_type.split(":")
if len(typeinfo) == 2:
alias_to_ns_uri[typeinfo[0]] = ns
else:
no_alias_ns_uris.append(ns)

# Unrecognized namespace URIs will cause an error at this stage.
self._collected_namespaces = mixbox.namespaces.make_namespace_subset_from_uris(
itertools.chain(alias_to_ns_uri.itervalues(), no_alias_ns_uris)
)

# For some reason, prefixes are specified in API class vars and also in
# our big namespace tables. From python-cybox issue #274 [1], I
# conclude that the tables may take priority here. And those are
# already going to be preferred at this point. So the only thing I can
# think to do with class var values is fill in any missing prefixes
# we may have (but I doubt there will be any).
#
# 1. https://github.com/CybOXProject/python-cybox/issues/274
for prefix, ns_uri in alias_to_ns_uri.iteritems():
if self._collected_namespaces.preferred_prefix_for_namespace(ns_uri) is None:
self._collected_namespaces.set_preferred_prefix_for_namespace(
ns_uri, prefix, True)

def _fix_example_namespace(self):
"""Attempts to resolve issues where our samples use
'http://example.com/' for our example namespace but python-stix uses
'http://example.com' by removing the former.

"""
example_prefix = 'example' # Example ns prefix
idgen_prefix = idgen.get_id_namespace_prefix()

# If the ID namespace alias doesn't match the example alias, return.
if idgen_prefix != example_prefix:
return

# If the example namespace prefix isn't in the parsed namespace
# prefixes, return.
if example_prefix not in self._input_namespaces:
return

self._input_namespaces[example_prefix] = idgen.EXAMPLE_NAMESPACE.name

def _finalize_namespaces(self, ns_dict=None):
"""Returns a dictionary of namespaces to be exported with an XML
document.

This loops over all the namespaces that were discovered and built
during the execution of ``collect()`` and
``_parse_collected_classes()`` and attempts to merge them all.

Raises:
mixbox.namespaces.DuplicatePrefixError: If namespace prefix was
mapped to more than one namespace.

"""

if ns_dict:
# Add the user's entries to our set
for ns, alias in ns_dict.iteritems():
self._collected_namespaces.add_namespace_uri(ns, alias)

# Add the ID namespaces
self._collected_namespaces.add_namespace_uri(
idgen.get_id_namespace(),
idgen.get_id_namespace_alias()
)

# Remap the example namespace to the one expected by the APIs if the
# sample example namespace is found.
self._fix_example_namespace()

# Add _input_namespaces
for prefix, uri in self._input_namespaces.iteritems():
self._collected_namespaces.add_namespace_uri(uri, prefix)

# Add some default XML namespaces to make sure they're there.
self._collected_namespaces.import_from(mixbox.namespaces.XML_NAMESPACES)

# python-stix's generateDS-generated binding classes can't handle
# default namespaces. So make sure there are no preferred defaults in
# the set. Get prefixes from the global namespace set if we have to.
for ns_uri in self._collected_namespaces.namespace_uris:
if self._collected_namespaces.preferred_prefix_for_namespace(ns_uri) is None:
prefixes = self._collected_namespaces.get_prefixes(ns_uri)
if len(prefixes) > 0:
prefix = next(iter(prefixes))
else:
prefix = mixbox.namespaces.lookup_name(ns_uri)

if prefix is None:
raise mixbox.namespaces.NoPrefixesError(ns_uri)

self._collected_namespaces.set_preferred_prefix_for_namespace(
ns_uri, prefix, True)

def _finalize_schemalocs(self, schemaloc_dict=None):
# If schemaloc_dict was passed in, make a copy so we don't mistakenly
# modify the original.
if schemaloc_dict:
schemaloc_dict = dict(schemaloc_dict.iteritems())
else:
schemaloc_dict = {}

# Build our schemalocation dictionary!
#
# Initialize it from values found in the parsed, input schemalocations
# (if there are any) and the schemaloc_dict parameter values (if there
# are any).
#
# If there is a schemalocation found in both the parsed schemalocs and
# the schema_loc dict, use the schemaloc_dict value.
for ns, loc in self._input_schemalocs.iteritems():
if ns not in schemaloc_dict:
schemaloc_dict[ns] = loc

# Now use the merged dict to update any schema locations we don't
# already have.
for ns, loc in schemaloc_dict.iteritems():
if self._collected_namespaces.contains_namespace(ns) and \
self._collected_namespaces.get_schema_location(ns) is None:
self._collected_namespaces.set_schema_location(ns, loc)

# Warn if we are missing any schemalocations
id_ns = idgen.get_id_namespace()
for ns in self._collected_namespaces.namespace_uris:
if self._collected_namespaces.get_schema_location(ns) is None:
if ns == id_ns or \
mixbox.namespaces.XML_NAMESPACES.contains_namespace(ns) or \
ns in schemaloc_dict:
continue

error = "Unable to map namespace '{0}' to schemaLocation"
warnings.warn(error.format(ns))

def finalize(self, ns_dict=None, schemaloc_dict=None):
self._parse_collected_classes()
self._finalize_namespaces(ns_dict)
self._finalize_schemalocs(schemaloc_dict)

self.finalized_schemalocs = \
self._collected_namespaces.get_uri_schemaloc_map()
self.binding_namespaces = \
self._collected_namespaces.get_uri_prefix_map()

def get_xmlns_string(self, delim):
if self._collected_namespaces is None:
return ""
return self._collected_namespaces.get_xmlns_string(
preferred_prefixes_only=False, delim=delim
)

def get_schema_location_string(self, delim):
if self._collected_namespaces is None:
return ""
return self._collected_namespaces.get_schemaloc_string(delim=delim)

def collect(self, entity):
# Collect all the classes we need to inspect for namespace information
self._collected_classes.update(entity.__class__.__mro__)

# Collect the input namespaces if this entity came from some external
# source.
if hasattr(entity, "__input_namespaces__"):
self._input_namespaces.update(entity.__input_namespaces__)

# Collect the input schemalocation information if this entity came
# from some external source.
if hasattr(entity, "__input_schemalocations__"):
self._input_schemalocs.update(entity.__input_schemalocations__)


Namespace = mixbox.namespaces.Namespace

NS_CAMPAIGN_OBJECT = Namespace("http://stix.mitre.org/Campaign-1", "campaign", "http://stix.mitre.org/XMLSchema/campaign/1.2/campaign.xsd")
Expand Down