Skip to content

Commit

Permalink
Merge pull request #22 from EGA-archive/inbox-s3
Browse files Browse the repository at this point in the history
Using S3 as inbox backend storage
  • Loading branch information
dtitov committed Feb 8, 2019
2 parents 9eae1de + e07b0c6 commit efdc4b7
Show file tree
Hide file tree
Showing 15 changed files with 258 additions and 137 deletions.
7 changes: 4 additions & 3 deletions deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ help:
@echo "where <target> is: 'bootstrap', 'up' 'ps', 'down', 'network' or 'clean'\n"

# If DEPLOY_DEV is yes, we use dummy passwords
bootstrap-dev: DEV="DEPLOY_DEV=yes "
bootstrap-dev: DEPLOY_DEV=yes
.env private/lega.yml private bootstrap bootstrap-dev:
@${DEV}bootstrap/run.sh ${ARGS} || { cat private/.err; exit 1; }
@bootstrap/run.sh ${ARGS} || { cat private/.err; exit 1; }


up: .env private/lega.yml
@docker-compose up -d ${OMIT}

clean-volumes:
docker volume rm lega_db lega_inbox lega_s3
-docker volume rm lega_inbox-s3

ps:
@docker-compose ps

down: #.env
@[[ -f private/lega.yml ]] && docker-compose down -v || echo "No recipe to bring containers down\nHave you bootstrapped? (ie make bootstrap)"
@[[ -f private/lega.yml ]] && docker-compose down -v || echo -e "No recipe to bring containers down\nHave you bootstrapped? (ie make bootstrap)"

clean:
rm -rf .env private
Expand Down
107 changes: 86 additions & 21 deletions deploy/bootstrap/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ VERBOSE=no
FORCE=yes
OPENSSL=openssl
INBOX=openssh
INBOX_BACKEND=posix
KEYSERVER=lega
REAL_CEGA=no

Expand All @@ -25,6 +26,7 @@ function usage {
echo -e "\t--openssl <value> \tPath to the Openssl executable [Default: ${OPENSSL}]"
echo -e "\t--inbox <value> \tSelect inbox \"openssh\" or \"mina\" [Default: ${INBOX}]"
echo -e "\t--keyserver <value> \tSelect keyserver \"lega\" or \"ega\" [Default: ${KEYSERVER}]"
echo -e "\t--inbox-backend <value> \tSelect the inbox backend: S3 or POSIX [Default: ${INBOX_BACKEND}]"
echo -e "\t--genkey <value> \tPath to PGP key generator [Default: ${GEN_KEY}]"
echo -e "\t--pythonexec <value> \tPython execute command [Default: ${PYTHONEXEC}]"
echo -e "\t--with-real-cega \tUse the real Central EGA Message broker and Authentication Service"
Expand All @@ -44,8 +46,9 @@ while [[ $# -gt 0 ]]; do
--verbose|-v) VERBOSE=yes;;
--polite|-p) FORCE=no;;
--openssl) OPENSSL=$2; shift;;
--inbox) INBOX=$2; shift;;
--keyserver) KEYSERVER=$2; shift;;
--inbox) INBOX=${2,,}; shift;;
--inbox-backend) INBOX_BACKEND=${2,,}; shift;;
--keyserver) KEYSERVER=${2,,}; shift;;
--genkey) GEN_KEY=$2; shift;;
--pythonexec) PYTHONEXEC=$2; shift;;
--with-real-cega) REAL_CEGA=yes;;
Expand Down Expand Up @@ -174,18 +177,8 @@ keyserver_endpoint = https://keys:8443/retrieve/%s/private
keyserver_endpoint = https://keys:8443/retrieve/%s/private
EOF
fi
cat >> ${PRIVATE}/conf.ini <<EOF
[inbox]
location = /ega/inbox/%s
chroot_sessions = True

[vault]
driver = S3Storage
url = http://vault:9000
access_key = ${S3_ACCESS_KEY}
secret_key = ${S3_SECRET_KEY}
#region = lega
cat >> ${PRIVATE}/conf.ini <<EOF
## Connecting to Local EGA
[broker]
Expand All @@ -202,8 +195,34 @@ password = ${DB_LEGA_IN_PASSWORD}
database = lega
try = 30
sslmode = require
[vault]
driver = S3Storage
url = http://vault:9000
access_key = ${S3_ACCESS_KEY}
secret_key = ${S3_SECRET_KEY}
#region = lega
EOF

if [[ ${INBOX_BACKEND} == 's3' ]]; then
cat >> ${PRIVATE}/conf.ini <<EOF
[inbox]
driver = S3Storage
url = http://inbox-s3-backend:9000
access_key = ${S3_ACCESS_KEY_INBOX}
secret_key = ${S3_SECRET_KEY_INBOX}
#region = lega
EOF
else
# Default: POSIX file system
cat >> ${PRIVATE}/conf.ini <<EOF
[inbox]
location = /ega/inbox/%s/
chroot_sessions = True
EOF
fi

#########################################################################
# Specifying the LocalEGA components in the docke-compose file
#########################################################################
Expand All @@ -221,6 +240,15 @@ volumes:
db:
inbox:
vault:
EOF

if [[ ${INBOX_BACKEND} == 's3' ]]; then
cat >> ${PRIVATE}/lega.yml <<EOF
inbox-s3:
EOF
fi

cat >> ${PRIVATE}/lega.yml <<EOF
services:
Expand Down Expand Up @@ -294,6 +322,10 @@ cat >> ${PRIVATE}/lega.yml <<EOF
environment:
- CEGA_ENDPOINT=${CEGA_USERS_ENDPOINT%/}/%s?idType=username
- CEGA_ENDPOINT_CREDS=${CEGA_USERS_CREDS}
- S3_ACCESS_KEY=${S3_ACCESS_KEY_INBOX}
- S3_SECRET_KEY=${S3_SECRET_KEY_INBOX}
- S3_ENDPOINT=inbox-s3-backend:9000
- USE_SSL=false
ports:
- "${DOCKER_PORT_inbox}:2222"
image: nbisweden/ega-mina-inbox
Expand All @@ -312,6 +344,7 @@ cat >> ${PRIVATE}/lega.yml <<EOF # SFTP inbox
volumes:
- ./conf.ini:/etc/ega/conf.ini:ro
- ../images/inbox/entrypoint.sh:/usr/local/bin/entrypoint.sh
- ../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
- inbox:/ega/inbox
EOF
fi
Expand All @@ -327,8 +360,8 @@ cat >> ${PRIVATE}/lega.yml <<EOF
labels:
lega_label: "finalize"
volumes:
- ./conf.ini:/etc/ega/conf.ini:ro
- ./../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
- ./conf.ini:/etc/ega/conf.ini:ro
- ../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
restart: on-failure:3
networks:
- lega
Expand All @@ -349,9 +382,9 @@ cat >> ${PRIVATE}/lega.yml <<EOF
- AWS_ACCESS_KEY_ID=${S3_ACCESS_KEY}
- AWS_SECRET_ACCESS_KEY=${S3_SECRET_KEY}
volumes:
- inbox:/ega/inbox
- ./conf.ini:/etc/ega/conf.ini:ro
- ./../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
- inbox:/ega/inbox
- ./conf.ini:/etc/ega/conf.ini:ro
- ../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
restart: on-failure:3
networks:
- lega
Expand Down Expand Up @@ -405,7 +438,7 @@ cat >> ${PRIVATE}/lega.yml <<EOF
- ./certs/ssl.key:/etc/ega/ssl.key:ro
- ./pgp/ega.sec:/etc/ega/pgp/ega.sec:ro
- ./pgp/ega2.sec:/etc/ega/pgp/ega2.sec:ro
- ./../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
- ../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
restart: on-failure:3
networks:
- lega
Expand All @@ -432,8 +465,8 @@ cat >> ${PRIVATE}/lega.yml <<EOF
- AWS_ACCESS_KEY_ID=${S3_ACCESS_KEY}
- AWS_SECRET_ACCESS_KEY=${S3_SECRET_KEY}
volumes:
- ./conf.ini:/etc/ega/conf.ini:ro
- ./../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
- ./conf.ini:/etc/ega/conf.ini:ro
- ../../lega:/home/lega/.local/lib/python3.6/site-packages/lega
restart: on-failure:3
networks:
- lega
Expand Down Expand Up @@ -492,6 +525,28 @@ cat >> ${PRIVATE}/lega.yml <<EOF
EOF

if [[ ${INBOX_BACKEND} == 's3' ]]; then
cat >> ${PRIVATE}/lega.yml <<EOF
# Inbox S3 Backend Storage
inbox-s3-backend:
hostname: inbox-s3-backend
container_name: inbox-s3-backend
labels:
lega_label: "inbox-s3-backend"
image: minio/minio
environment:
- MINIO_ACCESS_KEY=${S3_ACCESS_KEY_INBOX}
- MINIO_SECRET_KEY=${S3_SECRET_KEY_INBOX}
volumes:
- inbox-s3:/data
restart: on-failure:3
networks:
- lega
ports:
- "${DOCKER_PORT_s3_inbox}:9000"
command: server /data
EOF
fi

if [[ ${REAL_CEGA} != 'yes' ]]; then

Expand Down Expand Up @@ -612,4 +667,14 @@ LEGA_PASSWORD = ${LEGA_PASSWORD}
KEYS_PASSWORD = ${KEYS_PASSWORD}
EOF

if [[ ${INBOX_BACKEND} == 's3' ]]; then
cat >> ${PRIVATE}/.trace <<EOF
#
# Inbox S3 backend
DOCKER_PORT_s3_inbox = ${DOCKER_PORT_s3_inbox}
S3_ACCESS_KEY_INBOX = ${S3_ACCESS_KEY_INBOX}
S3_SECRET_KEY_INBOX = ${S3_SECRET_KEY_INBOX}
EOF
fi

task_complete "Bootstrap complete"
6 changes: 6 additions & 0 deletions deploy/bootstrap/settings.rc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ PGP_PASSPHRASE=$(generate_password 16)
S3_ACCESS_KEY=$(generate_password 16)
S3_SECRET_KEY=$(generate_password 32)

if [[ ${INBOX_BACKEND} == 's3' ]]; then # S3 backend for inbox
DOCKER_PORT_s3_inbox=9001
S3_ACCESS_KEY_INBOX=$(generate_password 16)
S3_SECRET_KEY_INBOX=$(generate_password 32)
fi

LOG_LEVEL=DEBUG

LEGA_PASSWORD=$(generate_password 32)
Expand Down
20 changes: 9 additions & 11 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import sys
import logging
from pathlib import Path
from functools import partial

from legacryptor.crypt4gh import get_header
Expand All @@ -33,7 +32,7 @@

@db.catch_error
@db.crypt4gh_to_user_errors
def work(fs, channel, data):
def work(fs, inbox_fs, channel, data):
"""Read a message, split the header and send the remainder to the backend store."""
filepath = data['filepath']
LOG.info(f"Processing {filepath}")
Expand All @@ -50,14 +49,12 @@ def work(fs, channel, data):
file_id = db.insert_file(filepath, user_id)
data['file_id'] = file_id # must be there: database error uses it

# Find inbox
inbox = Path(CONF.get_value('inbox', 'location', raw=True) % user_id)
LOG.info(f"Inbox area: {inbox}")
# Instantiate the inbox backend
inbox = inbox_fs(user_id)
LOG.info("Inbox backend: %s", inbox)

# Check if file is in inbox
inbox_filepath = inbox / filepath.lstrip('/')
LOG.info(f"Inbox file path: {inbox_filepath}")
if not inbox_filepath.exists():
if not inbox.exists(filepath):
raise exceptions.NotFoundInInbox(filepath) # return early

# Ok, we have the file in the inbox
Expand All @@ -72,8 +69,8 @@ def work(fs, channel, data):
org_msg.pop('status', None)

# Strip the header out and copy the rest of the file to the vault
LOG.debug(f'Opening {inbox_filepath}')
with open(inbox_filepath, 'rb') as infile:
LOG.debug('Opening %s', filepath)
with inbox.open(filepath, 'rb') as infile:
LOG.debug(f'Reading header | file_id: {file_id}')
beginning, header = get_header(infile)

Expand All @@ -100,9 +97,10 @@ def main(args=None):

CONF.setup(args) # re-conf

inbox_fs = getattr(storage, CONF.get_value('inbox', 'driver', default='FileStorage'))
fs = getattr(storage, CONF.get_value('vault', 'driver', default='FileStorage'))
broker = get_connection('broker')
do_work = partial(work, fs(), broker.channel())
do_work = partial(work, fs('vault', 'lega'), partial(inbox_fs, 'inbox'), broker.channel())

# upstream link configured in local broker
consume(do_work, broker, 'files', 'archived')
Expand Down
3 changes: 2 additions & 1 deletion lega/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def send_message(self, username, path):
inbox = self.inbox_location % username

if self.isolation:
filepath, filename = (os.path.join(inbox, path.lstrip('/')), path)
p = path.lstrip('/')
filepath, filename = (os.path.join(inbox, p), p)
else:
filepath, filename = (path, path[len(inbox):])

Expand Down
2 changes: 1 addition & 1 deletion lega/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_connection(domain, blocking=True):

def publish(message, channel, exchange, routing, correlation_id=None):
"""Send a message to the local broker with ``path`` was updated."""
LOG.debug(f'Sending {message} to exchange: {exchange} [routing key: {routing}]')
LOG.debug(f'Sending to exchange: {exchange} [routing key: {routing}]')
channel.basic_publish(exchange=exchange,
routing_key=routing,
body=json.dumps(message),
Expand Down

0 comments on commit efdc4b7

Please sign in to comment.