
Commit 4041f34
Merge pull request #86 from EGA-archive/logging
Logging
silverdaz committed Dec 10, 2019
2 parents 4b35bbc + a6e5082 commit 4041f34
Showing 23 changed files with 210 additions and 473 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testsuite.yml
@@ -15,7 +15,7 @@ jobs:
matrix:
# os: [ubuntu-latest, macOS-latest]
os: [ubuntu-latest]
bootstrap: ['', '--archive-backend posix']
bootstrap: ['', '--archive-backend s3']
# bats: ['integration', 'security', 'robustness']
bats: ['integration']

2 changes: 1 addition & 1 deletion deploy/bootstrap/run.sh
@@ -12,7 +12,7 @@ EXTRAS=${HERE}/../../extras
VERBOSE=no
FORCE=yes
OPENSSL=openssl
ARCHIVE_BACKEND=s3
ARCHIVE_BACKEND=posix
HOSTNAME_DOMAIN='' #".localega"

PYTHONEXEC=python
11 changes: 10 additions & 1 deletion lega/__init__.py
@@ -11,4 +11,13 @@

# Set default logging handler to avoid "No handler found" warnings.
import logging
logging.getLogger(__name__).addHandler(logging.NullHandler())

# This updates the logging class from all loggers used in this package.
# The new logging class injects a correlation id to the log record.
from .utils.logging import LEGALogger
logging.setLoggerClass(LEGALogger)

# Send warnings using the package warnings to the logging system
# The warnings are logged to a logger named 'py.warnings' with a severity of WARNING.
# See: https://docs.python.org/3/library/logging.html#integration-with-the-warnings-module
logging.captureWarnings(True)
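
A minimal sketch of what a correlation-id-aware logger class could look like. lega/utils/logging.py itself is not part of the hunks shown on this page, so apart from the names LEGALogger and _cid (which appear in lega/__init__.py and lega/utils/amqp.py) the implementation below is an assumption, using a contextvars-backed id injected into every record:

# Assumed sketch of a contextvar-backed correlation-id logger
# (not the actual lega/utils/logging.py, which is not shown in this diff).
import logging
from contextvars import ContextVar

_cid = ContextVar('correlation_id', default=None)

class LEGALogger(logging.Logger):
    """Logger whose records always carry a correlation_id attribute."""

    def makeRecord(self, *args, **kwargs):
        record = super().makeRecord(*args, **kwargs)
        if getattr(record, 'correlation_id', None) is None:
            # Fall back to the current context's id, or a placeholder
            record.correlation_id = _cid.get() or '--------'
        return record

logging.setLoggerClass(LEGALogger)
logging.basicConfig(format='[{correlation_id}][{name}] {message}', style='{', level=logging.INFO)
LOG = logging.getLogger('lega.demo')

_cid.set('abc-123')
LOG.info('correlated message')   # -> [abc-123][lega.demo] correlated message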
2 changes: 1 addition & 1 deletion lega/conf/__init__.py
@@ -111,7 +111,7 @@ def _load_log_conf(self, args=None):
print('Error with --log:', repr(e), file=sys.stderr)

def setup(self, args=None, encoding='utf-8'):
"""Setup, that is all."""
"""Set up."""
self._load_conf(args, encoding)
self._load_log_conf(args)

73 changes: 0 additions & 73 deletions lega/conf/keys.py

This file was deleted.

6 changes: 6 additions & 0 deletions lega/conf/loggers/debug.yaml
@@ -9,6 +9,9 @@ loggers:
handlers: [debugFile,console]
propagate: True
qualname: lega
py.warnings:
level: WARNING
handlers: [debugFile]


handlers:
@@ -31,5 +34,8 @@ formatters:
style: '{'
datefmt: '%Y-%m-%d %H:%M:%S'
simple:
format: '[{correlation_id}][{name:^10}][{levelname:^6}] (L{lineno}) {message}'
style: '{'
simple_no_correlation:
format: '[{name:^10}][{levelname:^6}] (L{lineno}) {message}'
style: '{'
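
Worth noting: with the '{' style above, the simple formatter cannot render records that lack a correlation_id attribute, which is presumably why the separate simple_no_correlation formatter is kept for loggers (such as py.warnings) that bypass LEGALogger. With a stock logging.Logger the field would have to be supplied per call, as in this stand-alone sketch ('example' is not a logger name used by LocalEGA):

import logging

formatter = logging.Formatter('[{correlation_id}][{name:^10}][{levelname:^6}] (L{lineno}) {message}', style='{')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

log = logging.getLogger('example')
log.addHandler(handler)
log.setLevel(logging.DEBUG)

# A plain Logger only carries correlation_id if the caller injects it explicitly.
log.info('manual correlation', extra={'correlation_id': 'abc-123'})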
25 changes: 0 additions & 25 deletions lega/conf/loggers/pgp.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions lega/finalize.py
@@ -27,14 +27,14 @@ def work(data):
"""Read a message containing the ids and add it to the database."""
file_id = data['file_id']
stable_id = data['stable_id']
LOG.info(f"Mapping {file_id} to stable_id {stable_id}")
LOG.info("Mapping file_id %s to stable_id %s", file_id, stable_id)

# Remove file from the inbox
# TODO

db.set_stable_id(file_id, stable_id) # That will flag the entry as 'Ready'

LOG.info(f"Stable ID {stable_id} mapped to {file_id}")
LOG.info("Stable ID %s mapped to %s", stable_id, file_id)
return None


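The f-string to %-style change repeated across finalize.py, ingest.py, amqp.py and checksum.py is the standard lazy-logging idiom: the message is only interpolated if a handler actually emits the record, and the raw arguments remain available to filters and handlers. In short:

import logging
LOG = logging.getLogger('lega.demo')

stable_id = 'EGAF00000000001'   # illustrative value only
LOG.info(f"Stable ID {stable_id} mapped")    # string built even when INFO is disabled
LOG.info("Stable ID %s mapped", stable_id)   # interpolation deferred until a handler emits it
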
18 changes: 6 additions & 12 deletions lega/ingest.py
@@ -26,7 +26,7 @@

from .conf import CONF
from .utils import db, exceptions, sanitize_user_id, storage
from .utils.amqp import consume, publish, get_connection
from .utils.amqp import consume, get_connection

LOG = logging.getLogger(__name__)

@@ -48,7 +48,7 @@ def get_header(input_file):
def work(fs, inbox_fs, channel, data):
"""Read a message, split the header and send the remainder to the backend store."""
filepath = data['filepath']
LOG.info(f"Processing {filepath}")
LOG.info('Processing %s', filepath)

# Remove the host part of the user name
user_id = sanitize_user_id(data['user'])
@@ -75,30 +75,24 @@ def work(fs, inbox_fs, channel, data):
# Record in database
db.mark_in_progress(file_id)

# Sending a progress message to CentralEGA
org_msg['status'] = 'PROCESSING'
LOG.debug(f'Sending message to CentralEGA: {data}')
publish(org_msg, channel, 'cega', 'files.processing')
org_msg.pop('status', None)

# Strip the header out and copy the rest of the file to the archive
LOG.debug('Opening %s', filepath)
with inbox.open(filepath, 'rb') as infile:
LOG.debug(f'Reading header | file_id: {file_id}')
LOG.debug('Reading header | file_id: %s', file_id)
header_bytes = get_header(infile)
header_hex = header_bytes.hex()
data['header'] = header_hex
db.store_header(file_id, header_hex) # header bytes will be .hex()

target = fs.location(file_id)
LOG.info(f'[{fs.__class__.__name__}] Moving the rest of {filepath} to {target}')
LOG.info('[%s] Moving the rest of %s to %s', fs.__class__.__name__, filepath, target)
target_size = fs.copy(infile, target) # It will copy the rest only

LOG.info(f'Archive copying completed. Updating database')
LOG.info('Archive copying completed. Updating database')
db.set_archived(file_id, target, target_size)
data['archive_path'] = target

LOG.debug(f"Reply message: {data}")
LOG.debug("Reply message: %s", data)
return data


14 changes: 0 additions & 14 deletions lega/utils/__init__.py
@@ -1,19 +1,5 @@
"""Utility functions used internally."""

import logging

LOG = logging.getLogger(__name__)


def get_file_content(f, mode='rb'):
"""Retrieve a file content."""
try:
with open(f, mode) as h:
return h.read()
except OSError as e:
LOG.error(f'Error reading {f}: {e!r}')
return None


def sanitize_user_id(user):
"""Return username without host part of an ID on the form name@something."""
48 changes: 27 additions & 21 deletions lega/utils/amqp.py
@@ -2,12 +2,12 @@

import logging
import json
import uuid
import ssl

import pika

from ..conf import CONF
from .logging import _cid

LOG = logging.getLogger(__name__)

@@ -19,9 +19,9 @@ def get_connection(domain, blocking=True):
heartbeat values are read from the CONF argument.
So are the SSL options.
"""
LOG.info(f'Getting a connection to {domain}')
LOG.info('Getting a connection to %s', domain)
params = CONF.get_value(domain, 'connection', raw=True)
LOG.debug(f"Initializing a connection to: {params}")
LOG.debug("Initializing a connection to: %s", params)
connection_params = pika.connection.URLParameters(params)

# Handling the SSL options
@@ -65,11 +65,13 @@

def publish(message, channel, exchange, routing, correlation_id=None):
"""Send a message to the local broker with ``path`` was updated."""
LOG.debug(f'Sending to exchange: {exchange} [routing key: {routing}]')
correlation_id = correlation_id or _cid.get()
assert(correlation_id), "You should not publish without a correlation id"
LOG.debug('Sending to exchange: %s [routing key: %s]', exchange, routing, extra={'correlation_id': correlation_id})
channel.basic_publish(exchange, # exchange
routing, # routing_key
json.dumps(message), # body
properties=pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
properties=pika.BasicProperties(correlation_id=correlation_id,
content_type='application/json',
delivery_mode=2))

Expand All @@ -89,28 +91,32 @@ def consume(work, connection, from_queue, to_routing):
"""
assert(from_queue)

LOG.debug(f'Consuming message from {from_queue}')
LOG.debug('Consuming message from %s', from_queue)

from_channel = connection.channel()
from_channel.basic_qos(prefetch_count=1) # One job per worker
to_channel = connection.channel()

def process_request(channel, method_frame, props, body):
correlation_id = props.correlation_id
message_id = method_frame.delivery_tag
LOG.debug(f'Consuming message {message_id} (Correlation ID: {correlation_id})')

# Process message in JSON format
answer = work(json.loads(body)) # Exceptions should be already caught

# Publish the answer
if answer:
assert(to_routing)
publish(answer, to_channel, 'lega', to_routing, correlation_id=props.correlation_id)

# Acknowledgment: Cancel the message resend in case MQ crashes
LOG.debug(f'Sending ACK for message {message_id} (Correlation ID: {correlation_id})')
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
try:
correlation_id = props.correlation_id
_cid.set(correlation_id)
message_id = method_frame.delivery_tag
LOG.debug('Consuming message %s', message_id, extra={'correlation_id': correlation_id})

# Process message in JSON format
answer = work(json.loads(body)) # Exceptions should be already caught

# Publish the answer
if answer:
assert(to_routing)
publish(answer, to_channel, 'lega', to_routing, correlation_id=correlation_id)

# Acknowledgment: Cancel the message resend in case MQ crashes
LOG.debug('Sending ACK for message %s', message_id)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
finally:
_cid.set(None)

# Let's do this
try:
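For orientation, a usage sketch of the new correlation-id flow, based only on the signatures visible in this hunk (get_connection, consume, publish and the _cid context variable); the 'broker' configuration section, queue and routing key below are illustrative, not taken from this commit:

# Illustrative wiring of a LocalEGA-style worker; CONF section, queue and routing names are assumptions.
from lega.utils.amqp import get_connection, consume

def work(data):
    # process_request() has already set _cid from the incoming message's
    # correlation_id, so any publish() issued while handling this message
    # reuses the broker's correlation id.
    data['status'] = 'COMPLETED'
    return data   # consume() publishes the answer with that same correlation id

if __name__ == '__main__':
    connection = get_connection('broker')
    consume(work, connection, 'files', 'files.completed')
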
4 changes: 2 additions & 2 deletions lega/utils/checksum.py
@@ -40,7 +40,7 @@ def calculate(filepath, algo, bsize=8192):
m.update(data)
return m.hexdigest()
except OSError as e:
LOG.error(f'Unable to calculate checksum: {e!r}')
LOG.error('Unable to calculate checksum: %r', e)
return None


@@ -68,7 +68,7 @@ def get_from_companion(filepath):
with open(companion, 'rt', encoding='utf-8') as f:
return f.read(), h
except OSError as e: # Not found, not readable, ...
LOG.debug(f'Companion {companion}: {e!r}')
LOG.debug('Companion %s: %r', companion, e)
# Check the next

else: # no break statement was encountered
