Skip to content

Commit

Permalink
Merge 50f971b into b5a25c5
Browse files Browse the repository at this point in the history
  • Loading branch information
susam committed May 10, 2019
2 parents b5a25c5 + 50f971b commit da01255
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 28 deletions.
81 changes: 54 additions & 27 deletions cloudmarker/workers.py
Expand Up @@ -36,19 +36,28 @@ def cloud_worker(audit_key, audit_version, plugin_key, plugin,
"""
worker_name = audit_key + '_' + plugin_key
_log.info('%s: Started', worker_name)
for record in plugin.read():
record['com'] = util.merge_dicts(record.get('com', {}), {
'audit_key': audit_key,
'audit_version': audit_version,
'origin_key': plugin_key,
'origin_class': type(plugin).__name__,
'origin_worker': worker_name,
'origin_type': 'cloud',
})
for q in output_queues:
q.put(record)
try:
for record in plugin.read():
record['com'] = util.merge_dicts(record.get('com', {}), {
'audit_key': audit_key,
'audit_version': audit_version,
'origin_key': plugin_key,
'origin_class': type(plugin).__name__,
'origin_worker': worker_name,
'origin_type': 'cloud',
})
for q in output_queues:
q.put(record)
except Exception as e:
_log.exception('%s: Failed; read() error: %s: %s',
worker_name, type(e).__name__, e)

try:
plugin.done()
except Exception as e:
_log.exception('%s: Failed; done() error: %s: %s',
worker_name, type(e).__name__, e)

plugin.done()
_log.info('%s: Stopped', worker_name)


Expand Down Expand Up @@ -85,21 +94,30 @@ def event_worker(audit_key, audit_version, plugin_key, plugin,
while True:
record = input_queue.get()
if record is None:
plugin.done()
try:
plugin.done()
except Exception as e:
_log.exception('%s: Failed; done() error: %s: %s',
worker_name, type(e).__name__, e)
break

for event_record in plugin.eval(record):
event_record['com'] = \
util.merge_dicts(event_record.get('com', {}), {
'audit_key': audit_key,
'audit_version': audit_version,
'origin_key': plugin_key,
'origin_class': type(plugin).__name__,
'origin_worker': worker_name,
'origin_type': 'event',
})
for q in output_queues:
q.put(event_record)
try:
for event_record in plugin.eval(record):
event_record['com'] = \
util.merge_dicts(event_record.get('com', {}), {
'audit_key': audit_key,
'audit_version': audit_version,
'origin_key': plugin_key,
'origin_class': type(plugin).__name__,
'origin_worker': worker_name,
'origin_type': 'event',
})
for q in output_queues:
q.put(event_record)
except Exception as e:
_log.exception('%s: Failed; eval() error: %s: %s',
worker_name, type(e).__name__, e)

_log.info('%s: Stopped', worker_name)


Expand Down Expand Up @@ -165,7 +183,11 @@ def _write_worker(audit_key, audit_version, plugin_key, plugin,
while True:
record = input_queue.get()
if record is None:
plugin.done()
try:
plugin.done()
except Exception as e:
_log.exception('%s: Failed; done() error: %s: %s',
worker_name, type(e).__name__, e)
break

record['com'] = util.merge_dicts(record.get('com', {}), {
Expand All @@ -176,5 +198,10 @@ def _write_worker(audit_key, audit_version, plugin_key, plugin,
'target_worker': worker_name,
'target_type': worker_type,
})
plugin.write(record)

try:
plugin.write(record)
except Exception as e:
_log.exception('%s: Failed; write() error: %s: %s',
worker_name, type(e).__name__, e)
_log.info('%s: Stopped', worker_name)
3 changes: 2 additions & 1 deletion pylama.ini
Expand Up @@ -18,9 +18,10 @@ ignore = R0902
# R0902 Too many instance attributes (10/7) [pylint]

[pylama:cloudmarker/workers.py]
ignore = R0913
ignore = R0913,W0703

# R0913 Too many arguments (6/5) [pylint]
# W0703 Catching too general exception Exception [pylint]

[pylama:cloudmarker/clouds/gcpcloud.py]
ignore = E1101
Expand Down

0 comments on commit da01255

Please sign in to comment.