# -*- coding: utf-8 -*-
"""Tests for the artifact definitions registry."""
import io
import unittest
from artifacts import errors
from artifacts import reader
from artifacts import registry
from artifacts import source_type
from tests import test_lib
class TestSourceType(source_type.SourceType):
  """Source type stub used only by the registry tests.

  It registers under the type indicator 'test' and carries a single
  required attribute, so CreateSourceType() can be exercised with both a
  valid attribute dictionary and a missing one.
  """

  TYPE_INDICATOR = 'test'

  def __init__(self, test=None):
    """Initializes the test source type.

    Args:
      test (Optional[str]): test string; must be a non-empty value.

    Raises:
      FormatError: when test is None or empty.
    """
    if test:
      super(TestSourceType, self).__init__()
      self.test = test
    else:
      raise errors.FormatError('Missing test value.')

  def AsDict(self):
    """Returns the source type attributes as a dictionary.

    Returns:
      dict[str, str]: mapping holding the single 'test' attribute.
    """
    return dict(test=self.test)
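class ArtifactDefinitionsRegistryTest(test_lib.BaseTestCase):
  """Tests for the artifact definitions registry."""

  # pylint: disable=protected-access

  def testArtifactDefinitionsRegistry(self):
    """Tests the ArtifactDefinitionsRegistry functions.

    Covers registering and deregistering definitions, lookup by name, and
    that a malformed YAML definition is rejected by the reader with
    FormatError rather than surfacing a raw parser error.
    """
    test_file = self._GetTestFilePath(['definitions.yaml'])
    self._SkipIfPathNotExists(test_file)

    artifact_registry = registry.ArtifactDefinitionsRegistry()
    artifact_reader = reader.YamlArtifactsReader()

    for artifact_definition in artifact_reader.ReadFile(test_file):
      artifact_registry.RegisterDefinition(artifact_definition)

    # Make sure the test file got turned into artifacts.
    definitions = list(artifact_registry.GetDefinitions())
    self.assertEqual(len(definitions), 7)

    artifact_definition = artifact_registry.GetDefinitionByName('EventLogs')
    self.assertIsNotNone(artifact_definition)

    # Registering the same definition twice must be refused, not silently
    # overwrite the existing entry.
    with self.assertRaises(KeyError):
      artifact_registry.RegisterDefinition(artifact_definition)

    artifact_registry.DeregisterDefinition(artifact_definition)

    # Deregistering again must fail: the definition is gone.
    with self.assertRaises(KeyError):
      artifact_registry.DeregisterDefinition(artifact_definition)

    definitions = list(artifact_registry.GetDefinitions())
    self.assertEqual(len(definitions), 6)

    test_artifact_definition = artifact_registry.GetDefinitionByName(
        'SecurityEventLogEvtxFile')
    self.assertIsNotNone(test_artifact_definition)

    self.assertEqual(test_artifact_definition.name, 'SecurityEventLogEvtxFile')
    self.assertEqual(test_artifact_definition.aliases, ['SecurityEventLogEvtx'])

    expected_description = (
        'Windows Security Event log for Vista or later systems.')
    self.assertEqual(test_artifact_definition.description, expected_description)

    # A definition with an unsupported attribute key ('broken') must be
    # rejected; the reader is expected to raise FormatError when the
    # generator is advanced, not at construction time.
    bad_args = io.BytesIO(
        b'name: SecurityEventLogEvtx\n'
        b'doc: Windows Security Event log for Vista or later systems.\n'
        b'sources:\n'
        b'- type: FILE\n'
        b' attributes: {broken: [\'%%environ_systemroot%%\\System32\\'
        b'winevt\\Logs\\Security.evtx\']}\n'
        b'supported_os: [Windows]\n'
        b'urls: [\'http://www.forensicswiki.org/wiki/\n'
        b'Windows_XML_Event_Log_(EVTX)\']\n')

    generator = artifact_reader.ReadFileObject(bad_args)
    with self.assertRaises(errors.FormatError):
      next(generator)

  def testSourceTypeFunctions(self):
    """Tests the source type functions.

    RegisterSourceType and related functions mutate the class-level
    _source_type_classes mapping, which is shared by every test in the
    process. The registration of TestSourceType is therefore undone in a
    cleanup hook so that an assertion failing part-way through this test
    cannot leak the stub into other tests.
    """
    registry_class = registry.ArtifactDefinitionsRegistry
    number_of_source_types = len(registry_class._source_type_classes)

    def _RestoreSourceTypes():
      # Only deregister when the stub is still registered; the size check
      # mirrors the assertions below, which treat +1 as "registered".
      if len(registry_class._source_type_classes) != number_of_source_types:
        registry_class.DeregisterSourceType(TestSourceType)

    self.addCleanup(_RestoreSourceTypes)

    registry_class.RegisterSourceType(TestSourceType)
    self.assertEqual(
        len(registry_class._source_type_classes), number_of_source_types + 1)

    # Duplicate registration must be refused rather than replace the entry.
    with self.assertRaises(KeyError):
      registry_class.RegisterSourceType(TestSourceType)

    registry_class.DeregisterSourceType(TestSourceType)
    self.assertEqual(
        len(registry_class._source_type_classes), number_of_source_types)

    registry_class.RegisterSourceTypes([TestSourceType])
    self.assertEqual(
        len(registry_class._source_type_classes), number_of_source_types + 1)

    with self.assertRaises(KeyError):
      registry_class.RegisterSourceTypes([TestSourceType])

    source_object = registry_class.CreateSourceType(
        'test', {'test': 'test123'})
    self.assertIsNotNone(source_object)
    self.assertEqual(source_object.test, 'test123')

    # Missing required attribute.
    with self.assertRaises(errors.FormatError):
      registry_class.CreateSourceType('test', {})

    # Unknown type indicator.
    with self.assertRaises(errors.FormatError):
      registry_class.CreateSourceType('bogus', {})

    # Normal-path teardown; the cleanup hook above covers the failure path.
    registry_class.DeregisterSourceType(TestSourceType)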
# Run the TestCase classes defined in this module when executed directly.
if __name__ == '__main__':
  unittest.main()