Skip to content

Commit

Permalink
Merge pull request #99 from EGA-archive/amqp-library
Browse files Browse the repository at this point in the history
Updating the amqp library from pika to amqpstorm to handle differently the long running tasks
  • Loading branch information
silverdaz committed May 21, 2020
2 parents 4c923ce + cfc3b5b commit 4b8ea04
Show file tree
Hide file tree
Showing 17 changed files with 384 additions and 199 deletions.
49 changes: 30 additions & 19 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
##########################
## Build env
## Build env, it works with debian buster 3.8
##########################
FROM python:3.8-alpine3.11 as BUILD
FROM python:slim as BUILD

RUN apk add git libressl-dev postgresql-dev gcc musl-dev libffi-dev make
ENV DEBIAN_FRONTEND noninteractive

RUN apt-get update && apt-get install -y --no-install-recommends \
gcc make bzip2 \
libpq5 libpq-dev libffi-dev libssl-dev libc-dev-bin libc-dev \
&& rm -rf /var/lib/apt/lists/*

# This will pin the package versions
COPY deploy/requirements.txt /root/LocalEGA/requirements.txt
Expand All @@ -18,33 +23,39 @@ RUN pip install /root/LocalEGA
##########################
## Final image
##########################
FROM python:3.8-alpine3.11
FROM python:slim

LABEL maintainer "EGA System Developers"
LABEL org.label-schema.schema-version="1.0"
LABEL org.label-schema.vcs-url="https://github.com/EGA-archive/LocalEGA"


RUN apk add --no-cache --update libressl postgresql-libs

ARG LEGA_UID=1000
ARG LEGA_GID=1000

RUN addgroup -g ${LEGA_GID} lega && \
adduser -D -G lega -u ${LEGA_UID} lega
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends \
libpq5 \
&& rm -rf /var/lib/apt/lists/*

COPY --from=BUILD usr/local/lib/python3.8/ usr/local/lib/python3.8/
COPY --from=BUILD /usr/local/bin/ega-* /usr/local/bin/

RUN mkdir -p /ega/archive && \
chgrp lega /ega/archive && \
chmod 2770 /ega/archive
VOLUME /ega/archive

RUN mkdir -p /etc/ega && \
chgrp lega /etc/ega && \
ARG LEGA_UID=1000
ARG LEGA_GID=1000
ARG LEGA_USERNAME=lega
ARG LEGA_GROUPNAME=lega

RUN ldconfig -v && \
groupadd -g ${LEGA_GID} -r ${LEGA_GROUPNAME} && \
useradd -M -g ${LEGA_GROUPNAME} -u ${LEGA_UID} ${LEGA_USERNAME} && \
mkdir -p /ega/archive && \
chgrp ${LEGA_USERNAME} /ega/archive && \
chmod 2770 /ega/archive && \
mkdir -p /etc/ega && \
chgrp ${LEGA_USERNAME} /etc/ega && \
chmod 2770 /etc/ega

VOLUME /ega/archive
VOLUME /etc/ega

USER lega
USER ${LEGA_USERNAME}

3 changes: 2 additions & 1 deletion deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ preflight-check:
# the rabbitmq shovel to CentralEGA (the federated queue can be late, it doesn't hurt)

logs:
@docker logs -f logs
@docker-compose logs -f
# @docker-compose logs -f logs

####################################################
## Base Image
Expand Down
6 changes: 4 additions & 2 deletions deploy/bootstrap/lega-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#!/bin/sh
set -e

cp /etc/ega/ssl.key /etc/ega/ssl.key.lega
chmod 400 /etc/ega/ssl.key.lega
if ! test -e /etc/ega/ssl.key.lega; then
cp /etc/ega/ssl.key /etc/ega/ssl.key.lega
chmod 400 /etc/ega/ssl.key.lega
fi

exec $@
9 changes: 8 additions & 1 deletion deploy/bootstrap/run/lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ def main(conf, args):
with_docker_secrets = args['--secrets']

config = configparser.RawConfigParser()
config['DEFAULT'] = {'log':'debug'}

config['DEFAULT'] = {
'queue': 'stableIDs',
'exchange': 'lega',
# 'routing_key': 'backup1',
}


config['inbox'] = {
'location': r'/ega/inbox/%s/',
'chroot_sessions': True,
Expand Down
8 changes: 7 additions & 1 deletion deploy/bootstrap/run/lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ def main(conf, args):
with_docker_secrets = args['--secrets']

config = configparser.RawConfigParser()
config['DEFAULT'] = {'log':'debug'}

config['DEFAULT'] = {
'queue': 'files',
'exchange': 'lega',
'routing_key': 'archived',
}

config['inbox'] = {
'location': r'/ega/inbox/%s/',
'chroot_sessions': True,
Expand Down
45 changes: 24 additions & 21 deletions deploy/bootstrap/run/lega/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
'''

LEGA_LOG='LEGA_LOG=debug'
# LEGA_LOG='INGESTION_LOG=centralized'

def main(cega_conf, conf, args):
lega = {
'version': '3.7',
Expand Down Expand Up @@ -142,7 +145,7 @@ def main(cega_conf, conf, args):

'ingest': {
'environment': [
'LEGA_LOG=centralized',
LEGA_LOG,
] + ([
'S3_ACCESS_KEY='+conf.get('s3','access_key'),
'S3_SECRET_KEY='+conf.get('s3','secret_key'),
Expand Down Expand Up @@ -173,7 +176,7 @@ def main(cega_conf, conf, args):

'verify': {
'environment': [
'LEGA_LOG=centralized',
LEGA_LOG,
] + ([
'S3_ACCESS_KEY='+conf.get('s3','access_key'),
'S3_SECRET_KEY='+conf.get('s3','secret_key'),
Expand Down Expand Up @@ -201,7 +204,7 @@ def main(cega_conf, conf, args):

'finalize': {
'environment': [
'LEGA_LOG=centralized'
LEGA_LOG,
],
'hostname': f'finalize{HOSTNAME_DOMAIN}',
'image': 'egarchive/lega-base:latest',
Expand All @@ -222,23 +225,23 @@ def main(cega_conf, conf, args):
'command': ["ega-finalize"],
},

# Collect logs to a central location.
# Vector.dev, logstash, or a custom code can receive them
'logs': {
'hostname': f'logs{HOSTNAME_DOMAIN}',
'image': 'python:3.8-alpine3.11',
'container_name': f'logs{HOSTNAME_DOMAIN}',
'volumes': [
'../bootstrap/udplogs.py:/logserver.py',
],
'networks': [
'internal',
# 'private-db',
# 'private-vault',
# 'external',
],
'entrypoint': ['python', '/logserver.py']
}
# # Collect logs to a central location.
# # Vector.dev, logstash, or a custom code can receive them
# 'logs': {
# 'hostname': f'logs{HOSTNAME_DOMAIN}',
# 'image': 'python:3.8-alpine3.11',
# 'container_name': f'logs{HOSTNAME_DOMAIN}',
# 'volumes': [
# '../bootstrap/udplogs.py:/logserver.py',
# ],
# 'networks': [
# 'internal',
# # 'private-db',
# # 'private-vault',
# # 'external',
# ],
# 'entrypoint': ['python', '/logserver.py']
# }

}

Expand Down Expand Up @@ -311,7 +314,7 @@ def main(cega_conf, conf, args):
for s in ['ingest', 'verify', 'finalize']:
service = lega['services'][s]
volumes = service['volumes']
volumes.append('../../lega:/home/lega/.local/lib/python3.6/site-packages/lega')
volumes.append('../../lega:/home/lega/.local/lib/python3.8/site-packages/lega')
del service['command']
service['entrypoint'] = ["/bin/sleep", "1000000000000"]

Expand Down
4 changes: 3 additions & 1 deletion deploy/bootstrap/run/lega/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def main(conf, args):

config = configparser.RawConfigParser()
config['DEFAULT'] = {
'log':'debug',
'queue': 'archived',
'exchange': 'lega',
'routing_key': 'completed',
'master_key': 'c4gh_file',
}

Expand Down
27 changes: 15 additions & 12 deletions deploy/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
AMQPStorm==2.7.2
bcrypt==3.1.7
boto3==1.11.7
botocore==1.14.7
cffi==1.13.2
cryptography==2.8
boto3==1.11.7
certifi==2020.4.5.1
cffi==1.14.0
chardet==3.0.4
crypt4gh==1.3
cryptography==2.9.2
docopt==0.6.2
docutils==0.15.2
jmespath==0.9.4
pika==1.1.0
psycopg2==2.8.4
pycparser==2.19
idna==2.9
pamqp==2.3.0
psycopg2-binary==2.8.5
pycparser==2.20
PyNaCl==1.3.0
python-dateutil==2.8.1
PyYAML==5.3
PyYAML==5.3.1
requests==2.23.0
s3transfer==0.3.1
six==1.14.0
urllib3==1.25.8
git+https://github.com/EGA-archive/crypt4gh.git
urllib3==1.25.9
crypt4gh==1.3
2 changes: 1 addition & 1 deletion lega/conf/loggers/debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ root:

loggers:
lega:
level: INFO
level: DEBUG
handlers: [debugFile,console]
propagate: True
qualname: lega
Expand Down
4 changes: 2 additions & 2 deletions lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ def work(data):
db.set_stable_id(file_id, stable_id) # That will flag the entry as 'Ready'

LOG.info("Stable ID %s mapped to %s", stable_id, file_id)
return (None, False)
# Not publishing any answer


def main():
"""Listen for incoming stable IDs."""
# upstream link configured in local broker
consume(work, 'stableIDs', None)
consume(work)


if __name__ == '__main__':
Expand Down
7 changes: 4 additions & 3 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from .conf import CONF
from .utils import db, exceptions, errors, sanitize_user_id, storage
from .utils.amqp import consume
from .utils.amqp import consume, publish

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,7 +91,8 @@ def work(fs, inbox_fs, data):
data['archive_path'] = target

LOG.debug("Reply message: %s", data)
return (data, False)
# Publish the answer
publish(data)


def main():
Expand All @@ -101,7 +102,7 @@ def main():
do_work = partial(work, fs('archive', 'lega'), partial(inbox_fs, 'inbox'))

# upstream link configured in local broker
consume(do_work, 'files', 'archived')
consume(do_work)


if __name__ == '__main__':
Expand Down
34 changes: 33 additions & 1 deletion lega/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,47 @@
Used internally.
"""

import logging
import os
import sys
import hashlib
import traceback
from functools import wraps

LOG = logging.getLogger(__name__)

def sanitize_user_id(user):
"""Return username without host part of an ID on the form name@something."""
return user.split('@')[0]


def redact_url(url):
"""Remove user:password from the URL."""
protocol = url[:url.index('://')+3]
remainder = url.split('@', 1)[-1]
# return f'{protocol}[redacted]@{remainder}'
return protocol + '[redacted]@' + remainder

def clean_message(data):

for key in ['staged_path', 'staged_name',
'target_size']:
try:
del data[key]
except KeyError as ke:
pass
# return data

def log_trace():
"""Locate the error."""
exc_type, _, exc_tb = sys.exc_info()
# traceback.print_tb(exc_tb)
g = traceback.walk_tb(exc_tb)
try:
#frame, lineno = next(g) # that should be the decorator
frame, lineno = next(g) # that should be where is happened
except StopIteration:
pass # In case the trace is too short

# fname = os.path.split(frame.f_code.co_filename)[1]
fname = frame.f_code.co_filename
LOG.error('Exception: %s in %s on line: %s', exc_type, fname, lineno, exc_info=True)

0 comments on commit 4b8ea04

Please sign in to comment.