Skip to content

Commit

Permalink
Add audit version to all records
Browse files Browse the repository at this point in the history
This change adds a field named `audit_version` to every record. The
value of this field is a string that represents the time (in UTC) at
which the audits started, in `YYYYmmddHHMMSS` format, e.g.,
`20190429180829`.

This version string is created only once at the beginning of a run of
all configured audits. The same version string is then used for all
audits. This way, all data obtained during a single run get the same
version string regardless of the actual start time of the individual
audits.

Additionally, this change adds two new record types.

  - `begin_audit`: A record with `record_type` as `begin_audit`
    indicates that a specific audit has started. The `audit_key` field
    in the `com` bucket shows which audit has started.

  - `end_audit`: A record of this type indicates that a specific audit
     has ended. The `audit_key` field in the `com` bucket shows which
     audit has ended.

Note that the same destination target may contain multiple `begin_audit`
records at the beginning of an audit. For example, if the same Splunk
instance is configured both as a store and as an alert, then it
would receive one `begin_audit` record sent to the store worker and one
more sent to the alert worker. The same holds for `end_audit` records
too.

The audit version string in these records is useful for selecting or
querying the data obtained from the most recent run of an audit.
  • Loading branch information
susam committed May 3, 2019
1 parent 509a790 commit f27c3df
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 81 deletions.
96 changes: 66 additions & 30 deletions cloudmarker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,29 @@ def main():
# depending on the command line options.
if args.now:
_log.info('Starting job now')
job(config)
_run(config)
else:
_log.info('Scheduled to run job everyday at %s', config['schedule'])
schedule.every().day.at(config['schedule']).do(job, config)
schedule.every().day.at(config['schedule']).do(_run, config)
while True:
schedule.run_pending()
time.sleep(60)


def job(config):
def _run(config):
"""Run the audits.
Arguments:
config (dict): Configuration dictionary.
"""
start_time = time.localtime()
_send_email(config.get('email'), 'job', start_time)
_send_email(config.get('email'), 'run', start_time)

# Create an audit object for each audit configured to be run.
audit_version = time.strftime('%Y%m%d%H%M%S', time.gmtime())
audits = []
for audit_name in config['run']:
audits.append(Audit(audit_name, config))
for audit_key in config['run']:
audits.append(Audit(audit_key, audit_version, config))

# Start all audits.
for audit in audits:
Expand All @@ -87,7 +88,7 @@ def job(config):
audit.join()

end_time = time.localtime()
_send_email(config.get('email'), 'job', start_time, end_time)
_send_email(config.get('email'), 'run', start_time, end_time)


class Audit:
Expand All @@ -97,7 +98,7 @@ class Audit:
input queues for a single audit configuration.
"""

def __init__(self, audit_name, config):
def __init__(self, audit_key, audit_version, config):
"""Create an instance of :class:`Audit` from configuration.
A single audit definition (from a list of audit definitions
Expand All @@ -109,17 +110,19 @@ def __init__(self, audit_name, config):
the audit workflow.
Arguments:
audit_name (str): Key name for an audit configuration. This
audit_key (str): Key name for an audit configuration. This
key is looked for in ``config['audits']``.
audit_version (str): Audit version string.
config (dict): Configuration dictionary. This is the
entire configuration dictionary that contains
top-level keys named ``clouds``, ``stores``, ``events``,
``alerts``, ``audits``, ``run``, etc.
"""
self._start_time = time.localtime()
self._audit_name = audit_name
self._audit_key = audit_key
self._audit_version = audit_version
self._config = config
audit_config = config['audits'][audit_name]
audit_config = config['audits'][audit_key]

# We keep all workers in these lists.
self._cloud_workers = []
Expand All @@ -133,23 +136,27 @@ def __init__(self, audit_name, config):
self._alert_queues = []

# Create alert workers and queues.
for name in audit_config['alerts']:
for plugin_key in audit_config['alerts']:
input_queue = mp.Queue()
args = (
audit_name + '-' + name,
util.load_plugin(config['plugins'][name]),
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
input_queue,
)
worker = mp.Process(target=workers.alert_worker, args=args)
self._alert_workers.append(worker)
self._alert_queues.append(input_queue)

# Create event_workers workers and queues.
for name in audit_config['events']:
for plugin_key in audit_config['events']:
input_queue = mp.Queue()
args = (
audit_name + '-' + name,
util.load_plugin(config['plugins'][name]),
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
input_queue,
self._alert_queues,
)
Expand All @@ -158,22 +165,26 @@ def __init__(self, audit_name, config):
self._event_queues.append(input_queue)

# Create store workers and queues.
for name in audit_config['stores']:
for plugin_key in audit_config['stores']:
input_queue = mp.Queue()
args = (
audit_name + '-' + name,
util.load_plugin(config['plugins'][name]),
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
input_queue,
)
worker = mp.Process(target=workers.store_worker, args=args)
self._store_workers.append(worker)
self._store_queues.append(input_queue)

# Create cloud workers.
for name in audit_config['clouds']:
for plugin_key in audit_config['clouds']:
args = (
audit_name + '-' + name,
util.load_plugin(config['plugins'][name]),
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
self._store_queues + self._event_queues
)
worker = mp.Process(target=workers.cloud_worker, args=args)
Expand All @@ -182,8 +193,21 @@ def __init__(self, audit_name, config):
def start(self):
"""Start audit by starting all workers."""
_send_email(self._config.get('email'), 'audit', self._start_time)
for w in (self._cloud_workers + self._store_workers +
self._event_workers + self._alert_workers):

begin_record = {'com': {'record_type': 'begin_audit'}}

# Start store and alert workers first before cloud and event
# workers. See next comment to know why.
for w in self._store_workers + self._alert_workers:
w.start()

# We want to send begin_audit record to store/alert plugins
# before any cloud/event workers can send their records to them.
for q in self._store_queues + self._alert_queues:
q.put(begin_record)

# Now start the cloud and event workers.
for w in self._cloud_workers + self._event_workers:
w.start()

def join(self):
Expand All @@ -192,24 +216,36 @@ def join(self):
for w in self._cloud_workers:
w.join()

# Stop store workers and event workers.
for q in self._store_queues + self._event_queues:
end_record = {'com': {'record_type': 'end_audit'}}

# Stop store workers.
for q in self._store_queues:
q.put(end_record)
q.put(None)

# Stop event workers.
for q in self._event_queues:
q.put(None)

# Wait for store workers and event_workers workers to terminate.
for w in self._store_workers + self._event_workers:
# Wait for store workers to terminate.
for w in self._store_workers:
w.join()

# Wait for event workers to terminate.
for w in self._event_workers:
w.join()

# Stop alert workers.
for q in self._alert_queues:
q.put(end_record)
q.put(None)

# Wait for alert workers to terminate.
for w in self._alert_workers:
w.join()

end_time = time.localtime()
_send_email(self._config.get('email'), self._audit_name,
_send_email(self._config.get('email'), self._audit_key,
self._start_time, end_time)


Expand Down
2 changes: 1 addition & 1 deletion cloudmarker/stores/filestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def write(self, record):
record (dict): Data to write to the file system.
"""
worker_name = record['com']['origin_worker']
worker_name = record.get('com', {}).get('origin_worker', 'no_worker')

tmp_file_path = os.path.join(self._path, worker_name) + '.tmp'
if worker_name not in self._worker_names:
Expand Down
33 changes: 30 additions & 3 deletions cloudmarker/test/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def test_cloud_worker(self):
out_q2 = mp.Queue()

# Invoke the mock plugin with the worker.
workers.cloud_worker('foo', plugin, [out_q1, out_q2])
workers.cloud_worker('fooaudit', 'fooversion', 'foocloud',
plugin, [out_q1, out_q2])

# Test that the worker invoked the mock plugin's read() method
# and finally invoked the mock plugin's done() method.
Expand Down Expand Up @@ -53,7 +54,32 @@ def test_store_worker(self):
in_q.put(None)

# Invoke the mock plugin with the worker.
workers.store_worker('foo', plugin, in_q)
workers.store_worker('fooaudit', 'fooversion', 'foostore',
plugin, in_q)

# Test that the worker invoked the mock plugin's write()
# method twice (once for each record) and finally invoked the
# mock plugin's done() method (for the None input).
expected_calls = [mock.call.write(mock.ANY),
mock.call.write(mock.ANY),
mock.call.done()]
self.assertEqual(plugin.mock_calls, expected_calls)

def test_alert_worker(self):
# Mock plugin.
plugin = mock.Mock()

# Test input queue for the mock plugin.
in_q = mp.Queue()

# Put two mock records and None in the test input queue.
in_q.put({'raw': {'data': 'record1'}})
in_q.put({'raw': {'data': 'record2'}})
in_q.put(None)

# Invoke the mock plugin with the worker.
workers.alert_worker('fooaudit', 'fooversion', 'fooalert',
plugin, in_q)

# Test that the worker invoked the mock plugin's write()
# method twice (once for each record) and finally invoked the
Expand Down Expand Up @@ -85,7 +111,8 @@ def fake_eval(s):
in_q.put(None)

# Invoke the mock plugin with the worker.
workers.event_worker('foo', plugin, in_q, [out_q1, out_q2])
workers.event_worker('fooaudit', 'fooversion', 'fooevent',
plugin, in_q, [out_q1, out_q2])

# Test that the worker invoked the mock plugin's eval() method
# twice (once for each input string record) and finally invoked
Expand Down
Loading

0 comments on commit f27c3df

Please sign in to comment.