Skip to content

Commit

Permalink
Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into …
Browse files Browse the repository at this point in the history
…main
  • Loading branch information
chrisr3d committed Jun 24, 2020
2 parents ca61b06 + 9d05c9d commit 808dd94
Show file tree
Hide file tree
Showing 21 changed files with 1,180 additions and 442 deletions.
2 changes: 1 addition & 1 deletion stix2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
RepeatQualifier, StartStopQualifier, StringConstant, TimestampConstant,
WithinQualifier,
)
from .utils import new_version, revoke
from .v20 import * # This import will always be the latest STIX 2.X version
from .version import __version__
from .versioning import new_version, revoke

_collect_stix2_mappings()
221 changes: 148 additions & 73 deletions stix2/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from .utils import (
NOW, PREFIX_21_REGEX, find_property_index, format_datetime, get_timestamp,
)
from .utils import new_version as _new_version
from .utils import revoke as _revoke
from .versioning import new_version as _new_version
from .versioning import revoke as _revoke

try:
from collections.abc import Mapping
Expand Down Expand Up @@ -351,23 +351,20 @@ class _Observable(_STIXBase):
def __init__(self, **kwargs):
# the constructor might be called independently of an observed data object
self._STIXBase__valid_refs = kwargs.pop('_valid_refs', [])

self._allow_custom = kwargs.get('allow_custom', False)
self._properties['extensions'].allow_custom = kwargs.get('allow_custom', False)
super(_Observable, self).__init__(**kwargs)

try:
# Since `spec_version` is optional, this is how we check for a 2.1 SCO
self._id_contributing_properties
if 'id' not in kwargs and not isinstance(self, stix2.v20._Observable):
# Specific to 2.1+ observables: generate a deterministic ID
id_ = self._generate_id()

if 'id' not in kwargs:
possible_id = self._generate_id(kwargs)
if possible_id is not None:
kwargs['id'] = possible_id
except AttributeError:
# End up here if handling a 2.0 SCO, and don't need to do anything further
pass

super(_Observable, self).__init__(**kwargs)
# Spec says fall back to UUIDv4 if no contributing properties were
# given. That's what already happened (the following is actually
# overwriting the default uuidv4), so nothing to do here.
if id_ is not None:
# Can't assign to self (we're immutable), so slip the ID in
# more sneakily.
self._inner["id"] = id_

def _check_ref(self, ref, prop, prop_name):
"""
Expand Down Expand Up @@ -413,42 +410,53 @@ def _check_property(self, prop_name, prop, kwargs):
for ref in kwargs[prop_name]:
self._check_ref(ref, prop, prop_name)

def _generate_id(self, kwargs):
required_prefix = self._type + "--"

properties_to_use = self._id_contributing_properties
if properties_to_use:
streamlined_object = {}
if "hashes" in kwargs and "hashes" in properties_to_use:
possible_hash = _choose_one_hash(kwargs["hashes"])
if possible_hash:
streamlined_object["hashes"] = possible_hash
for key in properties_to_use:
if key != "hashes" and key in kwargs:
if isinstance(kwargs[key], dict) or isinstance(kwargs[key], _STIXBase):
temp_deep_copy = copy.deepcopy(dict(kwargs[key]))
_recursive_stix_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
elif isinstance(kwargs[key], list):
temp_deep_copy = copy.deepcopy(kwargs[key])
_recursive_stix_list_to_dict(temp_deep_copy)
streamlined_object[key] = temp_deep_copy
else:
streamlined_object[key] = kwargs[key]
if streamlined_object:
data = canonicalize(streamlined_object, utf8=False)

# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data))
def _generate_id(self):
"""
Generate a UUIDv5 for this observable, using its "ID contributing
properties".
:return: The ID, or None if no ID contributing properties are set
"""

id_ = None
json_serializable_object = {}

for key in self._id_contributing_properties:

if key in self:
obj_value = self[key]

if key == "hashes":
serializable_value = _choose_one_hash(obj_value)

if serializable_value is None:
raise InvalidValueError(
self, key, "No hashes given",
)

else:
return required_prefix + six.text_type(uuid.uuid5(SCO_DET_ID_NAMESPACE, data.encode("utf-8")))
serializable_value = _make_json_serializable(obj_value)

# We return None if there are no values specified for any of the id-contributing-properties
return None
json_serializable_object[key] = serializable_value

if json_serializable_object:

data = canonicalize(json_serializable_object, utf8=False)

# The situation is complicated w.r.t. python 2/3 behavior, so
# I'd rather not rely on particular exceptions being raised to
# determine what to do. Better to just check the python version
# directly.
if six.PY3:
uuid_ = uuid.uuid5(SCO_DET_ID_NAMESPACE, data)
else:
uuid_ = uuid.uuid5(
SCO_DET_ID_NAMESPACE, data.encode("utf-8"),
)

id_ = "{}--{}".format(self._type, six.text_type(uuid_))

return id_


class _Extension(_STIXBase):
Expand All @@ -472,35 +480,102 @@ def _choose_one_hash(hash_dict):
if k is not None:
return {k: hash_dict[k]}

return None


def _cls_init(cls, obj, kwargs):
if getattr(cls, '__init__', object.__init__) is not object.__init__:
cls.__init__(obj, **kwargs)


def _recursive_stix_to_dict(input_dict):
for key in input_dict:
if isinstance(input_dict[key], dict):
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], _STIXBase):
input_dict[key] = dict(input_dict[key])
def _make_json_serializable(value):
"""
Make the given value JSON-serializable; required for the JSON canonicalizer
to work. This recurses into lists/dicts, converts stix objects to dicts,
etc. "Convenience" types this library uses as property values are
JSON-serialized to produce a JSON-serializable value. (So you will always
get strings for those.)
The conversion will not affect the passed in value.
:param value: The value to make JSON-serializable.
:return: The JSON-serializable value.
:raises ValueError: If value is None (since nulls are not allowed in STIX
objects).
"""
if value is None:
raise ValueError("Illegal null value found in a STIX object")

json_value = value # default assumption

if isinstance(value, Mapping):
json_value = {
k: _make_json_serializable(v)
for k, v in value.items()
}

elif isinstance(value, list):
json_value = [
_make_json_serializable(v)
for v in value
]

elif not isinstance(value, (int, float, six.string_types, bool)):
# If a "simple" value which is not already JSON-serializable,
# JSON-serialize to a string and use that as our JSON-serializable
# value. This applies to our datetime objects currently (timestamp
# properties), and could apply to any other "convenience" types this
# library uses for property values in the future.
json_value = json.dumps(value, ensure_ascii=False, cls=STIXJSONEncoder)

# If it looks like a string literal was output, strip off the quotes.
# Otherwise, a second pair will be added when it's canonicalized. Also
# to be extra safe, we need to unescape.
if len(json_value) >= 2 and \
json_value[0] == '"' and json_value[-1] == '"':
json_value = _un_json_escape(json_value[1:-1])

return json_value


_JSON_ESCAPE_RE = re.compile(r"\\.")
# I don't think I should need to worry about the unicode escapes (\uXXXX)
# since I use ensure_ascii=False when generating it. I will just fix all
# the other escapes, e.g. \n, \r, etc.
#
# This list is taken from RFC8259 section 7:
# https://tools.ietf.org/html/rfc8259#section-7
# Maps the second char of a "\X" style escape, to a replacement char
_JSON_ESCAPE_MAP = {
'"': '"',
"\\": "\\",
"/": "/",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t",
}


def _un_json_escape(json_string):
"""
Removes JSON string literal escapes. We should undo these things Python's
serializer does, so we can ensure they're done canonically. The
canonicalizer should be in charge of everything, as much as is feasible.
# There may stil be nested _STIXBase objects
_recursive_stix_to_dict(input_dict[key])
elif isinstance(input_dict[key], list):
_recursive_stix_list_to_dict(input_dict[key])
else:
pass
:param json_string: String literal output of Python's JSON serializer,
minus the surrounding quotes.
:return: The unescaped string
"""

def replace(m):
replacement = _JSON_ESCAPE_MAP.get(m.group(0)[1])
if replacement is None:
raise ValueError("Unrecognized JSON escape: " + m.group(0))

def _recursive_stix_list_to_dict(input_list):
for i in range(len(input_list)):
if isinstance(input_list[i], _STIXBase):
input_list[i] = dict(input_list[i])
elif isinstance(input_list[i], dict):
pass
elif isinstance(input_list[i], list):
_recursive_stix_list_to_dict(input_list[i])
else:
continue
_recursive_stix_to_dict(input_list[i])
return replacement

result = _JSON_ESCAPE_RE.sub(replace, json_string)

return result
3 changes: 2 additions & 1 deletion stix2/markings/granular_markings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from stix2 import exceptions
from stix2.markings import utils
from stix2.utils import is_marking, new_version
from stix2.utils import is_marking
from stix2.versioning import new_version


def get_markings(obj, selectors, inherited=False, descendants=False, marking_ref=True, lang=True):
Expand Down
2 changes: 1 addition & 1 deletion stix2/markings/object_markings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from stix2 import exceptions
from stix2.markings import utils
from stix2.utils import new_version
from stix2.versioning import new_version


def get_markings(obj):
Expand Down
13 changes: 13 additions & 0 deletions stix2/pattern_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def remove_terminal_nodes(parse_tree_nodes):
return values


_TIMESTAMP_RE = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,6})?Z')


def check_for_valid_timetamp_syntax(timestamp_string):
return _TIMESTAMP_RE.match(timestamp_string)



Expand Down Expand Up @@ -214,6 +219,14 @@ def visitPropTestParen(self, ctx):
# Visit a parse tree produced by STIXPatternParser#startStopQualifier.
def visitStartStopQualifier(self, ctx):
children = self.visitChildren(ctx)
# 2.0 parser will accept any string, need to make sure it is a full STIX timestamp
if isinstance(children[1], StringConstant):
if not check_for_valid_timetamp_syntax(children[1].value):
raise (ValueError("Start time is not a legal timestamp"))
if isinstance(children[3], StringConstant):
if not check_for_valid_timetamp_syntax(children[3].value):
raise (ValueError("Stop time is not a legal timestamp"))

return StartStopQualifier(children[1], children[3])

# Visit a parse tree produced by STIXPatternParser#withinQualifier.
Expand Down
4 changes: 4 additions & 0 deletions stix2/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,16 @@ def __init__(self, start_time, stop_time):
self.start_time = start_time
elif isinstance(start_time, datetime.date):
self.start_time = TimestampConstant(start_time)
elif isinstance(start_time, StringConstant):
self.start_time = StringConstant(start_time.value)
else:
raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % start_time)
if isinstance(stop_time, TimestampConstant):
self.stop_time = stop_time
elif isinstance(stop_time, datetime.date):
self.stop_time = TimestampConstant(stop_time)
elif isinstance(stop_time, StringConstant):
self.stop_time = StringConstant(stop_time.value)
else:
raise ValueError("%s is not a valid argument for a Start/Stop Qualifier" % stop_time)

Expand Down
9 changes: 4 additions & 5 deletions stix2/properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
MutuallyExclusivePropertiesError,
)
from .parsing import STIX2_OBJ_MAPS, parse, parse_observable
from .utils import (
TYPE_21_REGEX, TYPE_REGEX, _get_dict, get_class_hierarchy_names,
parse_into_datetime,
)
from .utils import _get_dict, get_class_hierarchy_names, parse_into_datetime

ID_REGEX_interoperability = re.compile(r"[0-9a-fA-F]{8}-"
"[0-9a-fA-F]{4}-"
Expand All @@ -33,6 +30,8 @@
except ImportError:
from collections import Mapping, defaultdict

TYPE_REGEX = re.compile(r'^\-?[a-z0-9]+(-[a-z0-9]+)*\-?$')
TYPE_21_REGEX = re.compile(r'^([a-z][a-z0-9]*)+(-[a-z0-9]+)*\-?$')
ERROR_INVALID_ID = (
"not a valid STIX identifier, must match <object-type>--<UUID>: {}"
)
Expand Down Expand Up @@ -547,7 +546,7 @@ def enumerate_types(types, spec_version):
return return_types


SELECTOR_REGEX = re.compile(r"^[a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*$")
SELECTOR_REGEX = re.compile(r"^([a-z0-9_-]{3,250}(\.(\[\d+\]|[a-z0-9_-]{1,250}))*|id)$")


class SelectorProperty(Property):
Expand Down
14 changes: 14 additions & 0 deletions stix2/test/v20/test_granular_markings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,3 +1089,17 @@ def test_clear_marking_not_present(data):
"""Test clearing markings for a selector that has no associated markings."""
with pytest.raises(MarkingNotFoundError):
data = markings.clear_markings(data, ["labels"])


def test_set_marking_on_id_property():
malware = Malware(
granular_markings=[
{
"selectors": ["id"],
"marking_ref": MARKING_IDS[0],
},
],
**MALWARE_KWARGS
)

assert "id" in malware["granular_markings"][0]["selectors"]
1 change: 1 addition & 0 deletions stix2/test/v20/test_object_markings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
MALWARE_KWARGS = MALWARE_KWARGS_CONST.copy()
MALWARE_KWARGS.update({
'id': MALWARE_ID,
'type': 'malware',
'created': FAKE_TIME,
'modified': FAKE_TIME,
})
Expand Down
Loading

0 comments on commit 808dd94

Please sign in to comment.