Skip to content

Commit

Permalink
Adjusting logging for outgest and streamer per request (They are asyn…
Browse files Browse the repository at this point in the history
…c, I can not use add_context)
  • Loading branch information
silverdaz committed Nov 13, 2018
1 parent b869199 commit d533a88
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 48 deletions.
4 changes: 3 additions & 1 deletion deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ private/lega.yml private bootstrap bootstrap-dev:
egarchive/lega --prefix $(PROJECT_NAME) ${ARGS}

up: private/lega.yml
@docker-compose --log-level ERROR -f $< up -d
@docker-compose --log-level ERROR -f $< up -d mq db vault
@sleep 6
@docker-compose --log-level ERROR -f $< up -d inbox ingest verify finalize outgest streamer

down:
-@docker-compose --log-level ERROR down -v
Expand Down
41 changes: 26 additions & 15 deletions lega/outgest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sys
import ssl
from pathlib import Path
import uuid
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
Expand All @@ -26,60 +27,70 @@
LOG = LEGALogger(__name__)

async def outgest(r):

correlation_id = str(uuid.uuid4())

# Get token from header
auth = r.headers.get('Authorization')
if not auth.lower().startswith('bearer '):
raise web.HTTPBadRequest(reason='Invalid request')
access_token = auth[7:]
LOG.debug('Access Token from header: %s', access_token)
LOG.debug('Access Token from header: %s', access_token, extra={'correlation_id': correlation_id})
if not access_token:
LOG.error('Invalid Access: Missing access_token')
LOG.error('Invalid Access: Missing access_token', extra={'correlation_id': correlation_id})
raise web.HTTPBadRequest(reason='Invalid Request')

# Get POST data and check it
data = await r.post()
stable_id = data.get('stable_id')
if not stable_id:
LOG.error('Invalid Request: Missing stable ID')
LOG.error('Invalid Request: Missing stable ID', extra={'correlation_id': correlation_id})
raise web.HTTPBadRequest(reason='Invalid Request')
# method = data.get('method')
# if not method:
# LOG.error('Invalid Request: Missing reencryption method')
# LOG.error('Invalid Request: Missing reencryption method', extra={'correlation_id': correlation_id})
# raise web.HTTPBadRequest(reason='Invalid Request')
pubkey = data.get('pubkey')
if not pubkey:
LOG.error('Invalid Request: Missing public key for reencryption')
LOG.error('Invalid Request: Missing public key for reencryption', extra={'correlation_id': correlation_id})
raise web.HTTPBadRequest(reason='Invalid Request')

# Check now Permissions, for that stable_id. If 200 OK, then granted
permissions_url = r.app['permissions_url'] % stable_id

async with ClientSession() as session:
LOG.debug('POST Request: %s', permissions_url)
LOG.debug('POST Request: %s', permissions_url, extra={'correlation_id': correlation_id})
async with session.request('GET',
permissions_url,
headers={ 'Authorization': auth, # same as above
'Accept': 'application/json',
'Cache-Control': 'no-cache' }) as response:
'Cache-Control': 'no-cache',
'correlation_id': correlation_id
}) as response:
if response.status > 200:
LOG.error("Invalid permissions for stable_id %s [status: %s]", stable_id, response.status)
LOG.error("Invalid permissions for stable_id %s [status: %s]",
stable_id, response.status,
extra={'correlation_id': correlation_id})
raise web.HTTPBadRequest(text='Invalid request')

# Valid Permissions: Forward to Re-Encryption
LOG.info("Valid Request and Permissions: Forwarding to Re-Encryption Streamer")
LOG.info("Valid Request and Permissions: Forwarding to Re-Encryption Streamer", extra={'correlation_id': correlation_id})
streamer_url = CONF.get_value('DEFAULT', 'streamer_endpoint')
timeout = ClientTimeout(total=CONF.get_value('DEFAULT', 'timeout', conv=int, default=300))
async with ClientSession(timeout=timeout) as session:
LOG.debug('POST Request: %s', streamer_url)
LOG.debug('POST Request: %s', streamer_url, extra={'correlation_id': correlation_id})
async with session.request('POST',
streamer_url,
headers={ 'Content-Type': 'application/json' },
headers={
'Content-Type': 'application/json',
'correlation_id': correlation_id
},
json={ 'stable_id': stable_id,
'pubkey': pubkey,
'client_ip': r.remote }) as response:

LOG.debug('Response: %s', response)
LOG.debug('Response type: %s', response.headers.get('CONTENT-TYPE'))
LOG.debug('Response: %s', response, extra={'correlation_id': correlation_id})
LOG.debug('Response type: %s', response.headers.get('CONTENT-TYPE'), extra={'correlation_id': correlation_id})
if response.status > 200:
raise web.HTTPBadRequest(reason=f'HTTP status code: {response.status}')

Expand Down Expand Up @@ -108,8 +119,8 @@ def main():
if CONF.get_value('DEFAULT', 'enable_ssl', conv=bool, default=True):
ssl_certfile = Path(CONF.get_value('DEFAULT', 'ssl_certfile')).expanduser()
ssl_keyfile = Path(CONF.get_value('DEFAULT', 'ssl_keyfile')).expanduser()
LOG.debug(f'Certfile: {ssl_certfile}')
LOG.debug(f'Keyfile: {ssl_keyfile}')
LOG.debug('Certfile: %s', ssl_certfile, extra={'correlation_id': correlation_id})
LOG.debug('Keyfile: %s', ssl_keyfile, extra={'correlation_id': correlation_id})
sslcontext = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
sslcontext.check_hostname = False
sslcontext.load_cert_chain(ssl_certfile, ssl_keyfile)
Expand Down
37 changes: 21 additions & 16 deletions lega/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import sys
import os
import logging
import io
import asyncio
import uvloop
Expand Down Expand Up @@ -44,14 +43,14 @@ async def init(app):

# Load the LocalEGA private key
key_location = CONF.get_value('DEFAULT', 'private_key')
LOG.info(f'Retrieving the Private Key from {key_location}')
LOG.info('Retrieving the Private Key from %s', key_location)
with open(key_location, 'rt') as k: # text file
privkey = PrivateKey(k.read(), KeyFormatter)
app['private_key'] = privkey

# Load the LocalEGA header signing key
signing_key_location = CONF.get_value('DEFAULT', 'signing_key')
LOG.info(f'Retrieving the Signing Key from {signing_key_location}')
LOG.info('Retrieving the Signing Key from %s', signing_key_location)
if signing_key_location:
with open(signing_key_location, 'rt') as k: # hex file
app['signing_key'] = ed25519.SigningKey(bytes.fromhex(k.read()))
Expand All @@ -76,20 +75,25 @@ async def shutdown(app):
def request_context(func):
async def wrapper(r):

LOG.debug('Init Context')
correlation_id = r.headers.get('correlation_id')
if not correlation_id:
LOG.error('No correlation id in the header')
#raise web.HTTPBadRequest(reason='Invalid request')

LOG.debug('Init Context', extra={'correlation_id': correlation_id})

# Getting post data
data = await r.json()
LOG.debug('Data: %s', data)
LOG.debug('Data: %s', data, extra={'correlation_id': correlation_id})

stable_id = data.get('stable_id')
if not stable_id: # It should be there. Assertion instead?
LOG.error('Missing stable ID')
LOG.error('Missing stable ID', extra={'correlation_id': correlation_id})
raise web.HTTPUnprocessableEntity(reason='Missing stable ID')

pubkey = data.get('pubkey')
if not pubkey: # It should be there. Assertion instead?
LOG.error('Missing public key for the re-encryption')
LOG.error('Missing public key for the re-encryption', extra={'correlation_id': correlation_id})
raise web.HTTPUnprocessableEntity(reason='Missing public key')
# Load it
pubkey = PublicKey(pubkey, KeyFormatter)
Expand All @@ -110,27 +114,28 @@ async def wrapper(r):
start_coordinate=startCoordinate,
end_coordinate=endCoordinate)
if not request_info:
LOG.error('Unable to create a request entry')
LOG.error('Unable to create a request entry', extra={'correlation_id': correlation_id})
raise web.HTTPServiceUnavailable(reason='Unable to process request')

# Request started
request_id, header, vault_path, vault_type, _, _, _ = request_info

# Set up file transfer type
LOG.info('Loading the vault handler: %s', vault_type)
LOG.info('Loading the vault handler: %s', vault_type, extra={'correlation_id': correlation_id})
if vault_type == 'S3':
mover = storage.S3Storage()
elif vault_type == 'POSIX':
mover = storage.FileStorage()
else:
LOG.error('Invalid storage method: %s', vault_type)
LOG.error('Invalid storage method: %s', vault_type, extra={'correlation_id': correlation_id})
raise web.HTTPUnprocessableEntity(reason='Unsupported storage type')

async def db_update(message):
await db.update_status(request_id, message)

# Do the job
response, dlsize = await func(r,
correlation_id,
db_update,
pubkey,
r.app['private_key'],
Expand All @@ -147,28 +152,28 @@ async def db_update(message):
if isinstance(err,AssertionError):
raise err
cause = err.__cause__ or err
LOG.error(f'{cause!r}') # repr = Technical
LOG.error('%r', cause, extra={'correlation_id': correlation_id}) # repr = Technical
if request_id:
await db.set_error(request_id, cause)
raise web.HTTPServiceUnavailable(reason='Unable to process request')
return wrapper


@request_context
async def outgest(r, set_progress, pubkey, privkey, signing_key, header, vault_path, mover, chunk_size=1<<22):
async def outgest(r, correlation_id, set_progress, pubkey, privkey, signing_key, header, vault_path, mover, chunk_size=1<<22):

# Crypt4GH encryption
await set_progress('REENCRYPTING')
LOG.info('Re-encrypting the header') # in hex -> bytes, and take away 16 bytes
LOG.info('Re-encrypting the header', extra={'correlation_id': correlation_id}) # in hex -> bytes, and take away 16 bytes
header_obj = Header.from_stream(io.BytesIO(bytes.fromhex(header)))
reencrypted_header = header_obj.reencrypt(pubkey, privkey, signing_key=signing_key)
renc_header = bytes(reencrypted_header)
LOG.debug('Org header %s', header)
LOG.debug('Reenc header %s', renc_header.hex())
LOG.debug('Org header %s', header, extra={'correlation_id': correlation_id})
LOG.debug('Reenc header %s', renc_header.hex(), extra={'correlation_id': correlation_id})

# Read the rest from the vault
await set_progress('STREAMING')
LOG.info('Opening vault file: %s', vault_path)
LOG.info('Opening vault file: %s', vault_path, extra={'correlation_id': correlation_id})
with mover.open(vault_path, 'rb') as vfile:

# Ready to answer
Expand Down
17 changes: 1 addition & 16 deletions lega/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,12 @@ def format(self, record):
return json.dumps(log_record) #, ensure_ascii=False)


# class LEGALogger(logging.LoggerAdapter):
# correlation_id = None

# def __init__(self, name):
# logger = logging.getLogger(name)
# super().__init__(logger, {})

# def add_context(self, correlation_id):
# self.correlation_id = correlation_id

# def process(self, msg, kwargs):
# if self.correlation_id:
# return '[%s] %s' % (self.correlation_id, msg), kwargs
# return msg, kwargs

class LEGALogger(logging.LoggerAdapter):
correlation_id = None

def __init__(self, name):
logger = logging.getLogger(name)
super().__init__(logger, { 'correlation_id': None})
super().__init__(logger, { 'correlation_id': None })

def add_context(self, correlation_id):
self.extra['correlation_id'] = correlation_id

0 comments on commit d533a88

Please sign in to comment.