Skip to content

Commit

Permalink
Merge pull request #88 from EGA-archive/retry-loops
Browse files Browse the repository at this point in the history
Improve robustness
  • Loading branch information
silverdaz committed Dec 13, 2019
2 parents ad7eabf + d99b812 commit f9843e9
Show file tree
Hide file tree
Showing 30 changed files with 823 additions and 731 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/testsuite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ jobs:
# os: [ubuntu-latest, macOS-latest]
os: [ubuntu-latest]
bootstrap: ['', '--archive-backend s3']
# bats: ['integration', 'security', 'robustness']
bats: ['integration']
bats: ['integration', 'security', 'robustness']

runs-on: ${{ matrix.os }}

Expand Down
File renamed without changes.
15 changes: 7 additions & 8 deletions deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,17 @@ preflight-check:
## Base Image
####################################################

# Must find better, but working so far
MAIN_REPO := $(abspath $(dir $(abspath $(lastword $(MAKEFILE_LIST))))/..)

IMAGE_ARGS=
# eg --no-cache
# eg --build-arg LEGA_GID=1000

image:
cd $(MAIN_REPO) && \
docker build -f Dockerfile $(IMAGE_ARGS) \
--build-arg LEGA_GID=1000 \
--tag egarchive/lega-base:latest \
.
cd .. && \
docker build -f Dockerfile $(IMAGE_ARGS) --tag egarchive/lega-base:latest .


####################################################
# Cleaning images

define remove_dangling
docker images $(1) -f "dangling=true" -q | uniq | while read n; do docker rmi -f $$n; done
Expand Down
9 changes: 0 additions & 9 deletions deploy/bootstrap/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,6 @@ services:
environment:
- LEGA_LOG=debug
hostname: ingest${HOSTNAME_DOMAIN}
depends_on:
- db
- mq
image: egarchive/lega-base:latest
container_name: ingest${HOSTNAME_DOMAIN}
volumes:
Expand Down Expand Up @@ -464,9 +461,6 @@ cat >> ${PRIVATE}/lega.yml <<EOF
# Consistency Control
verify:
depends_on:
- db
- mq
environment:
- LEGA_LOG=debug
hostname: verify${HOSTNAME_DOMAIN}
Expand Down Expand Up @@ -509,9 +503,6 @@ cat >> ${PRIVATE}/lega.yml <<EOF
environment:
- LEGA_LOG=debug
hostname: finalize${HOSTNAME_DOMAIN}
depends_on:
- db
- mq
image: egarchive/lega-base:latest
container_name: finalize${HOSTNAME_DOMAIN}
volumes:
Expand Down
6 changes: 4 additions & 2 deletions lega/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Configuration(configparser.ConfigParser):

def _load_log(self):
"""Try to load `filename` as configuration file."""
global LOG_FILE
if not LOG_FILE:
print('No logging supplied', file=sys.stderr)
return
Expand All @@ -46,6 +47,7 @@ def _load_log(self):
if _logger.exists():
with open(_logger, 'r') as stream:
dictConfig(yaml.load(stream))
LOG_FILE = _logger
return

# Otherwise trying it as a path
Expand Down Expand Up @@ -79,9 +81,9 @@ def setup(self):

def __repr__(self):
"""Show the configuration files."""
res = 'Configuration file: {CONF_FILE}'
res = f'Configuration file: {CONF_FILE}'
if LOG_FILE:
res += '\nLogging settings loaded from {LOG_FILE}'
res += f'\nLogging settings loaded from {LOG_FILE}'
return res

def get_value(self, section, option, conv=str, default=None, raw=False):
Expand Down
14 changes: 6 additions & 8 deletions lega/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
import logging

from .conf import configure
from .utils import db
from .utils.amqp import consume, get_connection
from .utils import db, errors
from .utils.amqp import consume

LOG = logging.getLogger(__name__)


@db.catch_error
@errors.catch(ret_on_error=(None, True))
def work(data):
"""Read a message containing the ids and add it to the database."""
file_id = data['file_id']
Expand All @@ -34,16 +34,14 @@ def work(data):
db.set_stable_id(file_id, stable_id) # That will flag the entry as 'Ready'

LOG.info("Stable ID %s mapped to %s", stable_id, file_id)
return None
return (None, False)


@configure
def main(args=None):
def main():
"""Listen for incoming stable IDs."""
broker = get_connection('broker')

# upstream link configured in local broker
consume(work, broker, 'stableIDs', None)
consume(work, 'stableIDs', None)


if __name__ == '__main__':
Expand Down
18 changes: 8 additions & 10 deletions lega/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from crypt4gh import header

from .conf import CONF, configure
from .utils import db, exceptions, sanitize_user_id, storage
from .utils.amqp import consume, get_connection
from .utils import db, exceptions, errors, sanitize_user_id, storage
from .utils.amqp import consume

LOG = logging.getLogger(__name__)

Expand All @@ -42,9 +42,8 @@ def get_header(input_file):
# return header.serialize(header_packets)


@db.catch_error
@db.crypt4gh_to_user_errors
def work(fs, inbox_fs, channel, data):
@errors.catch(ret_on_error=(None, True))
def work(fs, inbox_fs, data):
"""Read a message, split the header and send the remainder to the backend store."""
filepath = data['filepath']
LOG.info('Processing %s', filepath)
Expand Down Expand Up @@ -92,19 +91,18 @@ def work(fs, inbox_fs, channel, data):
data['archive_path'] = target

LOG.debug("Reply message: %s", data)
return data
return (data, False)


@configure
def main(args=None):
def main():
"""Run ingest service."""
inbox_fs = getattr(storage, CONF.get_value('inbox', 'storage_driver', default='FileStorage'))
fs = getattr(storage, CONF.get_value('archive', 'storage_driver', default='FileStorage'))
broker = get_connection('broker')
do_work = partial(work, fs('archive', 'lega'), partial(inbox_fs, 'inbox'), broker.channel())
do_work = partial(work, fs('archive', 'lega'), partial(inbox_fs, 'inbox'))

# upstream link configured in local broker
consume(do_work, broker, 'files', 'archived')
consume(do_work, 'files', 'archived')


if __name__ == '__main__':
Expand Down

0 comments on commit f9843e9

Please sign in to comment.