Skip to content

Commit

Permalink
Populate alert_worker in com bucket
Browse files Browse the repository at this point in the history
Prior to this change, the `com` bucket of events alerted with an alert
plugin looked like the following example:

  "com": {
    "record_type": "mock_event",
    "origin_worker": "mockaudit-mockevent",
    "origin_type": "event",
    "event_worker": "mockaudit-mockevent",
    "store_worker": "mockaudit-filestore"
  }

Note that the last key in the example above is named as `store_worker`
although alerting is done by alert plugins. This was so because store
plugins and alert plugins implement the exact same interface via
duck-typing, so both types of plugins were worked on by a function named
`store_worker`.

While this makes sense for us (the developers), finding a `store_worker`
key in data written by an alert plugin could be confusing to a user.
Therefore, with this change, the `com` bucket of events alerted with an
alert plugin looks like this:

  "com": {
    "record_type": "mock_event",
    "origin_worker": "mockaudit-mockevent",
    "origin_type": "event",
    "event_worker": "mockaudit-mockevent",
    "alert_worker": "mockaudit-filestore"
  }

To accomplish this behaviour, `workers.py` has been refactored by moving
the existing `store_worker` functionality to an internal `_write_worker`
function which can work on both store and alert plugins. It accepts an
additional parameter that tells it whether to populate `store_worker` or
`alert_worker` in the `com` bucket. Now there are two functions
`store_worker` and `alert_worker` which invoke with the `_write_worker`
function with the additional parameter to populate the `com` bucket
appropriately.
  • Loading branch information
susam committed Apr 28, 2019
1 parent d242e1d commit 7fb7422
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cloudmarker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(self, audit_name, config):
util.load_plugin(config['plugins'][name]),
input_queue,
)
worker = mp.Process(target=workers.store_worker, args=args)
worker = mp.Process(target=workers.alert_worker, args=args)
self._alert_workers.append(worker)
self._alert_queues.append(input_queue)

Expand Down
47 changes: 37 additions & 10 deletions cloudmarker/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,7 @@ def store_worker(worker_name, store_plugin, input_queue):
store_plugin (object): Store plugin object.
input_queue (multiprocessing.Queue): Queue to read records from.
"""
_log.info('%s: Started', worker_name)
while True:
record = input_queue.get()
if record is None:
store_plugin.done()
break
record.setdefault('com', {})
record['com']['store_worker'] = worker_name
store_plugin.write(record)
_log.info('%s: Stopped', worker_name)
_write_worker(worker_name, store_plugin, input_queue, 'store')


def event_worker(worker_name, event_plugin, input_queue, output_queues):
Expand Down Expand Up @@ -113,3 +104,39 @@ def event_worker(worker_name, event_plugin, input_queue, output_queues):
for q in output_queues:
q.put(event_record)
_log.info('%s: Stopped', worker_name)


def alert_worker(worker_name, alert_plugin, input_queue):
"""Worker function for alert plugins.
This function behaves like :func:`cloudmarker.workers.store_worker`.
See its documentation for details.
Arguments:
worker_name (str): Display name for the worker.
alert_plugin (object): Alert plugin object.
input_queue (multiprocessing.Queue): Queue to read records from.
worker_type (str): Either ``'store'`` or ``'alert'``.
"""
_write_worker(worker_name, alert_plugin, input_queue, 'alert')


def _write_worker(worker_name, write_plugin, input_queue, worker_type):
"""Worker function for store and alert plugins.
Arguments:
worker_name (str): Display name for the worker.
write_plugin (object): Store plugin or alert plugin object.
input_queue (multiprocessing.Queue): Queue to read records from.
worker_type (str): Either ``'store'`` or ``'alert'``.
"""
_log.info('%s: Started', worker_name)
while True:
record = input_queue.get()
if record is None:
write_plugin.done()
break
record.setdefault('com', {})
record['com'][worker_type + '_worker'] = worker_name
write_plugin.write(record)
_log.info('%s: Stopped', worker_name)

0 comments on commit 7fb7422

Please sign in to comment.