Skip to content

Commit

Permalink
Make all logs print the correlation id if they have one
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 7, 2018
1 parent f12b441 commit ee4ea16
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 45 deletions.
4 changes: 3 additions & 1 deletion deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ private/lega.yml private bootstrap bootstrap-dev:
egarchive/lega --prefix $(PROJECT_NAME) ${ARGS}

up: private/lega.yml
@docker-compose --log-level ERROR -f $< up -d
@docker-compose --log-level ERROR -f $< up -d db mq vault inbox
@sleep 5 # Allow mq to be ready with its users, so other containers can consume from the queues
@docker-compose --log-level ERROR -f $< up -d ingest verify finalize outgest streamer

down:
-@docker-compose --log-level ERROR down -v
Expand Down
6 changes: 3 additions & 3 deletions lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
LOG = logging.getLogger(__name__)

@errors.catch(ret_on_error=(None,True))
def work(data):
def _work(correlation_id, data):
'''Reads a message containing the ids and add it to the database.'''

LOG.info("Finalizing Stable ID for %s",data)
LOG.info("[%s] Finalizing Stable ID for %s", correlation_id, data)

# Clean up username
data['user'] = sanitize_user_id(data['user'])
Expand All @@ -49,7 +49,7 @@ def work(data):
@configure
def main():
    '''Entry point: consume from the stableIDs queue and finalize stable IDs.'''
    # upstream link configured in local broker
    # on error, don't retry the message (ack_on_error=True)
    consume(_work, 'stableIDs', None, ack_on_error=True)

if __name__ == '__main__':
    main()
31 changes: 17 additions & 14 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@

from .conf import CONF, configure
from .utils import db, exceptions, errors, checksum, sanitize_user_id, storage
from .utils.amqp import consume, publish, tell_cega
from .utils.amqp import consume, publish

LOG = logging.getLogger(__name__)

@errors.catch(ret_on_error=(None,True))
def work(fs, data):
def _work(fs, correlation_id, data):
'''Reads a message, splits the header and sends the remainder to the backend store.'''

# Keeping data as-is (cuz the decorator is using it)
Expand All @@ -42,7 +42,7 @@ def work(fs, data):
data['org_msg'] = org_msg

filepath = data['file_path']
LOG.info(f"Processing {filepath}")
LOG.info("[%s] Processing %s", correlation_id, filepath)

# Use user_id, and not elixir_id
user_id = sanitize_user_id(data['user'])
Expand All @@ -53,11 +53,11 @@ def work(fs, data):

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
LOG.info(f"Inbox area: {inbox}")
LOG.info("[%s] Inbox area: %s", correlation_id, inbox)

# Check if file is in inbox
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info(f"Inbox file path: {inbox_filepath}")
LOG.info("[%s] Inbox file path: %s", correlation_id, inbox_filepath)

# Record in database
if not inbox_filepath.exists():
Expand All @@ -75,27 +75,30 @@ def work(fs, data):
})

# Sending a progress message to CentralEGA
tell_cega(org_msg, 'PROCESSING')
cega_msg = org_msg.copy()
cega_msg['status'] = 'PROCESSING'
LOG.debug('[%s] Sending message to CentralEGA: %s', correlation_id, cega_msg)
publish(cega_msg, 'cega', 'files.processing', correlation_id=correlation_id)

# Strip the header out and copy the rest of the file to the vault
LOG.debug(f'Opening {inbox_filepath}')
LOG.debug('[%s] Opening %s', correlation_id, inbox_filepath)
with open(inbox_filepath, 'rb') as infile:
LOG.debug(f'Reading header | file_id: {file_id}')
LOG.debug('[%s] Reading header | file_id: %s', correlation_id, file_id)
header = Header.from_stream(infile)

LOG.info('Parsed HEADER: %s', header)
LOG.info('[%s] Parsed HEADER: %s', correlation_id, header)

LOG.info(f'Adding header to database')
LOG.info('[%s] Adding header to database', correlation_id)
header_hex = bytes(header).hex()
data['header'] = header_hex
db.update(file_id, { 'header': header_hex,
'version': header.version })

target = fs.location(file_id)
# NOTE: dropped the stray f-string prefix — the message uses %-style lazy
# logging arguments, so an f'' prefix is redundant and misleading here.
LOG.info('[%s] [%s] Moving the rest of %s to %s', correlation_id, fs.__class__.__name__, filepath, target)
target_size = fs.copy(infile, target) # It will copy the rest only

LOG.info('[%s] Vault copying completed. Updating database', correlation_id)
storage_type = None
if isinstance(fs, storage.S3Storage):
storage_type = 'S3'
Expand All @@ -108,14 +111,14 @@ def work(fs, data):
data['vault_path'] = target
data['vault_type'] = storage_type

LOG.debug(f"Reply message: {data}")
LOG.debug("[%s] Reply message: %s", correlation_id, data)
return (data, False)

@configure
def main():
    '''Entry point: bind the configured vault storage driver and start consuming.'''
    fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
    # Bind the storage backend once; the consumer supplies (correlation_id, data)
    do_work = partial(_work, fs())

    # upstream link configured in local broker
    consume(do_work, 'files', 'archived')
Expand Down
21 changes: 8 additions & 13 deletions lega/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ def open_channel(force_reconnect=False):

def publish(message, exchange, routing, correlation_id=None):
'''
Sending a message to the local broker with ``path`` was updated
Sending a message to the local broker exchange using the given routing key.
If the correlation_id is specified, it is forwarded.
If not, a new one is generated (as a uuid4 string).
'''
open_channel()
LOG.debug(f'Sending {message} to exchange: {exchange} [routing key: {routing}]')
LOG.debug('[%s] Sending %s to exchange: %s [routing key: %s]', correlation_id, message, exchange, routing)
channel.basic_publish(exchange = exchange,
routing_key = routing,
body = json.dumps(message),
Expand All @@ -109,19 +111,19 @@ def consume(work, from_queue, to_routing, ack_on_error=True):
def process_request(_channel, method_frame, props, body):
    '''Handle one delivery: run ``work`` on the JSON payload (forwarding the
    correlation id), publish the answer if any, and acknowledge the message.'''
    correlation_id = props.correlation_id
    message_id = method_frame.delivery_tag
    LOG.debug('[%s] Consuming message %s', correlation_id, message_id)

    # Process message in JSON format
    answer, error = work(correlation_id, json.loads(body))  # Exceptions should be already caught

    # Publish the answer
    if answer:
        assert( to_routing )
        publish(answer, 'lega', to_routing, correlation_id=correlation_id)

    # Acknowledgment: Cancel the message resend in case MQ crashes
    if not error or ack_on_error:
        # BUG FIX: the format string was missing the %s placeholder for
        # message_id, which makes the logging module raise a formatting error.
        LOG.debug('[%s] Sending ACK for message %s', correlation_id, message_id)
        _channel.basic_ack(delivery_tag=method_frame.delivery_tag)

# Let's do this
Expand All @@ -141,10 +143,3 @@ def process_request(_channel, method_frame, props, body):
break

close()


def tell_cega(message, status):
message['status'] = status
LOG.debug(f'Sending message to CentralEGA: {message}')
publish(message, 'cega', 'files.processing')
message.pop('status', None)
6 changes: 3 additions & 3 deletions lega/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def log_trace():
fname = frame.f_code.co_filename
LOG.error(f'Exception: {exc_type} in {fname} on line: {lineno}')

def handle_error(e, data):
def handle_error(e, correlation_id, data):
try:
# Re-raise in case of AssertionError
if isinstance(e,AssertionError):
Expand All @@ -49,12 +49,12 @@ def handle_error(e, data):
file_id = data.get('file_id', None) # should be there
if file_id:
set_error(file_id, cause, from_user)
LOG.debug('Catching error on file id: %s', file_id)
LOG.debug('[%s] Catching error on file id: %s', correlation_id, file_id)
if from_user: # Send to CentralEGA
org_msg = data.pop('org_msg', None) # should be there
org_msg['reason'] = str(cause) # str = Informal
LOG.info(f'Sending user error to local broker: {org_msg}')
publish(org_msg, 'cega', 'files.error')
publish(org_msg, 'cega', 'files.error', correlation_id=correlation_id)
except Exception as e2:
LOG.error(f'While treating "{e}", we caught "{e2!r}"')
print(repr(e), 'caused', repr(e2), file=sys.stderr)
Expand Down
22 changes: 11 additions & 11 deletions lega/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@
LOG = logging.getLogger(__name__)

@errors.catch(ret_on_error=(None,True))
def work(mover, data):
def _work(mover, correlation_id, data):
'''Verifying that the file in the vault can be properly decrypted.'''

LOG.info('Verification | message: %s', data)
LOG.info('[%s] Verification | message: %s', correlation_id, data)

file_id = data['file_id']

if db.is_disabled(file_id):
LOG.info('Operation canceled because database entry marked as DISABLED (for file_id %s)', file_id)
LOG.info('[%s] Operation canceled because database entry marked as DISABLED (for file_id %s)', correlation_id, file_id)
return None, False # do nothing

header = bytes.fromhex(data['header']) # in hex -> bytes
vault_path = data['vault_path']

# Load the LocalEGA private key
key_location = CONF.get_value('DEFAULT', 'private_key')
LOG.info(f'Retrieving the Private Key from {key_location}')
LOG.info('[%s] Retrieving the Private Key from %s', correlation_id, key_location)
with open(key_location, 'rb') as k:
privkey = PrivateKey(k.read(), KeyFormatter)

LOG.info('Opening vault file: %s', vault_path)
LOG.info('[%s] Opening vault file: %s', correlation_id, vault_path)
# If you can decrypt... the checksum is valid

header = Header.from_stream(io.BytesIO(header))
Expand All @@ -69,9 +69,9 @@ def work(mover, data):

# Convert to hex
checksum = checksum.hex()
LOG.info('Verification completed [sha256: %s]', checksum)
LOG.info('[%s] Verification completed [sha256: %s]', correlation_id, checksum)
md5_digest = md.hexdigest()
LOG.info('Verification completed [md5: %s]', md5_digest)
LOG.info('[%s] Verification completed [md5: %s]', correlation_id, md5_digest)

# Updating the database
db.update(file_id, { 'status': 'COMPLETED',
Expand All @@ -82,22 +82,22 @@ def work(mover, data):

# Send to QC
data.pop('status', None)
LOG.debug(f'Sending message to QC: {data}')
publish(data, 'lega', 'qc') # We keep the org msg in there
LOG.debug('[%s] Sending message to QC: ',correlation_id, data)
publish(data, 'lega', 'qc', correlation_id=correlation_id) # We keep the org msg in there

# Shape successful message
org_msg = data['org_msg']
org_msg.pop('file_id', None)
org_msg['decrypted_checksums'] = [{ 'type': 'sha256', 'value': checksum },
{ 'type': 'md5', 'value': md5_digest }] # for stable id
LOG.debug(f"Reply message: {org_msg}")
LOG.debug("[%s] Reply message: ",correlation_id, org_msg)
return (org_msg, False)

@configure
def main():
    '''Entry point: bind the configured vault storage driver and start consuming.'''
    fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
    # Bind the storage backend once; the consumer supplies (correlation_id, data)
    do_work = partial(_work, fs())

    consume(do_work, 'archived', 'completed')

Expand Down

0 comments on commit ee4ea16

Please sign in to comment.