Skip to content
This repository has been archived by the owner on Dec 16, 2019. It is now read-only.

Commit

Permalink
Merge 9f5cf95 into 5ad62e2
Browse files Browse the repository at this point in the history
  • Loading branch information
viklund committed Sep 13, 2018
2 parents 5ad62e2 + 9f5cf95 commit 8869a10
Show file tree
Hide file tree
Showing 28 changed files with 199 additions and 189 deletions.
11 changes: 2 additions & 9 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@ Ingestion Worker
:members:


..
***************
Quality Control
***************
.. automodule:: lega.qc
:members:

*********
Keyserver
*********
Expand All @@ -64,7 +56,8 @@ Utility Functions
lega.utils.storage
lega.utils.eureka
lega.utils.exceptions
lega.utils.logging
lega.utils.logging.LEGAHandler
lega.utils.logging.JSONFormatter



Expand Down
31 changes: 7 additions & 24 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
from unittest.mock import MagicMock

# Get the project root dir, which is the parent dir of this
#sys.path.insert(0, os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.abspath('..'))
#print('PYTHONPATH =', sys.path)

import lega
import lega # noqa: E402

# -- General configuration ------------------------------------------------

Expand Down Expand Up @@ -39,7 +37,7 @@ def __getattr__(cls, name):
'sphinx.ext.viewcode',
'sphinx.ext.githubpages',
'sphinx.ext.todo',
]
]

# Add any paths that contain templates here, relative to this directory.
templates_path = ['templates']
Expand Down Expand Up @@ -97,21 +95,11 @@ def __getattr__(cls, name):

# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
#html_theme = 'alabaster'
# html_theme_options = {
# 'fixed_sidebar': True,
# 'show_powered_by': False,
# #'badge_branch': 'dev',
# 'github_repo': 'https://github.com/NBISweden/LocalEGA',
# 'github_button': True,
# }

html_theme = 'sphinx_rtd_theme'
html_theme_options = {
'collapse_navigation': True,
'sticky_navigation': True,
#'navigation_depth': 4,
'display_version': True,
'prev_next_buttons_location': 'bottom',
}
Expand All @@ -127,13 +115,7 @@ def __getattr__(cls, name):
# This is required for the alabaster theme
# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars
html_sidebars = {
'**': [
#'about.html',
#'navigation.html',
#'relations.html', # needs 'show_related': True theme option to display
#'searchbox.html',
#'donate.html',
]
'**': []
}


Expand All @@ -142,9 +124,10 @@ def __getattr__(cls, name):
def setup(app):
app.add_stylesheet('custom.css')


# -- Other stuff ----------------------------------------------------------
htmlhelp_basename = 'LocalEGA'
latex_elements = {}
latex_documents = [ (master_doc, 'LocalEGA.tex', 'Local EGA', 'NBIS System Developers', 'manual') ]
man_pages = [ (master_doc, 'localega', 'Local EGA', [author], 1) ]
texinfo_documents = [ (master_doc, 'LocalEGA', 'Local EGA', author, 'LocalEGA', 'Local extension to the European Genomic Archive', 'Miscellaneous') ]
latex_documents = [(master_doc, 'LocalEGA.tex', 'Local EGA', 'NBIS System Developers', 'manual')]
man_pages = [(master_doc, 'localega', 'Local EGA', [author], 1)]
texinfo_documents = [(master_doc, 'LocalEGA', 'Local EGA', author, 'LocalEGA', 'Local extension to the European Genomic Archive', 'Miscellaneous')]
17 changes: 16 additions & 1 deletion docs/tests.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,22 @@ Unit Tests
Unit tests are minimal: Given a set of input values for a chosen
function, they execute the function and check if the output has the
expected values. Moreover, they capture triggered exceptions and
errors. Unit tests can be run using the ``tox`` commands.
errors. Additionally we also perform pep8 and pep257 style guide checks
using `flake8 <http://flake8.pycqa.org/en/latest/>`_ with the following
configuration:

.. code-block:: yaml
[flake8]
ignore = E226,E302
exclude =
docker,
extras,
.tox
max-line-length = 160
max-complexity = 10
Unit tests can be run using the ``tox`` commands.

.. code-block:: console
Expand Down
2 changes: 1 addition & 1 deletion lega/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__title__ = 'Local EGA'
__version__ = VERSION = '1.1'
__author__ = 'Frédéric Haziza'
#__license__ = 'Apache 2.0'
__license__ = 'Apache 2.0'
__copyright__ = 'Local EGA @ NBIS Sweden'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
7 changes: 2 additions & 5 deletions lega/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import sys
import os
import configparser
import logging
from logging.config import fileConfig, dictConfig
import lega.utils.logging
from pathlib import Path
import yaml
from hashlib import md5
Expand Down Expand Up @@ -138,9 +136,8 @@ def get_value(self, section, option, conv=str, default=None, raw=False):
``section`` and ``option`` are mandatory while ``conv``, ``default`` (fallback) and ``raw`` are optional.
"""
result = os.environ.get(f'{section.upper()}_{option.upper()}', None)
if result is not None: # it might be empty
if result is not None: # it might be empty
return self._convert(result, conv)
#if self.has_option(section, option):
return self._convert(self.get(section, option, fallback=default, raw=raw), conv)

def _convert(self, value, conv):
Expand All @@ -155,7 +152,7 @@ def _convert(self, value, conv):
else:
raise ValueError(f"Invalid truth value: {val}")
else:
return conv(value) # raise error in case we can't convert an empty value
return conv(value) # raise error in case we can't convert an empty value


CONF = Configuration()
Expand Down
8 changes: 4 additions & 4 deletions lega/conf/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def main(args=None):
parser = argparse.ArgumentParser(description="Forward message between CentralEGA's broker and the local one",
allow_abbrev=False)
parser.add_argument('--conf', help='configuration file, in INI or YAML format')
parser.add_argument('--log', help='configuration file for the loggers')
parser.add_argument('--log', help='configuration file for the loggers')

parser.add_argument('--list', dest='list_content', action='store_true', help='Lists the content of the configuration file')
pargs = parser.parse_args(args)
CONF.setup( args )

CONF.setup(args)

print(repr(CONF))

Expand Down
15 changes: 8 additions & 7 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from legacryptor.crypt4gh import get_header

from .conf import CONF
from .utils import db, exceptions, checksum, sanitize_user_id, storage
from .utils import db, exceptions, sanitize_user_id, storage
from .utils.amqp import consume, publish, get_connection

LOG = logging.getLogger(__name__)
Expand All @@ -49,7 +49,7 @@ def work(fs, channel, data):

# Insert in database
file_id = db.insert_file(filepath, user_id)
data['file_id'] = file_id # must be there: database error uses it
data['file_id'] = file_id # must be there: database error uses it

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
Expand All @@ -59,7 +59,7 @@ def work(fs, channel, data):
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info(f"Inbox file path: {inbox_filepath}")
if not inbox_filepath.exists():
raise exceptions.NotFoundInInbox(filepath) # return early
raise exceptions.NotFoundInInbox(filepath) # return early

# Ok, we have the file in the inbox

Expand All @@ -71,7 +71,7 @@ def work(fs, channel, data):
LOG.debug(f'Sending message to CentralEGA: {data}')
publish(org_msg, channel, 'cega', 'files.processing')
org_msg.pop('status', None)

# Strip the header out and copy the rest of the file to the vault
LOG.debug(f'Opening {inbox_filepath}')
with open(inbox_filepath, 'rb') as infile:
Expand All @@ -80,11 +80,11 @@ def work(fs, channel, data):

header_hex = (beginning+header).hex()
data['header'] = header_hex
db.store_header(file_id, header_hex) # header bytes will be .hex()
db.store_header(file_id, header_hex) # header bytes will be .hex()

target = fs.location(file_id)
LOG.info(f'[{fs.__class__.__name__}] Moving the rest of {filepath} to {target}')
target_size = fs.copy(infile, target) # It will copy the rest only
target_size = fs.copy(infile, target) # It will copy the rest only

LOG.info(f'Vault copying completed. Updating database')
db.set_archived(file_id, target, target_size)
Expand All @@ -97,7 +97,7 @@ def main(args=None):
if not args:
args = sys.argv[1:]

CONF.setup(args) # re-conf
CONF.setup(args) # re-conf

fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
broker = get_connection('broker')
Expand All @@ -106,5 +106,6 @@ def main(args=None):
# upstream link configured in local broker
consume(do_work, broker, 'files', 'archived')


if __name__ == '__main__':
main()
22 changes: 11 additions & 11 deletions lega/keyserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ def _check_limit(self):

def clear(self):
"""Clear all cache."""
#self.store = dict()
self.store.clear()


_cache = None # key IDs are uppercase
_active = None # will be a KeyID (not a key name)
_cache = None # key IDs are uppercase
_active = None # will be a KeyID (not a key name)

####################################
# Caching the keys
Expand Down Expand Up @@ -127,13 +126,13 @@ async def retrieve_active_key(request):
key_type = request.match_info['key_type'].lower()
LOG.debug(f'Requesting active ({key_type}) key')
if key_type not in ('public', 'private'):
return web.HTTPForbidden() # web.HTTPBadRequest()
return web.HTTPForbidden() # web.HTTPBadRequest()
key_format = 'armored' if request.content_type == 'text/plain' else None
if _active is None:
return web.HTTPNotFound()
k = _cache.get(_active, key_type, key_format=key_format)
if k:
return web.Response(body=k) # web.Response(text=k.hex())
return web.Response(body=k) # web.Response(text=k.hex())
else:
LOG.warn(f"Requested active ({key_type}) key not found.")
return web.HTTPNotFound()
Expand All @@ -144,13 +143,13 @@ async def retrieve_key(request):
requested_id = request.match_info['requested_id']
key_type = request.match_info['key_type'].lower()
if key_type not in ('public', 'private'):
return web.HTTPForbidden() # web.HTTPBadRequest()
return web.HTTPForbidden() # web.HTTPBadRequest()
key_id = requested_id[-16:].upper()
key_format = 'armored' if request.content_type == 'text/plain' else None
LOG.debug(f'Requested {key_type.upper()} key with ID {requested_id}')
k = _cache.get(key_id, key_type, key_format=key_format)
if k:
return web.Response(body=k) # web.Response(text=value.hex())
return web.Response(body=k) # web.Response(text=value.hex())
else:
LOG.warn(f"Requested key {requested_id} not found.")
return web.HTTPNotFound()
Expand Down Expand Up @@ -181,7 +180,7 @@ async def healthcheck(request):
@routes.get('/admin/ttl')
async def check_ttl(request):
"""Evict from the cache if TTL expired
and return the keys that survived""" # ehh...why? /Fred
and return the keys that survived""" # ehh...why? /Fred
LOG.debug('Admin TTL')
expire = _cache.check_ttl()
if expire:
Expand All @@ -196,7 +195,8 @@ def load_keys_conf(store):
_cache = Cache()
# Load all the keys in the store
for section in store.sections():
_unlock_key(section, **dict(store.items(section))) # includes defaults
_unlock_key(section, **dict(store.items(section))) # includes defaults


alive = True # used to set if the keyserver is alive in the shutdown

Expand Down Expand Up @@ -239,8 +239,8 @@ def main(args=None):

host = CONF.get_value('keyserver', 'host') # fallbacks are in defaults.ini
port = CONF.get_value('keyserver', 'port', conv=int)
health_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
status_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))
health_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
status_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))

eureka_endpoint = CONF.get_value('eureka', 'endpoint')

Expand Down
3 changes: 2 additions & 1 deletion lega/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ def main(args=None):
if not args:
args = sys.argv[1:]

CONF.setup(args) # re-conf
CONF.setup(args) # re-conf

broker = get_connection('broker')

# upstream link configured in local broker
consume(work, broker, 'stableIDs', None)


if __name__ == '__main__':
main()
28 changes: 16 additions & 12 deletions lega/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@
import os
import asyncio
import uvloop

from .conf import CONF
from .utils.amqp import get_connection, publish
from .utils.checksum import calculate


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

host = '127.0.0.1'
port = 8888
delim = b'$'

from .conf import CONF
from .utils.amqp import get_connection, publish
from .utils.checksum import calculate, supported_algorithms

LOG = logging.getLogger(__name__)

class Forwarder(asyncio.Protocol):
Expand Down Expand Up @@ -53,9 +55,9 @@ def parse(self, data):
# We have 2 bars
pos1 = data.find(delim)
username = data[:pos1]
pos2 = data.find(delim,pos1+1)
pos2 = data.find(delim, pos1+1)
filename = data[pos1+1:pos2]
yield (username.decode(),filename.decode())
yield (username.decode(), filename.decode())
data = data[pos2+1:]

def data_received(self, data):
Expand All @@ -70,13 +72,15 @@ def data_received(self, data):

def send_message(self, username, filename):
inbox = self.inbox_location % username
filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename) if self.isolation \
else (filename, filename[len(inbox):]) # surely there is better!
filepath, filename = (filename, filename[len(inbox):])
if self.isolation:
filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename)

LOG.debug("Filepath %s", filepath)
msg = { 'user': username,
'filepath': filename,
'filesize': os.stat(filepath).st_size
}
msg = {'user': username,
'filepath': filename,
'filesize': os.stat(filepath).st_size
}
c = calculate(filepath, 'sha256')
if c:
msg['encrypted_integrity'] = {'algorithm': 'sha256', 'checksum': c}
Expand Down
Loading

0 comments on commit 8869a10

Please sign in to comment.