Skip to content
This repository has been archived by the owner on Dec 16, 2019. It is now read-only.

Commit

Permalink
Merge 8c1c79e into 5ad62e2
Browse files Browse the repository at this point in the history
  • Loading branch information
viklund committed Sep 12, 2018
2 parents 5ad62e2 + 8c1c79e commit b11231d
Show file tree
Hide file tree
Showing 24 changed files with 172 additions and 151 deletions.
2 changes: 1 addition & 1 deletion lega/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__title__ = 'Local EGA'
__version__ = VERSION = '1.1'
__author__ = 'Frédéric Haziza'
#__license__ = 'Apache 2.0'
__license__ = 'Apache 2.0'
__copyright__ = 'Local EGA @ NBIS Sweden'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
7 changes: 2 additions & 5 deletions lega/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import sys
import os
import configparser
import logging
from logging.config import fileConfig, dictConfig
import lega.utils.logging
from pathlib import Path
import yaml
from hashlib import md5
Expand Down Expand Up @@ -138,9 +136,8 @@ def get_value(self, section, option, conv=str, default=None, raw=False):
``section`` and ``option`` are mandatory while ``conv``, ``default`` (fallback) and ``raw`` are optional.
"""
result = os.environ.get(f'{section.upper()}_{option.upper()}', None)
if result is not None: # it might be empty
if result is not None: # it might be empty
return self._convert(result, conv)
#if self.has_option(section, option):
return self._convert(self.get(section, option, fallback=default, raw=raw), conv)

def _convert(self, value, conv):
Expand All @@ -155,7 +152,7 @@ def _convert(self, value, conv):
else:
raise ValueError(f"Invalid truth value: {val}")
else:
return conv(value) # raise error in case we can't convert an empty value
return conv(value) # raise error in case we can't convert an empty value


CONF = Configuration()
Expand Down
8 changes: 4 additions & 4 deletions lega/conf/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def main(args=None):
parser = argparse.ArgumentParser(description="Forward message between CentralEGA's broker and the local one",
allow_abbrev=False)
parser.add_argument('--conf', help='configuration file, in INI or YAML format')
parser.add_argument('--log', help='configuration file for the loggers')
parser.add_argument('--log', help='configuration file for the loggers')

parser.add_argument('--list', dest='list_content', action='store_true', help='Lists the content of the configuration file')
pargs = parser.parse_args(args)
CONF.setup( args )

CONF.setup(args)

print(repr(CONF))

Expand Down
15 changes: 8 additions & 7 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from legacryptor.crypt4gh import get_header

from .conf import CONF
from .utils import db, exceptions, checksum, sanitize_user_id, storage
from .utils import db, exceptions, sanitize_user_id, storage
from .utils.amqp import consume, publish, get_connection

LOG = logging.getLogger(__name__)
Expand All @@ -49,7 +49,7 @@ def work(fs, channel, data):

# Insert in database
file_id = db.insert_file(filepath, user_id)
data['file_id'] = file_id # must be there: database error uses it
data['file_id'] = file_id # must be there: database error uses it

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
Expand All @@ -59,7 +59,7 @@ def work(fs, channel, data):
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info(f"Inbox file path: {inbox_filepath}")
if not inbox_filepath.exists():
raise exceptions.NotFoundInInbox(filepath) # return early
raise exceptions.NotFoundInInbox(filepath) # return early

# Ok, we have the file in the inbox

Expand All @@ -71,7 +71,7 @@ def work(fs, channel, data):
LOG.debug(f'Sending message to CentralEGA: {data}')
publish(org_msg, channel, 'cega', 'files.processing')
org_msg.pop('status', None)

# Strip the header out and copy the rest of the file to the vault
LOG.debug(f'Opening {inbox_filepath}')
with open(inbox_filepath, 'rb') as infile:
Expand All @@ -80,11 +80,11 @@ def work(fs, channel, data):

header_hex = (beginning+header).hex()
data['header'] = header_hex
db.store_header(file_id, header_hex) # header bytes will be .hex()
db.store_header(file_id, header_hex) # header bytes will be .hex()

target = fs.location(file_id)
LOG.info(f'[{fs.__class__.__name__}] Moving the rest of {filepath} to {target}')
target_size = fs.copy(infile, target) # It will copy the rest only
target_size = fs.copy(infile, target) # It will copy the rest only

LOG.info(f'Vault copying completed. Updating database')
db.set_archived(file_id, target, target_size)
Expand All @@ -97,7 +97,7 @@ def main(args=None):
if not args:
args = sys.argv[1:]

CONF.setup(args) # re-conf
CONF.setup(args) # re-conf

fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
broker = get_connection('broker')
Expand All @@ -106,5 +106,6 @@ def main(args=None):
# upstream link configured in local broker
consume(do_work, broker, 'files', 'archived')


if __name__ == '__main__':
main()
22 changes: 11 additions & 11 deletions lega/keyserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ def _check_limit(self):

def clear(self):
"""Clear all cache."""
#self.store = dict()
self.store.clear()


_cache = None # key IDs are uppercase
_active = None # will be a KeyID (not a key name)
_cache = None # key IDs are uppercase
_active = None # will be a KeyID (not a key name)

####################################
# Caching the keys
Expand Down Expand Up @@ -127,13 +126,13 @@ async def retrieve_active_key(request):
key_type = request.match_info['key_type'].lower()
LOG.debug(f'Requesting active ({key_type}) key')
if key_type not in ('public', 'private'):
return web.HTTPForbidden() # web.HTTPBadRequest()
return web.HTTPForbidden() # web.HTTPBadRequest()
key_format = 'armored' if request.content_type == 'text/plain' else None
if _active is None:
return web.HTTPNotFound()
k = _cache.get(_active, key_type, key_format=key_format)
if k:
return web.Response(body=k) # web.Response(text=k.hex())
return web.Response(body=k) # web.Response(text=k.hex())
else:
LOG.warn(f"Requested active ({key_type}) key not found.")
return web.HTTPNotFound()
Expand All @@ -144,13 +143,13 @@ async def retrieve_key(request):
requested_id = request.match_info['requested_id']
key_type = request.match_info['key_type'].lower()
if key_type not in ('public', 'private'):
return web.HTTPForbidden() # web.HTTPBadRequest()
return web.HTTPForbidden() # web.HTTPBadRequest()
key_id = requested_id[-16:].upper()
key_format = 'armored' if request.content_type == 'text/plain' else None
LOG.debug(f'Requested {key_type.upper()} key with ID {requested_id}')
k = _cache.get(key_id, key_type, key_format=key_format)
if k:
return web.Response(body=k) # web.Response(text=value.hex())
return web.Response(body=k) # web.Response(text=value.hex())
else:
LOG.warn(f"Requested key {requested_id} not found.")
return web.HTTPNotFound()
Expand Down Expand Up @@ -181,7 +180,7 @@ async def healthcheck(request):
@routes.get('/admin/ttl')
async def check_ttl(request):
"""Evict from the cache if TTL expired
and return the keys that survived""" # ehh...why? /Fred
and return the keys that survived""" # ehh...why? /Fred
LOG.debug('Admin TTL')
expire = _cache.check_ttl()
if expire:
Expand All @@ -196,7 +195,8 @@ def load_keys_conf(store):
_cache = Cache()
# Load all the keys in the store
for section in store.sections():
_unlock_key(section, **dict(store.items(section))) # includes defaults
_unlock_key(section, **dict(store.items(section))) # includes defaults


alive = True # used to set if the keyserver is alive in the shutdown

Expand Down Expand Up @@ -239,8 +239,8 @@ def main(args=None):

host = CONF.get_value('keyserver', 'host') # fallbacks are in defaults.ini
port = CONF.get_value('keyserver', 'port', conv=int)
health_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
status_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))
health_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
status_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))

eureka_endpoint = CONF.get_value('eureka', 'endpoint')

Expand Down
3 changes: 2 additions & 1 deletion lega/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ def main(args=None):
if not args:
args = sys.argv[1:]

CONF.setup(args) # re-conf
CONF.setup(args) # re-conf

broker = get_connection('broker')

# upstream link configured in local broker
consume(work, broker, 'stableIDs', None)


if __name__ == '__main__':
main()
28 changes: 16 additions & 12 deletions lega/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
import os
import asyncio
import uvloop

from .conf import CONF
from .utils.amqp import get_connection, publish
from .utils.checksum import calculate


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

host = '127.0.0.1'
port = 8888
delim = b'$'

from .conf import CONF
from .utils.amqp import get_connection, publish
from .utils.checksum import calculate, supported_algorithms

LOG = logging.getLogger(__name__)

class Forwarder(asyncio.Protocol):
Expand Down Expand Up @@ -53,9 +55,9 @@ def parse(self, data):
# We have 2 bars
pos1 = data.find(delim)
username = data[:pos1]
pos2 = data.find(delim,pos1+1)
pos2 = data.find(delim, pos1+1)
filename = data[pos1+1:pos2]
yield (username.decode(),filename.decode())
yield (username.decode(), filename.decode())
data = data[pos2+1:]

def data_received(self, data):
Expand All @@ -70,13 +72,15 @@ def data_received(self, data):

def send_message(self, username, filename):
inbox = self.inbox_location % username
filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename) if self.isolation \
else (filename, filename[len(inbox):]) # surely there is better!
filepath, filename = (filename, filename[len(inbox):])
if self.isolation:
filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename)

LOG.debug("Filepath %s", filepath)
msg = { 'user': username,
'filepath': filename,
'filesize': os.stat(filepath).st_size
}
msg = {'user': username,
'filepath': filename,
'filesize': os.stat(filepath).st_size
}
c = calculate(filepath, 'sha256')
if c:
msg['encrypted_integrity'] = {'algorithm': 'sha256', 'checksum': c}
Expand Down
3 changes: 1 addition & 2 deletions lega/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def get_file_content(f, mode='rb'):
try:
with open( f, mode) as h:
with open(f, mode) as h:
return h.read()
except OSError as e:
LOG.error(f'Error reading {f}: {e!r}')
Expand All @@ -18,4 +18,3 @@ def sanitize_user_id(user):
'''Returns username without host part of an ID on the form name@something'''

return user.split('@')[0]

26 changes: 13 additions & 13 deletions lega/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_connection(domain, blocking=True):
CONF.get_value(domain, 'password', default='guest')
),
'connection_attempts': CONF.get_value(domain, 'connection_attempts', conv=int, default=10),
'retry_delay': CONF.get_value(domain,'retry_delay', conv=int, default=10), # seconds
'retry_delay': CONF.get_value(domain, 'retry_delay', conv=int, default=10), # seconds
}
heartbeat = CONF.get_value(domain, 'heartbeat', conv=int, default=0)
if heartbeat is not None: # can be 0
Expand All @@ -46,7 +46,7 @@ def get_connection(domain, blocking=True):
params['ssl_options'] = {
'ca_certs': CONF.get_value(domain, 'cacert'),
'certfile': CONF.get_value(domain, 'cert'),
'keyfile': CONF.get_value(domain, 'keyfile'),
'keyfile': CONF.get_value(domain, 'keyfile'),
'cert_reqs': 2, # ssl.CERT_REQUIRED is actually <VerifyMode.CERT_REQUIRED: 2>
}

Expand All @@ -62,12 +62,12 @@ def publish(message, channel, exchange, routing, correlation_id=None):
Sending a message to the local broker with ``path`` was updated
'''
LOG.debug(f'Sending {message} to exchange: {exchange} [routing key: {routing}]')
channel.basic_publish(exchange = exchange,
routing_key = routing,
body = json.dumps(message),
properties = pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
content_type='application/json',
delivery_mode=2))
channel.basic_publish(exchange=exchange,
routing_key=routing,
body=json.dumps(message),
properties=pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
content_type='application/json',
delivery_mode=2))


def consume(work, connection, from_queue, to_routing):
Expand All @@ -84,12 +84,12 @@ def consume(work, connection, from_queue, to_routing):
routing key.
'''

assert( from_queue )
assert(from_queue)

LOG.debug(f'Consuming message from {from_queue}')

from_channel = connection.channel()
from_channel.basic_qos(prefetch_count=1) # One job per worker
from_channel.basic_qos(prefetch_count=1) # One job per worker
to_channel = connection.channel()

def process_request(channel, method_frame, props, body):
Expand All @@ -98,12 +98,12 @@ def process_request(channel, method_frame, props, body):
LOG.debug(f'Consuming message {message_id} (Correlation ID: {correlation_id})')

# Process message in JSON format
answer = work( json.loads(body) ) # Exceptions should be already caught
answer = work(json.loads(body)) # Exceptions should be already caught

# Publish the answer
if answer:
assert( to_routing )
publish(answer, to_channel, 'lega', to_routing, correlation_id = props.correlation_id)
assert(to_routing)
publish(answer, to_channel, 'lega', to_routing, correlation_id=props.correlation_id)

# Acknowledgment: Cancel the message resend in case MQ crashes
LOG.debug(f'Sending ACK for message {message_id} (Correlation ID: {correlation_id})')
Expand Down
11 changes: 5 additions & 6 deletions lega/utils/checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def calculate(filepath, algo, bsize=8192):
'''
try:
m = instantiate(algo)
with open(filepath, 'rb') as f: # Open the file in binary mode. No encoding dance.
with open(filepath, 'rb') as f: # Open the file in binary mode. No encoding dance.
while True:
data = f.read(bsize)
if not data:
Expand All @@ -43,10 +43,10 @@ def calculate(filepath, algo, bsize=8192):
return None


def is_valid(filepath, digest, hashAlgo = 'md5'):
def is_valid(filepath, digest, hashAlgo='md5'):
'''Verify the integrity of a file against a hash value.'''

assert( isinstance(digest,str) )
assert(isinstance(digest, str))

res = calculate(filepath, hashAlgo)
LOG.debug('Calculated digest: '+res)
Expand All @@ -67,10 +67,9 @@ def get_from_companion(filepath):
try:
with open(companion, 'rt', encoding='utf-8') as f:
return f.read(), h
except OSError as e: # Not found, not readable, ...
except OSError as e: # Not found, not readable, ...
LOG.debug(f'Companion {companion}: {e!r}')
# Check the next

else: # no break statement was encountered
else: # no break statement was encountered
raise CompanionNotFound(filepath)

Loading

0 comments on commit b11231d

Please sign in to comment.