-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #101 from EGA-archive/pipeline
Pipeline
- Loading branch information
Showing
50 changed files
with
1,983 additions
and
1,907 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Version 1.2 | ||
|
||
* Less MQ connection sockets: one federated queue, one shovel (and added message ``type`` to distinguish the messages) | ||
* MQ Heartbeat are reintroduced | ||
* ingest and verify are one service: Since we were loading the data in memory, we also decrypt, checksum and move it to a staging area | ||
* 2 instances of a backup microservice are added. Obviously, this is for illustration purpose only, each LocalEGA site might already have their own backup system. Nevertheless, a trust/confirmation is sent to CentralEGA. | ||
* Database pipeline segregated from the main final database | ||
* A save2db service is introduced at the end of the pipeline to save information in the long-term storage database (not the pipeline DB). It can handle also handle the dataset mappings (as a job of type `mapping`). | ||
* Correlation IDs are used for each inbox upload/rename/deletion. However, when several message types are emitted by CentralEGA, the same correlation ID might be reused. Therefore, we introduce a `job_id`, handled by the database. The latter generates new job id if necessary (detecting if repeated messages). | ||
* No leaked information from the LocalEGAs to CentralEGA. We only use checksums and public information | ||
* Support for S3 has been factorized out from the code. The code is smaller and simpler. In order to support an S3-backed storage, the system administrator can for example use [S3-fuse filesystem](https://github.com/s3fs-fuse/s3fs-fuse) | ||
* The pipeline database and mq docker images are migrated back into this repo | ||
|
||
# Version 1.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
#!/usr/bin/env python3 | ||
# -*- coding: utf-8 -*- | ||
|
||
import sqlite3 | ||
|
||
from lega.conf import CONF | ||
from lega.utils.amqp import consume, publish | ||
|
||
|
||
CONN = sqlite3.connect(':memory:') # don't bother, keep it in RAM. Too bad if we restart | ||
|
||
def init_db(): | ||
c = CONN.cursor() | ||
c.execute('''CREATE TABLE accessions (id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
md5 text UNIQUE, | ||
username text, | ||
filepath text)''') | ||
CONN.commit() | ||
|
||
def get_accession_id(md5, username, filepath): | ||
c = CONN.cursor() | ||
c.execute('''INSERT INTO accessions (md5,username,filepath) | ||
VALUES(?,?,?) | ||
ON CONFLICT(md5) DO NOTHING;''', [md5, username, filepath]) | ||
accession_id = c.lastrowid | ||
CONN.commit() | ||
return accession_id | ||
|
||
def work(data): | ||
"""Read a message, split the header and decrypt the remainder.""" | ||
|
||
|
||
decrypted_checksums = data['decrypted_checksums'] | ||
|
||
md5_checksum = None | ||
for c in decrypted_checksums: | ||
if c.get('type') == 'md5': | ||
md5_checksum = c.get('value') | ||
break | ||
|
||
if md5_checksum is None: | ||
data['reason'] = 'Missing md5 checksum' | ||
publish(data, exchange='localega.v1', routing_key='files.error') | ||
|
||
|
||
filepath = data['filepath'] | ||
username = data['user'] | ||
accession_id = get_accession_id(md5_checksum, username, filepath) | ||
accession = f"EGAF{accession_id:0>11}" # I think EBI decided to use 11 digits | ||
print('Using accession id:', accession) # no LOG.debug for __main__ and don't care | ||
|
||
data['type'] = 'accession' | ||
data['accession_id'] = accession | ||
|
||
# Publish the answer | ||
publish(data, exchange='localega.v1', routing_key='accession') | ||
# All good: Ack message | ||
|
||
def main(): | ||
init_db() | ||
consume(work) | ||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
|
||
import sys | ||
import configparser | ||
|
||
from docopt import docopt | ||
|
||
__doc__ = f''' | ||
Utility to help bootstrap a LocalEGA instance. | ||
Usage: | ||
{sys.argv[0]} [options] <conf> | ||
Options: | ||
-h, --help Prints this help and exit | ||
-v, --version Prints the version and exits | ||
-V, --verbose Prints more output | ||
''' | ||
|
||
def main(conf, args): | ||
|
||
config = configparser.RawConfigParser() | ||
config['DEFAULT'] = { | ||
'queue': 'v1.files.verified', | ||
'exchange': conf.get('mq', 'exchange'), | ||
'cega_exchange': conf.get('mq', 'exchange'), | ||
'cega_error_key': 'files.error', | ||
} | ||
|
||
config['broker'] = { | ||
'connection': conf.get('mq', 'connection'), | ||
'enable_ssl': 'yes', | ||
'verify_peer': 'yes', | ||
'verify_hostname': 'no', | ||
'cacertfile': '/cega/CA.crt', | ||
'certfile': '/cega/ssl.crt', | ||
'keyfile': '/cega/ssl.key', | ||
} | ||
|
||
# output | ||
config.write(sys.stdout) | ||
|
||
|
||
if __name__ == '__main__': | ||
args = docopt(__doc__, | ||
sys.argv[1:], | ||
help=True, | ||
version='CentralEGA accession service boostrap (version 0.2)') | ||
conf = configparser.RawConfigParser() | ||
conf.read(args['<conf>']) | ||
main(conf, args) |
Oops, something went wrong.