Permalink
Browse files

Create ArtifactWriter classes for writing ArtifactDefinitions #178

* Added JSON support for reading/writing artifacts.
  • Loading branch information...
pidydx authored and joachimmetz committed Aug 8, 2016
1 parent b68a60e commit adfe4498ab30eac2d3081e269d9182b75dae2149
Showing with 262 additions and 21 deletions.
  1. +37 −0 artifacts/artifact.py
  2. +36 −2 artifacts/reader.py
  3. +18 −18 artifacts/source_type.py
  4. +81 −0 artifacts/writer.py
  5. +1 −0 test_data/definitions.json
  6. +43 −1 tests/reader_test.py
  7. +46 −0 tests/writer_test.py
@@ -90,3 +90,40 @@ def AppendSource(self, type_indicator, attributes):

self.sources.append(source_object)
return source_object

def AsDict(self):
"""Represents an artifact as a dictionary.
Returns:
A dictionary containing the artifact attributes.
"""
sources = []
for source in self.sources:
source_definition = {
u'type': source.type_indicator,
u'attributes': source.AsDict()
}
if source.supported_os:
source_definition[u'supported_os'] = source.supported_os
if source.conditions:
source_definition[u'conditions'] = source.conditions
if source.returned_types:
source_definition[u'returned_types'] = source.returned_types
sources.append(source_definition)

artifact_definition = {
u'name': self.name,
u'doc': self.description,
u'sources': sources,
}
if self.labels:
artifact_definition[u'labels'] = self.labels
if self.supported_os:
artifact_definition[u'supported_os'] = self.supported_os
if self.provides:
artifact_definition[u'provides'] = self.provides
if self.conditions:
artifact_definition[u'conditions'] = self.conditions
if self.urls:
artifact_definition[u'urls'] = self.urls
return artifact_definition
@@ -4,7 +4,7 @@
import abc
import glob
import os

import json
import yaml

from artifacts import artifact
@@ -269,7 +269,7 @@ def ReadFile(self, filename):
Yields:
Artifact definitions (instances of ArtifactDefinition).
"""
with open(filename, 'rb') as file_object:
with open(filename, 'r') as file_object:
for artifact_definition in self.ReadFileObject(file_object):
yield artifact_definition

@@ -317,3 +317,37 @@ def ReadFileObject(self, file_object):

yield artifact_definition
last_artifact_definition = artifact_definition


class JsonArtifactsReader(ArtifactsReader):
"""Class that implements the JSON artifacts reader."""

def ReadFileObject(self, file_object):
"""Reads artifact definitions from a file-like object.
Args:
file_object: the file-like object to read from.
Yields:
Artifact definitions (instances of ArtifactDefinition).
Raises:
FormatError: if the format of the JSON artifact definition is not set
or incorrect.
"""
# TODO: add try, except?
json_definitions = json.loads(file_object.read())

last_artifact_definition = None
for json_definition in json_definitions:
try:
artifact_definition = self.ReadArtifactDefinitionValues(json_definition)
except errors.FormatError as exception:
error_location = u'At start'
if last_artifact_definition:
error_location = u'After: {0}'.format(last_artifact_definition.name)

raise errors.FormatError(u'{0} {1}'.format(error_location, exception))

yield artifact_definition
last_artifact_definition = artifact_definition
@@ -42,8 +42,8 @@ def type_indicator(self):
return self.TYPE_INDICATOR

@abc.abstractmethod
def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -70,8 +70,8 @@ def __init__(self, names=None):
super(ArtifactGroupSourceType, self).__init__()
self.names = names

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -103,8 +103,8 @@ def __init__(self, paths=None, separator=u'/'):
self.paths = paths
self.separator = separator

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -138,8 +138,8 @@ def __init__(self, args=None, cmd=None):
self.args = args
self.cmd = cmd

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -171,8 +171,8 @@ def __init__(self, paths=None, separator=u'/'):
self.paths = paths
self.separator = separator

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -208,8 +208,8 @@ def __init__(self, paths=None, separator=u'/'):
self.paths = paths
self.separator = separator

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -255,8 +255,8 @@ def __init__(self, keys=None):
super(WindowsRegistryKeySourceType, self).__init__()
self.keys = keys

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -321,8 +321,8 @@ def __init__(self, key_value_pairs=None):
super(WindowsRegistryValueSourceType, self).__init__()
self.key_value_pairs = key_value_pairs

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -351,8 +351,8 @@ def __init__(self, query=None, base_object=None):
self.base_object = base_object
self.query = query

def CopyToDict(self):
"""Copies the source type to a dictionary.
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
A dictionary containing the source type attributes.
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
"""The artifact writer objects."""

import abc
import json
import yaml


class BaseArtifactsWriter(object):
"""Class that implements the artifacts writer interface."""

@abc.abstractmethod
def WriteArtifactsFile(self, artifacts, filename):
"""Writes artifact definitions to a file.
Args:
artifacts: a list of ArtifactDefinition objects to be written.
filename: the filename to write artifacts to.
"""

@abc.abstractmethod
def FormatArtifacts(self, artifacts):
"""Formats artifacts to desired output format.
Args:
artifacts: an ArtifactDefinition instance or list of ArtifactDefinitions.
Returns:
formatted string of artifact definition.
"""


class ArtifactWriter(BaseArtifactsWriter):
"""Class that implements the artifacts writer interface."""

def WriteArtifactsFile(self, artifacts, filename):
"""Writes artifact definitions to a file.
Args:
artifacts: a list of ArtifactDefinition objects to be written.
filename: the filename to write artifacts to.
"""
with open(filename, 'w') as file_object:
file_object.write(self.FormatArtifacts(artifacts))


class JsonArtifactsWriter(ArtifactWriter):
"""Class that implements the JSON artifacts writer interface."""

def FormatArtifacts(self, artifacts):
"""Formats artifacts to desired output format.
Args:
artifacts: a list of ArtifactDefinitions.
Returns:
formatted string of artifact definition.
"""
artifact_definitions = [artifact.AsDict() for artifact in artifacts]
json_data = json.dumps(artifact_definitions)
return json_data


class YamlArtifactsWriter(ArtifactWriter):
"""Class that implements the YAML artifacts writer interface."""

def FormatArtifacts(self, artifacts):
"""Formats artifacts to desired output format.
Args:
artifacts: a list of ArtifactDefinitions.
Returns:
formatted string of artifact definition.
"""
# TODO: improve output formatting of yaml
artifact_definitions = [artifact.AsDict() for artifact in artifacts]
yaml_data = yaml.safe_dump_all(artifact_definitions)
return yaml_data
@@ -0,0 +1 @@
[{"conditions": ["os_major_version >= 6"], "name": "SecurityEventLogEvtx", "sources": [{"attributes": {"paths": ["%%environ_systemroot%%\\System32\\winevt\\Logs\\Security.evtx"]}, "type": "FILE"}], "supported_os": ["Windows"], "labels": ["Logs"], "doc": "Windows Security Event log for Vista or later systems.", "urls": ["http://www.forensicswiki.org/wiki/Windows_XML_Event_Log_(EVTX)"]}, {"name": "AllUsersProfileEnvironmentVariable", "sources": [{"attributes": {"keys": ["HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows NT\\CurrentVersion\\ProfileList\\ProfilesDirectory", "HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows NT\\CurrentVersion\\ProfileList\\AllUsersProfile"]}, "type": "REGISTRY_KEY"}], "provides": ["environ_allusersprofile"], "supported_os": ["Windows"], "doc": "The %AllUsersProfile% environment variable.", "urls": ["http://support.microsoft.com/kb//214653"]}, {"name": "CurrentControlSet", "sources": [{"attributes": {"key_value_pairs": [{"value": "Current", "key": "HKEY_LOCAL_MACHINE\\SYSTEM\\Select"}]}, "type": "REGISTRY_VALUE"}], "provides": ["current_control_set"], "supported_os": ["Windows"], "doc": "The control set the system is currently using.", "urls": ["https://code.google.com/p/winreg-kb/wiki/SystemKeys"]}, {"name": "WMIProfileUsersHomeDir", "sources": [{"attributes": {"query": "SELECT * FROM Win32_UserProfile WHERE SID='%%users.sid%%'"}, "type": "WMI"}], "provides": ["users.homedir"], "supported_os": ["Windows"], "labels": ["Users"], "doc": "Get user homedir from Win32_UserProfile based on a known user's SID.\n\nThis artifact relies on having the SID field users.sid populated in the knowledge\nbase. We expect it to be collected with WindowsRegistryProfiles to\nsupply the rest of the user information.\n", "urls": ["http://msdn.microsoft.com/en-us/library/windows/desktop/ee886409(v=vs.85).aspx"]}, {"labels": ["Logs"], "name": "EventLogs", "sources": [{"attributes": {"names": ["ApplicationEventLog", "ApplicationEventLogEvtx", "SecurityEventLog", "SecurityEventLogEvtx", "SystemEventLog", "SystemEventLogEvtx"]}, "type": "ARTIFACT_GROUP"}], "doc": "Windows Event logs.", "supported_os": ["Windows"]}, {"labels": ["Software"], "name": "RedhatPackagesList", "sources": [{"attributes": {"args": ["-qa"], "cmd": "/bin/rpm"}, "type": "COMMAND"}], "doc": "Linux output of rpm -qa.", "supported_os": ["Linux"]}, {"labels": ["System"], "name": "OSXLoadedKexts", "sources": [{"attributes": {"args": [], "cmd": "/usr/sbin/kextstat"}, "type": "COMMAND"}], "doc": "Mac OS X Loaded Kernel Extensions.", "supported_os": ["Darwin"]}]
@@ -4,6 +4,7 @@
import io
import os
import unittest
import yaml

from artifacts import definitions
from artifacts import errors
@@ -250,7 +251,7 @@ def testMissingNamesAttribute(self):
with self.assertRaises(errors.FormatError):
_ = list(artifact_reader.ReadFileObject(file_object))

def testReadFile(self):
def testReadYamlFile(self):
"""Tests the ReadFile function."""
artifact_reader = reader.YamlArtifactsReader()
test_file = os.path.join('test_data', 'definitions.yaml')
@@ -267,6 +268,47 @@ def testReadDirectory(self):

self.assertEqual(len(artifact_definitions), 7)

def testArtifactAsDict(self):
"""Tests the ArtifactDefinition AsDict method returns the same dict that parsing artifact from yaml yields."""
artifact_reader = reader.YamlArtifactsReader()
test_file = os.path.join('test_data', 'definitions.yaml')

with open(test_file, 'r') as file_object:
for artifact_definition in yaml.safe_load_all(file_object):
artifact_object = artifact_reader.ReadArtifactDefinitionValues(
artifact_definition)
self.assertEqual(artifact_definition, artifact_object.AsDict())

def testDefinitionsAsDict(self):
"""Tests that all defined artifacts can convert to dictionary representation without raising."""
artifact_reader = reader.YamlArtifactsReader()

artifact_definitions = list(artifact_reader.ReadDirectory('definitions'))

last_artifact_definition = None
for artifact in artifact_definitions:
try:
artifact_definition = artifact.AsDict()
except errors.FormatError:
error_location = u'At start'
if last_artifact_definition:
error_location = u'After: {0}'.format(last_artifact_definition.name)
self.fail(u'{0} failed to convert to dict'.format(error_location))
last_artifact_definition = artifact_definition


class JsonArtifactsReaderTest(unittest.TestCase):
"""Class to test the JSON artifacts reader."""

def testReadJsonFile(self):
"""Tests the ReadFile function."""
artifact_reader = reader.JsonArtifactsReader()
test_file = os.path.join('test_data', 'definitions.json')

artifact_definitions = list(artifact_reader.ReadFile(test_file))

self.assertEqual(len(artifact_definitions), 7)


if __name__ == '__main__':
unittest.main()
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""Tests for the artifact definitions readers."""

import io
import os
import unittest
import json
import tempfile

from artifacts import reader
from artifacts import writer


class ArtifactsWriterTest(unittest.TestCase):
"""Class to test the artifacts writer."""

def checkArtifactConversion(self, artifact_reader, artifact_writer,
test_file):
artifact_definitions = list(artifact_reader.ReadFile(test_file))
with tempfile.NamedTemporaryFile() as artifact_file:
artifact_writer.WriteArtifactsFile(artifact_definitions,
artifact_file.name)
converted_artifact_definitions = list(artifact_reader.ReadFile(
artifact_file.name))

self.assertListEqual(
[artifact.AsDict() for artifact in artifact_definitions],
[artifact.AsDict() for artifact in converted_artifact_definitions])

def testYamlWriter(self):
"""Tests the YamlArtifactsWriter FormatArtifacts method for loss during conversion."""
artifact_reader = reader.YamlArtifactsReader()
artifact_writer = writer.YamlArtifactsWriter()
test_file = os.path.join('test_data', 'definitions.yaml')
self.checkArtifactConversion(artifact_reader, artifact_writer, test_file)

def testJsonWriter(self):
"""Tests the JsonArtifactsWriter FormatArtifacts method for loss during conversion."""
artifact_reader = reader.JsonArtifactsReader()
artifact_writer = writer.JsonArtifactsWriter()
test_file = os.path.join('test_data', 'definitions.json')
self.checkArtifactConversion(artifact_reader, artifact_writer, test_file)


if __name__ == '__main__':
unittest.main()

0 comments on commit adfe449

Please sign in to comment.