This repository has been archived by the owner on Dec 16, 2019. It is now read-only.

Make everything validate with flake8 #350

Merged (25 commits, Sep 13, 2018)
Commits (25; the diff below shows changes from 19 of them)
4d34b74  Flake8 correction of ingest.py (viklund, Sep 12, 2018)
70385c7  Flake8 correction of keyserver.py (viklund, Sep 12, 2018)
5b3b844  Flake8 correction of mapper.py (viklund, Sep 12, 2018)
15af4dd  Flake8 correction of notifications.py (viklund, Sep 12, 2018)
83a4bba  Flake8 correction of verify.py (viklund, Sep 12, 2018)
f0c6e55  Flake8 correction of __init__.py (viklund, Sep 12, 2018)
be7d8cf  Flake8 correction of conf/__init__.py (viklund, Sep 12, 2018)
539da23  Flake8 correction of conf/__main__.py (viklund, Sep 12, 2018)
ad0895f  Flake8 correction of utils/__init__.py (viklund, Sep 12, 2018)
bf9e137  Flake8 correction of utils/amqp.py (viklund, Sep 12, 2018)
79dc733  Flake8 correction of utils/checksum.py (viklund, Sep 12, 2018)
ce80de3  Flake8 correction of utils/eureka.py (viklund, Sep 12, 2018)
3de80b7  Flake8 correction of utils/exceptions.py (viklund, Sep 12, 2018)
bb3eaf7  Flake8 correction of utils/logging.py (viklund, Sep 12, 2018)
c0bbfdb  Flake8 correction of utils/storage.py (viklund, Sep 12, 2018)
7d35c7b  Flake8 correction of utils/db.py (viklund, Sep 12, 2018)
074aed5  Updated tests to work with flake8-ified verify.py (viklund, Sep 12, 2018)
4d5c5e9  Flake8 correction of tests/ subdirectory (viklund, Sep 12, 2018)
8c1c79e  Updated tox configuration to run flake8 (viklund, Sep 12, 2018)
ccf39ea  Run flake8 in travis (viklund, Sep 12, 2018)
ea41402  Streamlined flake8 config for tox (viklund, Sep 12, 2018)
386a571  Flake8 correction of setup.py (viklund, Sep 12, 2018)
04b7412  Flake8 correction of docs/setup.py (viklund, Sep 12, 2018)
152dcda  small update to docs for testing. (blankdots, Sep 12, 2018)
9f5cf95  remove lega.qc from plus logging change (blankdots, Sep 13, 2018)
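Commits 8c1c79e, ccf39ea and ea41402 wire flake8 into tox and Travis, but those configuration files are not visible in this view. A minimal sketch of such a setup (all section names and options here are assumptions, not taken from this PR):

[tox]
envlist = flake8

[testenv:flake8]
skip_install = true
deps = flake8
commands = flake8 lega tests setup.py

[flake8]
max-line-length = 160

Travis would then only need to install and run tox (for instance, pip install tox in the install phase and tox in the script phase).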
lega/__init__.py: 2 changes (1 addition, 1 deletion)
@@ -6,7 +6,7 @@
__title__ = 'Local EGA'
__version__ = VERSION = '1.1'
__author__ = 'Frédéric Haziza'
-#__license__ = 'Apache 2.0'
+__license__ = 'Apache 2.0'
__copyright__ = 'Local EGA @ NBIS Sweden'

# Set default logging handler to avoid "No handler found" warnings.
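The rest of this file is collapsed in this view; the comment above presumably refers to the standard idiom for library packages, along these lines:

import logging
logging.getLogger(__name__).addHandler(logging.NullHandler())

That attaches a do-nothing handler so that applications importing the package without configuring logging do not get "No handler found" warnings.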
lega/conf/__init__.py: 7 changes (2 additions, 5 deletions)
@@ -22,9 +22,7 @@
import sys
import os
import configparser
-import logging
-from logging.config import fileConfig, dictConfig
-import lega.utils.logging
from pathlib import Path
import yaml
from hashlib import md5
@@ -138,9 +136,8 @@ def get_value(self, section, option, conv=str, default=None, raw=False):
``section`` and ``option`` are mandatory while ``conv``, ``default`` (fallback) and ``raw`` are optional.
"""
result = os.environ.get(f'{section.upper()}_{option.upper()}', None)
-if result is not None: # it might be empty
+if result is not None:  # it might be empty
return self._convert(result, conv)
-#if self.has_option(section, option):
return self._convert(self.get(section, option, fallback=default, raw=raw), conv)

def _convert(self, value, conv):
@@ -155,7 +152,7 @@ def _convert(self, value, conv):
else:
raise ValueError(f"Invalid truth value: {val}")
else:
-return conv(value) # raise error in case we can't convert an empty value
+return conv(value)  # raise error in case we can't convert an empty value


CONF = Configuration()
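As the get_value hunk above shows, an environment variable named SECTION_OPTION takes precedence over the configuration file, which in turn falls back to the supplied default. A usage sketch (both calls appear elsewhere in this PR; the environment variable names are implied by the code):

from lega.conf import CONF

# VAULT_DRIVER, if set (even to an empty string), overrides the
# 'driver' option of the [vault] section; otherwise the file value
# or the default is used.
driver = CONF.get_value('vault', 'driver', default='FileStorage')

# KEYSERVER_PORT would likewise override [keyserver]'s 'port';
# conv=int converts whichever value wins.
port = CONF.get_value('keyserver', 'port', conv=int)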
lega/conf/__main__.py: 8 changes (4 additions, 4 deletions)
@@ -15,12 +15,12 @@ def main(args=None):
parser = argparse.ArgumentParser(description="Forward message between CentralEGA's broker and the local one",
allow_abbrev=False)
parser.add_argument('--conf', help='configuration file, in INI or YAML format')
-parser.add_argument('--log', help='configuration file for the loggers')
+parser.add_argument('--log', help='configuration file for the loggers')

parser.add_argument('--list', dest='list_content', action='store_true', help='Lists the content of the configuration file')
pargs = parser.parse_args(args)
-CONF.setup( args )

+CONF.setup(args)

print(repr(CONF))

lega/ingest.py: 15 changes (8 additions, 7 deletions)
@@ -26,7 +26,7 @@
from legacryptor.crypt4gh import get_header

from .conf import CONF
-from .utils import db, exceptions, checksum, sanitize_user_id, storage
+from .utils import db, exceptions, sanitize_user_id, storage
from .utils.amqp import consume, publish, get_connection

LOG = logging.getLogger(__name__)
@@ -49,7 +49,7 @@ def work(fs, channel, data):

# Insert in database
file_id = db.insert_file(filepath, user_id)
-data['file_id'] = file_id # must be there: database error uses it
+data['file_id'] = file_id  # must be there: database error uses it

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
@@ -59,7 +59,7 @@
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info(f"Inbox file path: {inbox_filepath}")
if not inbox_filepath.exists():
-raise exceptions.NotFoundInInbox(filepath) # return early
+raise exceptions.NotFoundInInbox(filepath)  # return early

# Ok, we have the file in the inbox

@@ -71,7 +71,7 @@
LOG.debug(f'Sending message to CentralEGA: {data}')
publish(org_msg, channel, 'cega', 'files.processing')
org_msg.pop('status', None)

# Strip the header out and copy the rest of the file to the vault
LOG.debug(f'Opening {inbox_filepath}')
with open(inbox_filepath, 'rb') as infile:
@@ -80,11 +80,11 @@

header_hex = (beginning+header).hex()
data['header'] = header_hex
-db.store_header(file_id, header_hex) # header bytes will be .hex()
+db.store_header(file_id, header_hex)  # header bytes will be .hex()

target = fs.location(file_id)
LOG.info(f'[{fs.__class__.__name__}] Moving the rest of {filepath} to {target}')
-target_size = fs.copy(infile, target) # It will copy the rest only
+target_size = fs.copy(infile, target)  # It will copy the rest only

LOG.info(f'Vault copying completed. Updating database')
db.set_archived(file_id, target, target_size)
@@ -97,7 +97,7 @@ def main(args=None):
if not args:
args = sys.argv[1:]

-CONF.setup(args) # re-conf
+CONF.setup(args)  # re-conf

fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
broker = get_connection('broker')
@@ -106,5 +106,6 @@
# upstream link configured in local broker
consume(do_work, broker, 'files', 'archived')


if __name__ == '__main__':
main()
lega/keyserver.py: 22 changes (11 additions, 11 deletions)
@@ -94,12 +94,11 @@ def _check_limit(self):

def clear(self):
"""Clear all cache."""
-#self.store = dict()
self.store.clear()


-_cache = None # key IDs are uppercase
-_active = None # will be a KeyID (not a key name)
+_cache = None  # key IDs are uppercase
+_active = None  # will be a KeyID (not a key name)

####################################
# Caching the keys
@@ -127,13 +126,13 @@ async def retrieve_active_key(request):
key_type = request.match_info['key_type'].lower()
LOG.debug(f'Requesting active ({key_type}) key')
if key_type not in ('public', 'private'):
-return web.HTTPForbidden() # web.HTTPBadRequest()
+return web.HTTPForbidden()  # web.HTTPBadRequest()
key_format = 'armored' if request.content_type == 'text/plain' else None
if _active is None:
return web.HTTPNotFound()
k = _cache.get(_active, key_type, key_format=key_format)
if k:
-return web.Response(body=k) # web.Response(text=k.hex())
+return web.Response(body=k)  # web.Response(text=k.hex())
else:
LOG.warn(f"Requested active ({key_type}) key not found.")
return web.HTTPNotFound()
@@ -144,13 +143,13 @@ async def retrieve_key(request):
requested_id = request.match_info['requested_id']
key_type = request.match_info['key_type'].lower()
if key_type not in ('public', 'private'):
-return web.HTTPForbidden() # web.HTTPBadRequest()
+return web.HTTPForbidden()  # web.HTTPBadRequest()
key_id = requested_id[-16:].upper()
key_format = 'armored' if request.content_type == 'text/plain' else None
LOG.debug(f'Requested {key_type.upper()} key with ID {requested_id}')
k = _cache.get(key_id, key_type, key_format=key_format)
if k:
-return web.Response(body=k) # web.Response(text=value.hex())
+return web.Response(body=k)  # web.Response(text=value.hex())
else:
LOG.warn(f"Requested key {requested_id} not found.")
return web.HTTPNotFound()
@@ -181,7 +180,7 @@ async def healthcheck(request):
@routes.get('/admin/ttl')
async def check_ttl(request):
"""Evict from the cache if TTL expired
-and return the keys that survived""" # ehh...why? /Fred
+and return the keys that survived"""  # ehh...why? /Fred
LOG.debug('Admin TTL')
expire = _cache.check_ttl()
if expire:
@@ -196,7 +195,8 @@ def load_keys_conf(store):
_cache = Cache()
# Load all the keys in the store
for section in store.sections():
-_unlock_key(section, **dict(store.items(section))) # includes defaults
+_unlock_key(section, **dict(store.items(section)))  # includes defaults


alive = True # used to set if the keyserver is alive in the shutdown

@@ -239,8 +239,8 @@ def main(args=None):

host = CONF.get_value('keyserver', 'host') # fallbacks are in defaults.ini
port = CONF.get_value('keyserver', 'port', conv=int)
-health_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
-status_check_url='http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))
+health_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'health_endpoint'))
+status_check_url = 'http://{}:{}{}'.format(host, port, CONF.get_value('keyserver', 'status_endpoint'))

eureka_endpoint = CONF.get_value('eureka', 'endpoint')

lega/mapper.py: 3 changes (2 additions, 1 deletion)
@@ -39,12 +39,13 @@ def main(args=None):
if not args:
args = sys.argv[1:]

-CONF.setup(args) # re-conf
+CONF.setup(args)  # re-conf

broker = get_connection('broker')

# upstream link configured in local broker
consume(work, broker, 'stableIDs', None)


if __name__ == '__main__':
main()
lega/notifications.py: 28 changes (16 additions, 12 deletions)
@@ -12,16 +12,18 @@
import os
import asyncio
import uvloop

+from .conf import CONF
+from .utils.amqp import get_connection, publish
+from .utils.checksum import calculate


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

host = '127.0.0.1'
port = 8888
delim = b'$'

-from .conf import CONF
-from .utils.amqp import get_connection, publish
-from .utils.checksum import calculate, supported_algorithms

LOG = logging.getLogger(__name__)

class Forwarder(asyncio.Protocol):
@@ -53,9 +55,9 @@ def parse(self, data):
# We have 2 bars
pos1 = data.find(delim)
username = data[:pos1]
-pos2 = data.find(delim,pos1+1)
+pos2 = data.find(delim, pos1+1)
filename = data[pos1+1:pos2]
-yield (username.decode(),filename.decode())
+yield (username.decode(), filename.decode())
data = data[pos2+1:]

def data_received(self, data):
@@ -70,13 +72,15 @@ def data_received(self, data):

def send_message(self, username, filename):
inbox = self.inbox_location % username
-filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename) if self.isolation \
-else (filename, filename[len(inbox):]) # surely there is better!
+filepath, filename = (filename, filename[len(inbox):])
+if self.isolation:
+filepath, filename = (os.path.join(inbox, filename.lstrip('/')), filename)

LOG.debug("Filepath %s", filepath)
-msg = { 'user': username,
-'filepath': filename,
-'filesize': os.stat(filepath).st_size
-}
+msg = {'user': username,
+'filepath': filename,
+'filesize': os.stat(filepath).st_size
+}
c = calculate(filepath, 'sha256')
if c:
msg['encrypted_integrity'] = {'algorithm': 'sha256', 'checksum': c}
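The parse generator above splits $-delimited username/filename records out of the raw socket bytes. A standalone sketch of that framing (simplified: the real Forwarder buffers partial records across data_received calls):

delim = b'$'

def parse(data):
    # Yield (username, filename) pairs from b'user$file$user$file$...'
    while data.count(delim) >= 2:
        pos1 = data.find(delim)
        pos2 = data.find(delim, pos1 + 1)
        yield data[:pos1].decode(), data[pos1 + 1:pos2].decode()
        data = data[pos2 + 1:]

assert list(parse(b'john$f1.c4gh$jane$f2.c4gh$')) == [('john', 'f1.c4gh'), ('jane', 'f2.c4gh')]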
lega/utils/__init__.py: 3 changes (1 addition, 2 deletions)
@@ -8,7 +8,7 @@

def get_file_content(f, mode='rb'):
try:
-with open( f, mode) as h:
+with open(f, mode) as h:
return h.read()
except OSError as e:
LOG.error(f'Error reading {f}: {e!r}')
@@ -18,4 +18,3 @@ def sanitize_user_id(user):
'''Returns username without host part of an ID on the form name@something'''

return user.split('@')[0]

lega/utils/amqp.py: 26 changes (13 additions, 13 deletions)
@@ -31,7 +31,7 @@ def get_connection(domain, blocking=True):
CONF.get_value(domain, 'password', default='guest')
),
'connection_attempts': CONF.get_value(domain, 'connection_attempts', conv=int, default=10),
-'retry_delay': CONF.get_value(domain,'retry_delay', conv=int, default=10), # seconds
+'retry_delay': CONF.get_value(domain, 'retry_delay', conv=int, default=10),  # seconds
}
heartbeat = CONF.get_value(domain, 'heartbeat', conv=int, default=0)
if heartbeat is not None: # can be 0
@@ -46,7 +46,7 @@
params['ssl_options'] = {
'ca_certs': CONF.get_value(domain, 'cacert'),
'certfile': CONF.get_value(domain, 'cert'),
-'keyfile': CONF.get_value(domain, 'keyfile'),
+'keyfile': CONF.get_value(domain, 'keyfile'),
'cert_reqs': 2, # ssl.CERT_REQUIRED is actually <VerifyMode.CERT_REQUIRED: 2>
}

@@ -62,12 +62,12 @@ def publish(message, channel, exchange, routing, correlation_id=None):
Sending a message to the local broker with ``path`` was updated
'''
LOG.debug(f'Sending {message} to exchange: {exchange} [routing key: {routing}]')
-channel.basic_publish(exchange = exchange,
-routing_key = routing,
-body = json.dumps(message),
-properties = pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
-content_type='application/json',
-delivery_mode=2))
+channel.basic_publish(exchange=exchange,
+routing_key=routing,
+body=json.dumps(message),
+properties=pika.BasicProperties(correlation_id=correlation_id or str(uuid.uuid4()),
+content_type='application/json',
+delivery_mode=2))


def consume(work, connection, from_queue, to_routing):
@@ -84,12 +84,12 @@ def consume(work, connection, from_queue, to_routing):
routing key.
'''

-assert( from_queue )
+assert(from_queue)

LOG.debug(f'Consuming message from {from_queue}')

from_channel = connection.channel()
-from_channel.basic_qos(prefetch_count=1) # One job per worker
+from_channel.basic_qos(prefetch_count=1)  # One job per worker
to_channel = connection.channel()

def process_request(channel, method_frame, props, body):
@@ -98,12 +98,12 @@ def process_request(channel, method_frame, props, body):
LOG.debug(f'Consuming message {message_id} (Correlation ID: {correlation_id})')

# Process message in JSON format
-answer = work( json.loads(body) ) # Exceptions should be already caught
+answer = work(json.loads(body))  # Exceptions should be already caught

# Publish the answer
if answer:
-assert( to_routing )
-publish(answer, to_channel, 'lega', to_routing, correlation_id = props.correlation_id)
+assert(to_routing)
+publish(answer, to_channel, 'lega', to_routing, correlation_id=props.correlation_id)

# Acknowledgment: Cancel the message resend in case MQ crashes
LOG.debug(f'Sending ACK for message {message_id} (Correlation ID: {correlation_id})')
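For context, the two entry points above are used elsewhere in this PR roughly as follows (the message body is a made-up example; the exchange and routing keys are the ones ingest.py uses):

from lega.utils.amqp import get_connection, publish

broker = get_connection('broker')  # settings come from the [broker] section via CONF
channel = broker.channel()
publish({'user': 'john', 'filepath': '/f1.c4gh'}, channel, 'cega', 'files.processing')

consume(work, broker, 'files', 'archived') would then read JSON messages off the 'files' queue, hand each to work(), and republish any non-None answer to the 'lega' exchange with routing key 'archived'.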
lega/utils/checksum.py: 11 changes (5 additions, 6 deletions)
@@ -31,7 +31,7 @@ def calculate(filepath, algo, bsize=8192):
'''
try:
m = instantiate(algo)
-with open(filepath, 'rb') as f: # Open the file in binary mode. No encoding dance.
+with open(filepath, 'rb') as f:  # Open the file in binary mode. No encoding dance.
while True:
data = f.read(bsize)
if not data:
@@ -43,10 +43,10 @@ def calculate(filepath, algo, bsize=8192):
return None


-def is_valid(filepath, digest, hashAlgo = 'md5'):
+def is_valid(filepath, digest, hashAlgo='md5'):
'''Verify the integrity of a file against a hash value.'''

-assert( isinstance(digest,str) )
+assert(isinstance(digest, str))

res = calculate(filepath, hashAlgo)
LOG.debug('Calculated digest: '+res)
@@ -67,10 +67,9 @@ def get_from_companion(filepath):
try:
with open(companion, 'rt', encoding='utf-8') as f:
return f.read(), h
-except OSError as e: # Not found, not readable, ...
+except OSError as e:  # Not found, not readable, ...
LOG.debug(f'Companion {companion}: {e!r}')
# Check the next

-else: # no break statement was encountered
+else:  # no break statement was encountered
raise CompanionNotFound(filepath)
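A short usage sketch of the helpers above (the path is hypothetical; calculate returns None when the file cannot be read):

from lega.utils.checksum import calculate, is_valid

digest = calculate('/ega/inbox/john/f1.c4gh', 'sha256')
if digest and is_valid('/ega/inbox/john/f1.c4gh', digest, hashAlgo='sha256'):
    print('Checksum verified')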
