diff --git a/cloudmarker/workers.py b/cloudmarker/workers.py index 7ba126a..d706ce6 100644 --- a/cloudmarker/workers.py +++ b/cloudmarker/workers.py @@ -36,19 +36,28 @@ def cloud_worker(audit_key, audit_version, plugin_key, plugin, """ worker_name = audit_key + '_' + plugin_key _log.info('%s: Started', worker_name) - for record in plugin.read(): - record['com'] = util.merge_dicts(record.get('com', {}), { - 'audit_key': audit_key, - 'audit_version': audit_version, - 'origin_key': plugin_key, - 'origin_class': type(plugin).__name__, - 'origin_worker': worker_name, - 'origin_type': 'cloud', - }) - for q in output_queues: - q.put(record) + try: + for record in plugin.read(): + record['com'] = util.merge_dicts(record.get('com', {}), { + 'audit_key': audit_key, + 'audit_version': audit_version, + 'origin_key': plugin_key, + 'origin_class': type(plugin).__name__, + 'origin_worker': worker_name, + 'origin_type': 'cloud', + }) + for q in output_queues: + q.put(record) + except Exception as e: + _log.exception('%s: Failed; read() error: %s: %s', + worker_name, type(e).__name__, e) + + try: + plugin.done() + except Exception as e: + _log.exception('%s: Failed; done() error: %s: %s', + worker_name, type(e).__name__, e) - plugin.done() _log.info('%s: Stopped', worker_name) @@ -85,21 +94,30 @@ def event_worker(audit_key, audit_version, plugin_key, plugin, while True: record = input_queue.get() if record is None: - plugin.done() + try: + plugin.done() + except Exception as e: + _log.exception('%s: Failed; done() error: %s: %s', + worker_name, type(e).__name__, e) break - for event_record in plugin.eval(record): - event_record['com'] = \ - util.merge_dicts(event_record.get('com', {}), { - 'audit_key': audit_key, - 'audit_version': audit_version, - 'origin_key': plugin_key, - 'origin_class': type(plugin).__name__, - 'origin_worker': worker_name, - 'origin_type': 'event', - }) - for q in output_queues: - q.put(event_record) + try: + for event_record in plugin.eval(record): + event_record['com'] = \ + util.merge_dicts(event_record.get('com', {}), { + 'audit_key': audit_key, + 'audit_version': audit_version, + 'origin_key': plugin_key, + 'origin_class': type(plugin).__name__, + 'origin_worker': worker_name, + 'origin_type': 'event', + }) + for q in output_queues: + q.put(event_record) + except Exception as e: + _log.exception('%s: Failed; eval() error: %s: %s', + worker_name, type(e).__name__, e) + _log.info('%s: Stopped', worker_name) @@ -165,7 +183,11 @@ def _write_worker(audit_key, audit_version, plugin_key, plugin, while True: record = input_queue.get() if record is None: - plugin.done() + try: + plugin.done() + except Exception as e: + _log.exception('%s: Failed; done() error: %s: %s', + worker_name, type(e).__name__, e) break record['com'] = util.merge_dicts(record.get('com', {}), { @@ -176,5 +198,10 @@ def _write_worker(audit_key, audit_version, plugin_key, plugin, 'target_worker': worker_name, 'target_type': worker_type, }) - plugin.write(record) + + try: + plugin.write(record) + except Exception as e: + _log.exception('%s: Failed; write() error: %s: %s', + worker_name, type(e).__name__, e) _log.info('%s: Stopped', worker_name) diff --git a/pylama.ini b/pylama.ini index 81c4af5..b88433f 100644 --- a/pylama.ini +++ b/pylama.ini @@ -18,9 +18,10 @@ ignore = R0902 # R0902 Too many instance attributes (10/7) [pylint] [pylama:cloudmarker/workers.py] -ignore = R0913 +ignore = R0913,W0703 # R0913 Too many arguments (6/5) [pylint] +# W0703 Catching too general exception Exception [pylint] [pylama:cloudmarker/clouds/gcpcloud.py] ignore = E1101