Skip to content

Commit

Permalink
Adding correlation_id everywhere available
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 13, 2018
1 parent e0d0f0d commit b869199
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 73 deletions.
4 changes: 0 additions & 4 deletions lega/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@

'''Clean up script manually triggered'''

import logging

LOG = logging.getLogger(__name__)

def main():
raise NotImplementedError()

Expand Down
9 changes: 8 additions & 1 deletion lega/conf/loggers/debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ loggers:
qualname: lega
crypt4gh:
level: DEBUG
handlers: [debugFile,console]
handlers: [crypt4gh]
propagate: True
qualname: lega

Expand All @@ -24,6 +24,10 @@ handlers:
class: logging.StreamHandler
formatter: simple
stream: ext://sys.stderr
crypt4gh:
class: logging.StreamHandler
formatter: simple_no_correlation
stream: ext://sys.stderr
debugFile:
class: logging.FileHandler
formatter: lega
Expand All @@ -36,5 +40,8 @@ formatters:
style: '{'
datefmt: '%Y-%m-%d %H:%M:%S'
simple:
format: '[{correlation_id}][{name:^10}][{levelname:^6}] (L{lineno}) {message}'
style: '{'
simple_no_correlation:
format: '[{name:^10}][{levelname:^6}] (L{lineno}) {message}'
style: '{'
10 changes: 6 additions & 4 deletions lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
Note that the upstream is registered via an authenticated mechanism, and uses AMQPS.
'''

import logging

from .conf import configure
from .utils import db, errors, sanitize_user_id
from .utils.amqp import consume
from .utils.logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

@errors.catch(ret_on_error=(None,True))
def _work(correlation_id, data):
'''Reads a message containing the ids and add it to the database.'''

LOG.info("[%s] Finalizing Stable ID for %s", correlation_id, data)
# Adding correlation ID to context
LOG.add_context(correlation_id)

LOG.info("Finalizing Stable ID for %s", data)

# Clean up username
data['user'] = sanitize_user_id(data['user'])
Expand Down
31 changes: 17 additions & 14 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
'''

import sys
import logging
from pathlib import Path
from functools import partial
import hashlib
Expand All @@ -29,20 +28,24 @@
from .conf import CONF, configure
from .utils import db, exceptions, errors, checksum, sanitize_user_id, storage
from .utils.amqp import consume, publish
from .utils.logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

@errors.catch(ret_on_error=(None,True))
def _work(fs, correlation_id, data):
'''Reads a message, splits the header and sends the remainder to the backend store.'''

# Adding correlation ID to context
LOG.add_context(correlation_id)

# Keeping data as-is (cuz the decorator is using it)
# It will be augmented, but we keep the original data first
org_msg = data.copy()
data['org_msg'] = org_msg

filepath = data['file_path']
LOG.info("[%s] Processing %s", correlation_id, filepath)
LOG.info("Processing %s", filepath)

# Use user_id, and not elixir_id
user_id = sanitize_user_id(data['user'])
Expand All @@ -53,11 +56,11 @@ def _work(fs, correlation_id, data):

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
LOG.info("[%s] Inbox area: %s", correlation_id, inbox)
LOG.info("Inbox area: %s", inbox)

# Check if file is in inbox
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info("[%s] Inbox file path: %s", correlation_id, inbox_filepath)
LOG.info("Inbox file path: %s", inbox_filepath)

# Record in database
if not inbox_filepath.exists():
Expand All @@ -77,28 +80,28 @@ def _work(fs, correlation_id, data):
# Sending a progress message to CentralEGA
cega_msg = org_msg.copy()
cega_msg['status'] = 'PROCESSING'
LOG.debug('[%s] Sending message to CentralEGA: %s', correlation_id, cega_msg)
publish(cega_msg, 'cega', 'files.processing', correlation_id=correlation_id)
LOG.debug('Sending message to CentralEGA: %s', cega_msg)
publish(cega_msg, 'cega', 'files.processing', correlation_id)

# Strip the header out and copy the rest of the file to the vault
LOG.debug('[%s] Opening %s', correlation_id, inbox_filepath)
LOG.debug('Opening %s', inbox_filepath)
with open(inbox_filepath, 'rb') as infile:
LOG.debug('[%s] Reading header | file_id: %s', correlation_id, file_id)
LOG.debug('Reading header | file_id: %s', file_id)
header = Header.from_stream(infile)

LOG.info('[%s] Parsed HEADER: %s', correlation_id, header)
LOG.info('Parsed HEADER: %s', header)

LOG.info('[%s] Adding header to database', correlation_id)
LOG.info('Adding header to database')
header_hex = bytes(header).hex()
data['header'] = header_hex
db.update(file_id, { 'header': header_hex,
'version': header.version })

target = fs.location(file_id)
LOG.info(f'[%s] [%s] Moving the rest of %s to %s', correlation_id, fs.__class__.__name__, filepath, target)
    LOG.info('[%s] Moving the rest of %s to %s', fs.__class__.__name__, filepath, target)
target_size = fs.copy(infile, target) # It will copy the rest only

LOG.info('[%s] Vault copying completed. Updating database', correlation_id)
LOG.info('Vault copying completed. Updating database')
storage_type = None
if isinstance(fs, storage.S3Storage):
storage_type = 'S3'
Expand All @@ -111,7 +114,7 @@ def _work(fs, correlation_id, data):
data['vault_path'] = target
data['vault_type'] = storage_type

LOG.debug("[%s] Reply message: %s", correlation_id, data)
LOG.debug("Reply message: %s", data)
return (data, False)

@configure
Expand Down
9 changes: 6 additions & 3 deletions lega/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
import logging
import os
import uuid
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
Expand All @@ -21,8 +22,9 @@
from .conf import CONF, configure
from .utils.amqp import publish
from .utils.checksum import calculate
from .utils.logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

class Forwarder(asyncio.Protocol):

Expand Down Expand Up @@ -79,8 +81,9 @@ def send_message(self, username, filename):
c = calculate(filepath, 'sha256')
if c:
msg['encrypted_checksums'] = [{'type': 'sha256', 'value': c}]
# Sending
publish(msg, 'cega', 'files.inbox')
# Sending (will create a correlation id)
correlation_id = str(uuid.uuid4())
publish(msg, 'cega', 'files.inbox', correlation_id)

def connection_lost(self, exc):
if self.buf:
Expand Down
4 changes: 2 additions & 2 deletions lega/outgest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
'''

import sys
import logging
import ssl
from pathlib import Path
import asyncio
Expand All @@ -22,8 +21,9 @@
from aiohttp import web, ClientSession, ClientTimeout

from .conf import CONF, configure
from .utils.logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

async def outgest(r):
# Get token from header
Expand Down
3 changes: 2 additions & 1 deletion lega/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
from .conf import CONF, configure
from .utils import storage
from .utils import async_db as db
from .utils.logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

####################################

Expand Down
3 changes: 0 additions & 3 deletions lega/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
"""

import os
import logging

LOG = logging.getLogger(__name__)

def get_secret(f, mode='rt'):
with open(f, mode) as s:
Expand Down
24 changes: 15 additions & 9 deletions lega/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from time import sleep

from ..conf import CONF
from .logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

class AMQPConnection():
conn = None
Expand Down Expand Up @@ -124,25 +125,26 @@ def close(self):
if self.chann and not self.chann.is_closed and not self.chann.is_closing:
self.chann.close()
self.chann = None
if self.connection and not self.connection.is_closed and not self.connection.is_closing:
if self.conn and not self.conn.is_closed and not self.conn.is_closing:
self.conn.close()
self.conn = None


connection = AMQPConnection()

def publish(message, exchange, routing, correlation_id=None):
def publish(message, exchange, routing, correlation_id):
'''
Sending a message to the local broker exchange using the given routing key.
If the correlation_id is specified, it is forwarded.
If not, a new one is generated (as a uuid4 string).
The correlation_id must be specified (and then forwarded).
'''
assert( correlation_id )
LOG.add_context(correlation_id)
with connection.channel() as channel:
LOG.debug('[%s] Sending %s to exchange: %s [routing key: %s]', correlation_id, message, exchange, routing)
LOG.debug('Sending %s to exchange: %s [routing key: %s]', message, exchange, routing)
channel.basic_publish(exchange = exchange,
routing_key = routing,
body = json.dumps(message),
properties = pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
properties = pika.BasicProperties(correlation_id=correlation_id,
content_type='application/json',
delivery_mode=2))

Expand All @@ -164,8 +166,12 @@ def consume(work, from_queue, to_routing, ack_on_error=True):

def process_request(_channel, method_frame, props, body):
correlation_id = props.correlation_id

# Adding correlation ID to context
LOG.add_context(correlation_id)

message_id = method_frame.delivery_tag
LOG.debug('[%s] Consuming message %s', correlation_id, message_id)
LOG.debug('Consuming message %s', message_id)

# Process message in JSON format
answer, error = work(correlation_id, json.loads(body) ) # Exceptions should be already caught
Expand All @@ -177,7 +183,7 @@ def process_request(_channel, method_frame, props, body):

# Acknowledgment: Cancel the message resend in case MQ crashes
if not error or ack_on_error:
LOG.debug('[%s] Sending ACK for message', correlation_id, message_id)
LOG.debug('Sending ACK for message: %s', message_id)
_channel.basic_ack(delivery_tag=method_frame.delivery_tag)

# Let's do this
Expand Down
3 changes: 2 additions & 1 deletion lega/utils/async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import aiopg

from ..conf import CONF
from .logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

class DBConnection():
pool = None
Expand Down
3 changes: 2 additions & 1 deletion lega/utils/checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import hashlib

from .exceptions import UnsupportedHashAlgorithm, CompanionNotFound
from .logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

# Main map
_DIGEST = {
Expand Down
4 changes: 2 additions & 2 deletions lega/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

import sys
import traceback
import logging
import psycopg2
from socket import gethostname
from contextlib import contextmanager

from ..conf import CONF
from .logging import LEGALogger

LOG = logging.getLogger(__name__)
LOG = LEGALogger(__name__)

class DBConnection():
conn = None
Expand Down

0 comments on commit b869199

Please sign in to comment.