-
Notifications
You must be signed in to change notification settings - Fork 17
/
workers.py
207 lines (172 loc) · 7.71 KB
/
workers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""Worker functions.
The functions in this module wrap around plugin classes such that these
worker functions can be specified as the ``target`` parameter while
launching a new subprocess with :class:`multiprocessing.Process`.
Each worker function can run as a separate subprocess. While wrapping
around a plugin class, each worker function creates the multiprocessing
queues necessary to pass records from one plugin class to another.
"""
import logging
from cloudmarker import util
_log = logging.getLogger(__name__)
def cloud_worker(audit_key, audit_version, plugin_key, plugin,
                 output_queues):
    """Run a cloud plugin and fan its records out to the output queues.

    The ``plugin`` object must provide a ``read`` method that yields
    records and a ``done`` method for cleanup work. Each record
    yielded by ``read`` is stamped with audit and origin metadata
    under its ``com`` key and then put into every queue in
    ``output_queues``. Once ``read`` is exhausted, or has failed,
    ``done`` is called exactly once.

    Any :class:`Exception` raised while reading, stamping or queueing
    records, or raised by ``done``, is logged and not propagated.

    Arguments:
        audit_key (str): Audit key name in configuration.
        audit_version (str): Audit version string.
        plugin_key (str): Plugin key name in configuration.
        plugin (object): Cloud plugin object.
        output_queues (list): List of :class:`multiprocessing.Queue`
            objects to write records to.

    """
    worker_name = '_'.join((audit_key, plugin_key))
    plugin_class = type(plugin).__name__
    _log.info('%s: Started', worker_name)

    try:
        for rec in plugin.read():
            # A fresh metadata dict is built for every record so that
            # no two records can end up sharing the same object.
            origin = {
                'audit_key': audit_key,
                'audit_version': audit_version,
                'origin_key': plugin_key,
                'origin_class': plugin_class,
                'origin_worker': worker_name,
                'origin_type': 'cloud',
            }
            rec['com'] = util.merge_dicts(rec.get('com', {}), origin)
            for queue in output_queues:
                queue.put(rec)
    except Exception as err:
        _log.exception('%s: Failed; read() error: %s: %s',
                       worker_name, type(err).__name__, err)

    # Cleanup runs whether or not reading succeeded.
    try:
        plugin.done()
    except Exception as err:
        _log.exception('%s: Failed; done() error: %s: %s',
                       worker_name, type(err).__name__, err)

    _log.info('%s: Stopped', worker_name)
def event_worker(audit_key, audit_version, plugin_key, plugin,
                 input_queue, output_queues):
    """Run an event plugin between an input queue and output queues.

    The ``plugin`` object must provide an ``eval`` method that takes a
    single record and yields zero or more event records, and a
    ``done`` method for cleanup work.

    Records are taken from ``input_queue`` one at a time and handed to
    ``eval``. Each event record yielded is stamped with audit and
    origin metadata under its ``com`` key and then put into every
    queue in ``output_queues``. A failure while evaluating one input
    record is logged and processing continues with the next record.

    A ``None`` value in ``input_queue`` marks the end of input. When
    it is seen, ``done`` is called on the plugin and the worker
    stops. An :class:`Exception` raised by ``done`` is logged and not
    propagated.

    Arguments:
        audit_key (str): Audit key name in configuration.
        audit_version (str): Audit version string.
        plugin_key (str): Plugin key name in configuration.
        plugin (object): Event plugin object.
        input_queue (multiprocessing.Queue): Queue to read records from.
        output_queues (list): List of :class:`multiprocessing.Queue`
            objects to write records to.

    """
    worker_name = '_'.join((audit_key, plugin_key))
    plugin_class = type(plugin).__name__
    _log.info('%s: Started', worker_name)

    while True:
        incoming = input_queue.get()
        if incoming is None:
            # End-of-input sentinel; cleanup happens after the loop.
            break

        try:
            for emitted in plugin.eval(incoming):
                origin = {
                    'audit_key': audit_key,
                    'audit_version': audit_version,
                    'origin_key': plugin_key,
                    'origin_class': plugin_class,
                    'origin_worker': worker_name,
                    'origin_type': 'event',
                }
                emitted['com'] = util.merge_dicts(
                    emitted.get('com', {}), origin)
                for queue in output_queues:
                    queue.put(emitted)
        except Exception as err:
            _log.exception('%s: Failed; eval() error: %s: %s',
                           worker_name, type(err).__name__, err)

    try:
        plugin.done()
    except Exception as err:
        _log.exception('%s: Failed; done() error: %s: %s',
                       worker_name, type(err).__name__, err)

    _log.info('%s: Stopped', worker_name)
def store_worker(audit_key, audit_version, plugin_key, plugin,
                 input_queue):
    """Run a store plugin that consumes records from an input queue.

    The ``plugin`` object must provide a ``write`` method that takes a
    single record and a ``done`` method for cleanup work.

    Records are taken from ``input_queue`` and each one is passed to
    the ``write`` method of ``plugin``. A ``None`` value in
    ``input_queue`` marks the end of input; when it is seen, the
    ``done`` method of ``plugin`` is called and the worker stops.

    The work is delegated to ``_write_worker`` with the worker type
    set to ``'store'``.

    Arguments:
        audit_key (str): Audit key name in configuration.
        audit_version (str): Audit version string.
        plugin_key (str): Plugin key name in configuration.
        plugin (object): Store plugin object.
        input_queue (multiprocessing.Queue): Queue to read records from.

    """
    _write_worker(
        audit_key=audit_key,
        audit_version=audit_version,
        plugin_key=plugin_key,
        plugin=plugin,
        input_queue=input_queue,
        worker_type='store',
    )
def alert_worker(audit_key, audit_version, plugin_key, plugin,
                 input_queue):
    """Run an alert plugin that consumes records from an input queue.

    This worker behaves like :func:`cloudmarker.workers.store_worker`;
    see its documentation for details. The work is delegated to
    ``_write_worker`` with the worker type set to ``'alert'``.

    Arguments:
        audit_key (str): Audit key name in configuration.
        audit_version (str): Audit version string.
        plugin_key (str): Plugin key name in configuration.
        plugin (object): Alert plugin object.
        input_queue (multiprocessing.Queue): Queue to read records from.

    """
    _write_worker(
        audit_key=audit_key,
        audit_version=audit_version,
        plugin_key=plugin_key,
        plugin=plugin,
        input_queue=input_queue,
        worker_type='alert',
    )
def _write_worker(audit_key, audit_version, plugin_key, plugin,
                  input_queue, worker_type):
    """Worker function for store and alert plugins.

    This function expects the ``plugin`` object to implement a
    ``write`` method that accepts a single record as a parameter and a
    ``done`` method to perform cleanup work in the end.

    Records are read from ``input_queue`` one at a time. Each record
    is stamped with audit and target metadata under its ``com`` key
    and then passed to the ``write`` method of ``plugin``. If stamping
    or writing a record raises an :class:`Exception`, the error is
    logged and the worker moves on to the next record.

    Once ``None`` is found in ``input_queue``, the ``done`` method of
    ``plugin`` is called and the worker stops. An :class:`Exception`
    raised by ``done`` is logged and not propagated.

    Arguments:
        audit_key (str): Audit key name in configuration.
        audit_version (str): Audit version string.
        plugin_key (str): Plugin key name in configuration.
        plugin (object): Store plugin or alert plugin object.
        input_queue (multiprocessing.Queue): Queue to read records from.
        worker_type (str): Either ``'store'`` or ``'alert'``.

    """
    worker_name = audit_key + '_' + plugin_key
    _log.info('%s: Started', worker_name)

    while True:
        record = input_queue.get()

        # ``None`` is the end-of-input sentinel.
        if record is None:
            try:
                plugin.done()
            except Exception as e:
                _log.exception('%s: Failed; done() error: %s: %s',
                               worker_name, type(e).__name__, e)
            break

        # The metadata stamping is kept inside the ``try`` block, as in
        # event_worker, so that a single malformed record is logged and
        # skipped instead of terminating the worker before done() is
        # called and while records may still be waiting in the queue.
        try:
            record['com'] = util.merge_dicts(record.get('com', {}), {
                'audit_key': audit_key,
                'audit_version': audit_version,
                'target_key': plugin_key,
                'target_class': type(plugin).__name__,
                'target_worker': worker_name,
                'target_type': worker_type,
            })
            plugin.write(record)
        except Exception as e:
            _log.exception('%s: Failed; write() error: %s: %s',
                           worker_name, type(e).__name__, e)

    _log.info('%s: Stopped', worker_name)