Skip to content

Commit

Permalink
Merge a007ffa into 7bdd6b9
Browse files Browse the repository at this point in the history
  • Loading branch information
susam committed Jun 11, 2019
2 parents 7bdd6b9 + a007ffa commit 15126ef
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 84 deletions.
32 changes: 21 additions & 11 deletions cloudmarker/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __init__(self, audit_key, audit_version, config):
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
config['plugins'][plugin_key],
input_queue,
)
worker = mp.Process(target=workers.alert_worker, args=args)
Expand All @@ -156,7 +156,7 @@ def __init__(self, audit_key, audit_version, config):
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
config['plugins'][plugin_key],
input_queue,
self._alert_queues,
)
Expand All @@ -171,7 +171,7 @@ def __init__(self, audit_key, audit_version, config):
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
config['plugins'][plugin_key],
input_queue,
)
worker = mp.Process(target=workers.store_worker, args=args)
Expand All @@ -184,7 +184,7 @@ def __init__(self, audit_key, audit_version, config):
audit_key,
audit_version,
plugin_key,
util.load_plugin(config['plugins'][plugin_key]),
config['plugins'][plugin_key],
self._store_queues + self._event_queues
)
worker = mp.Process(target=workers.cloud_worker, args=args)
Expand All @@ -197,19 +197,29 @@ def start(self):

begin_record = {'com': {'record_type': 'begin_audit'}}

# Start store and alert workers first before cloud and event
# workers. See next comment to know why.
# Start store and alert workers.
for w in self._store_workers + self._alert_workers:
w.start()

# We want to send begin_audit record to store/alert plugins
# before any cloud/event workers can send their records to them.
# Start cloud and event workers.
for w in self._cloud_workers + self._event_workers:
w.start()

# Send begin_audit record to each sotre and alert plugin.
for q in self._store_queues + self._alert_queues:
q.put(begin_record)

# Now start the cloud and event workers.
for w in self._cloud_workers + self._event_workers:
w.start()
# It would have been nice if we guarantee that begin_audit
# records are sent to each store and alert before any cloud or
# event records are sent because this is difficult to do because
# we must avoid forking a multithreaded process.
#
# See https://stackoverflow.com/q/55924761 for more details on
# why a multihreaded process should not be forked.
#
# The q.put() call starts a feeder thread, thus making the
# current process mulithreaded. Therefore, any forking (the
# w.start() calls) must be done prior to q.put() calls.

def join(self):
"""Wait until all workers terminate."""
Expand Down
106 changes: 63 additions & 43 deletions cloudmarker/test/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,53 @@

from cloudmarker import workers

# MockPluginClass pretends to be a mock plugin class.
MockPluginClass = mock.Mock()

# Instantiating MockPluginClass should return a mock that pretends to be
# a mock plugin object.
mock_plugin = mock.Mock()

# Plugin config dictionary that can be used to instantiate the mock
# plugin class.
plugin_config = {
'plugin': 'cloudmarker.test.test_workers.MockPluginClass',
}


class WorkersTest(unittest.TestCase):
"""Tests for worker functions."""

def setUp(self):
# Reset the mock plugin class and mock plugin object to their
# initial state before every test.
mock_plugin.reset_mock(return_value=True, side_effect=True)
MockPluginClass.reset_mock(return_value=True, side_effect=True)
MockPluginClass.return_value = mock_plugin

def test_cloud_worker(self):
# Mock plugin that generates mock records.
mock_records = [
{'raw': {'data': 'record1'}},
{'raw': {'data': 'record2'}}
]
plugin = mock.Mock()
plugin.read = mock.Mock(return_value=mock_records)
mock_plugin.read.return_value = mock_records

# Test output queues for the mock plugin.
out_q1 = mp.Queue()
out_q2 = mp.Queue()

# Invoke the mock plugin with the worker.
workers.cloud_worker('fooaudit', 'fooversion', 'foocloud',
plugin, [out_q1, out_q2])
plugin_config, [out_q1, out_q2])

# Test that the worker invoked the mock plugin's read() method
# and finally invoked the mock plugin's done() method.
expected_calls = [mock.call.read(),
mock.call.done()]
self.assertEqual(plugin.mock_calls, expected_calls)
# Test that the worker instantiated the mock plugin class, then
# invoked the mock plugin's read() method, and finally invoked
# the mock plugin's done() method.
expected_calls = [mock.call(),
mock.call().read(),
mock.call().done()]
self.assertEqual(MockPluginClass.mock_calls, expected_calls)

# Test that the worker has put the two string records in both
# the test output queues.
Expand All @@ -42,9 +63,6 @@ def test_cloud_worker(self):
self.assertEqual(out_q2.get()['raw'], {'data': 'record2'})

def test_store_worker(self):
# Mock plugin.
plugin = mock.Mock()

# Test input queue for the mock plugin.
in_q = mp.Queue()

Expand All @@ -55,20 +73,19 @@ def test_store_worker(self):

# Invoke the mock plugin with the worker.
workers.store_worker('fooaudit', 'fooversion', 'foostore',
plugin, in_q)

# Test that the worker invoked the mock plugin's write()
# method twice (once for each record) and finally invoked the
# mock plugin's done() method (for the None input).
expected_calls = [mock.call.write(mock.ANY),
mock.call.write(mock.ANY),
mock.call.done()]
self.assertEqual(plugin.mock_calls, expected_calls)
plugin_config, in_q)

# Test that the worker instantiated the mock plugin class, then
# invoked the mock plugin's write() method twice (once for each
# record), and finally invoked the mock plugin's done() method
# (for the None input).
expected_calls = [mock.call(),
mock.call().write(mock.ANY),
mock.call().write(mock.ANY),
mock.call().done()]
self.assertEqual(MockPluginClass.mock_calls, expected_calls)

def test_alert_worker(self):
# Mock plugin.
plugin = mock.Mock()

# Test input queue for the mock plugin.
in_q = mp.Queue()

Expand All @@ -79,15 +96,17 @@ def test_alert_worker(self):

# Invoke the mock plugin with the worker.
workers.alert_worker('fooaudit', 'fooversion', 'fooalert',
plugin, in_q)

# Test that the worker invoked the mock plugin's write()
# method twice (once for each record) and finally invoked the
# mock plugin's done() method (for the None input).
expected_calls = [mock.call.write(mock.ANY),
mock.call.write(mock.ANY),
mock.call.done()]
self.assertEqual(plugin.mock_calls, expected_calls)
plugin_config, in_q)

# Test that the worker instantiated the mock plugin class, then
# invoked the mock plugin's write() method twice (once for each
# record), and finally invoked the mock plugin's done() method
# (for the None input).
expected_calls = [mock.call(),
mock.call().write(mock.ANY),
mock.call().write(mock.ANY),
mock.call().done()]
self.assertEqual(MockPluginClass.mock_calls, expected_calls)

def test_event_worker(self):
# A fake_eval function that returns two fake records: length of
Expand All @@ -97,8 +116,7 @@ def fake_eval(s):
yield {'ext': {'upper': s.upper()}}

# Mock plugin.
plugin = mock.Mock()
plugin.eval = mock.Mock(side_effect=fake_eval)
mock_plugin.eval = mock.Mock(side_effect=fake_eval)

# Test input queue and output queues for the mock plugin.
in_q = mp.Queue()
Expand All @@ -112,15 +130,17 @@ def fake_eval(s):

# Invoke the mock plugin with the worker.
workers.event_worker('fooaudit', 'fooversion', 'fooevent',
plugin, in_q, [out_q1, out_q2])

# Test that the worker invoked the mock plugin's eval() method
# twice (once for each input string record) and finally invoked
# the mock plugin's done() method (for the None input).
expected_calls = [mock.call.eval('record1'),
mock.call.eval('record2'),
mock.call.done()]
self.assertEqual(plugin.mock_calls, expected_calls)
plugin_config, in_q, [out_q1, out_q2])

# Test that the worker instantiated the mock plugin class, then
# invoked the mock plugin's eval() method twice (once for each
# input string record), and finally invoked the mock plugin's
# done() method (for the None input).
expected_calls = [mock.call(),
mock.call().eval('record1'),
mock.call().eval('record2'),
mock.call().done()]
self.assertEqual(MockPluginClass.mock_calls, expected_calls)

# Test that the worker has put the values yielded by fake_eval
# in the test output queues.
Expand Down

0 comments on commit 15126ef

Please sign in to comment.