Skip to content

Commit

Permalink
removing the correlation id from each logger after it is done with it
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 15, 2018
1 parent 0f97fe2 commit e47fa8f
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 39 deletions.
3 changes: 2 additions & 1 deletion lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _work(correlation_id, data):
'''Reads a message containing the ids and add it to the database.'''

# Adding correlation ID to context
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)

LOG.info("Finalizing Stable ID for %s", data)

Expand All @@ -45,6 +45,7 @@ def _work(correlation_id, data):

# We should revert back the ownership of the file now

LOG.remove_correlation_id()
# Clean up files is left for the cleanup script. Triggered manually
return None, False # No result, no error

Expand Down
3 changes: 2 additions & 1 deletion lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _work(fs, correlation_id, data):
'''Reads a message, splits the header and sends the remainder to the backend store.'''

# Adding correlation ID to context
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)

# Keeping data as-is (cuz the decorator is using it)
# It will be augmented, but we keep the original data first
Expand Down Expand Up @@ -115,6 +115,7 @@ def _work(fs, correlation_id, data):
data['vault_type'] = storage_type

LOG.debug("Reply message: %s", data)
LOG.remove_correlation_id()
return (data, False)

@configure
Expand Down
28 changes: 20 additions & 8 deletions lega/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def publish(message, exchange, routing, correlation_id):
The correlation_id must be specified (and then forwarded).
'''
assert( correlation_id )
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)
with connection.channel() as channel:
LOG.debug('Sending %s to exchange: %s [routing key: %s]', message, exchange, routing)
channel.basic_publish(exchange = exchange,
Expand All @@ -146,6 +146,8 @@ def publish(message, exchange, routing, correlation_id):
properties = pika.BasicProperties(correlation_id=correlation_id,
content_type='application/json',
delivery_mode=2))
LOG.remove_correlation_id()


def consume(work, from_queue, to_routing, ack_on_error=True):
'''Blocking function, registering callback ``work`` to be called.
Expand All @@ -165,9 +167,7 @@ def consume(work, from_queue, to_routing, ack_on_error=True):

def process_request(_channel, method_frame, props, body):
correlation_id = props.correlation_id

# Adding correlation ID to context
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)

message_id = method_frame.delivery_tag
LOG.debug('Consuming message %s', message_id)
Expand All @@ -184,6 +184,7 @@ def process_request(_channel, method_frame, props, body):
if not error or ack_on_error:
LOG.debug('Sending ACK for message: %s', message_id)
_channel.basic_ack(delivery_tag=method_frame.delivery_tag)
LOG.remove_correlation_id()

# Let's do this
LOG.debug('MQ setup')
Expand All @@ -196,10 +197,21 @@ def process_request(_channel, method_frame, props, body):
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
except Exception as e:
connection.close()
break
except (pika.exceptions.ConnectionClosed,
pika.exceptions.ConsumerCancelled,
pika.exceptions.ChannelClosed,
pika.exceptions.ChannelAlreadyClosing,
pika.exceptions.AMQPChannelError,
pika.exceptions.ChannelError,
pika.exceptions.IncompatibleProtocolError) as e:
LOG.debug('Retrying after %s', e)
connection.close()
continue
break

connection.close()
# # Note: Let it raise any other exception and bail out.
# except Exception as e:
# LOG.critical('%r', e)
# connection.close()
# break
# #sys.exit(2)
46 changes: 21 additions & 25 deletions lega/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,28 @@ def log_trace():
LOG.error('Exception: %s in %s on line: %s', exc_type, fname, lineno)

def handle_error(e, data):
try:
# Re-raise in case of AssertionError
if isinstance(e,AssertionError):
raise e

# Is it from the user?
from_user = isinstance(e,FromUser) or isinstance(e,ValueError)
# Re-raise in case of AssertionError
if isinstance(e,AssertionError):
raise e

# Is it from the user?
from_user = isinstance(e,FromUser) or isinstance(e,ValueError)

cause = e.__cause__ or e
LOG.error('%r', cause) # repr(cause) = Technical
cause = e.__cause__ or e
LOG.error('%r', cause) # repr(cause) = Technical

# Locate the error
log_trace()
# Locate the error
log_trace()

file_id = data.get('file_id', None) # should be there
if file_id:
set_error(file_id, cause, from_user)
LOG.debug('Catching error on file id: %s', file_id)
if from_user: # Send to CentralEGA
org_msg = data.pop('org_msg', None) # should be there
org_msg['reason'] = str(cause) # str = Informal
LOG.info('Sending user error to local broker: %s', org_msg)
publish(org_msg, 'cega', 'files.error', correlation_id)
except Exception as e2:
LOG.error('While treating "%s", we caught "%r"', e, e2)
print(correlation_id, '|', repr(e), 'caused', repr(e2), file=sys.stderr)

file_id = data.get('file_id', None) # should be there
if file_id:
set_error(file_id, cause, from_user)
LOG.debug('Catching error on file id: %s', file_id)
if from_user: # Send to CentralEGA
org_msg = data.pop('org_msg', None) # should be there
org_msg['reason'] = str(cause) # str = Informal
LOG.info('Sending user error to local broker: %s', org_msg)
publish(org_msg, 'cega', 'files.error', correlation_id)

def catch(ret_on_error=None):
'''Decorator to store the raised exception in the database'''
Expand All @@ -74,12 +68,14 @@ def wrapper(*args, **kwargs):
data = args[-1] # data is the last argument

# Adding correlation ID to context
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)

handle_error(e, data)
# Note: let it fail and bail out if handle_error raises an exception itself

# Should we also revert back the ownership of the file?

LOG.remove_correlation_id()
return ret_on_error
return wrapper
return catch_error
6 changes: 4 additions & 2 deletions lega/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ def format(self, record):


class LEGALogger(logging.LoggerAdapter):
correlation_id = None

def __init__(self, name):
logger = logging.getLogger(name)
super().__init__(logger, { 'correlation_id': None })

def add_context(self, correlation_id):
def add_correlation_id(self, correlation_id):
self.extra['correlation_id'] = correlation_id

def remove_correlation_id(self):
self.extra['correlation_id'] = None
4 changes: 2 additions & 2 deletions lega/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
def _work(mover, correlation_id, data):
'''Verifying that the file in the vault can be properly decrypted.'''

# Adding correlation ID to context
LOG.add_context(correlation_id)
LOG.add_correlation_id(correlation_id)

LOG.info('Verification | message: %s', data)

Expand Down Expand Up @@ -95,6 +94,7 @@ def _work(mover, correlation_id, data):
org_msg['decrypted_checksums'] = [{ 'type': 'sha256', 'value': checksum },
{ 'type': 'md5', 'value': md5_digest }] # for stable id
LOG.debug("Reply message: %s", org_msg)
LOG.remove_correlation_id()
return (org_msg, False)

@configure
Expand Down

0 comments on commit e47fa8f

Please sign in to comment.