Skip to content

Commit

Permalink
Updating the database download tables
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 15, 2018
1 parent fbd2715 commit 5ea25e1
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 146 deletions.
198 changes: 75 additions & 123 deletions deploy/images/db/download.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,23 @@ CREATE SCHEMA local_ega_download;

SET search_path TO local_ega_download;

CREATE TABLE local_ega_download.status (
id INTEGER,
code VARCHAR(32) NOT NULL,
description TEXT,
-- contraints
PRIMARY KEY(id), UNIQUE (id), UNIQUE (code)
);

INSERT INTO local_ega_download.status(id,code,description)
VALUES (10, 'INIT' , 'Initializing a download request'),
(20, 'REENCRYPTING', 'Re-Encrypting the header for a given user'),
(30, 'STREAMING' , 'Streaming file from the Vault'),
(40, 'DONE' , 'Download completed'), -- checksums are in the Crypt4GH formatted file
-- and validated by the decryptor
(0, 'ERROR' , 'An Error occured, check the error table');
CREATE TYPE status AS ENUM ('SUCCESS', 'ERROR');


CREATE TABLE local_ega_download.main (
CREATE TABLE local_ega_download.requests (
id SERIAL, PRIMARY KEY(id), UNIQUE (id),

-- which files was downloaded
file_id INTEGER NOT NULL REFERENCES local_ega.main(id), -- No "ON DELETE CASCADE"
start_coordinate BIGINT DEFAULT 0,
end_coordinate BIGINT NULL, -- might be missing

-- Status/Progress
status VARCHAR NOT NULL REFERENCES local_ega_download.status (code), -- No "ON DELETE CASCADE"
-- DEFAULT 'INIT' ?

-- user info
user_info TEXT NULL,
client_ip TEXT NULL,

-- Stats
start_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(),
end_timestamp TIMESTAMP,
bytes BIGINT DEFAULT 0,
speed FLOAT DEFAULT 0.0, -- bytes per seconds

-- table logs
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(),
last_modified TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
);


-- Insert new request, and return some vault information
CREATE TYPE request_type AS (req_id INTEGER,
header TEXT,
Expand All @@ -56,133 +28,113 @@ CREATE TYPE request_type AS (req_id INTEGER,
unencrypted_checksum VARCHAR,
unencrypted_checksum_type local_ega.checksum_algorithm);


CREATE FUNCTION make_request(sid local_ega.main.stable_id%TYPE,
uinfo local_ega_download.main.user_info%TYPE,
cip local_ega_download.main.client_ip%TYPE,
scoord local_ega_download.main.start_coordinate%TYPE DEFAULT 0,
ecoord local_ega_download.main.end_coordinate%TYPE DEFAULT NULL)
uinfo local_ega_download.requests.user_info%TYPE,
cip local_ega_download.requests.client_ip%TYPE,
scoord local_ega_download.requests.start_coordinate%TYPE DEFAULT 0,
ecoord local_ega_download.requests.end_coordinate%TYPE DEFAULT NULL)
RETURNS request_type AS $make_request$
#variable_conflict use_column
DECLARE
req local_ega_download.request_type;
vault_rec local_ega.main%ROWTYPE;
vault_rec local_ega.vault_files%ROWTYPE;
rid INTEGER;
BEGIN

-- Reading from main directly
SELECT * INTO vault_rec FROM local_ega.main WHERE status = 'READY' AND stable_id = sid LIMIT 1;
-- Find the file
SELECT * INTO vault_rec FROM local_ega.vault_files WHERE stable_id = sid LIMIT 1;

IF vault_rec IS NULL THEN
RAISE EXCEPTION 'Vault file not found for stable_id: % ', sid;
END IF;

INSERT INTO local_ega_download.main (file_id, status, user_info, client_ip, start_coordinate, end_coordinate)
VALUES (vault_rec.id, 'INIT', uinfo, cip, scoord, ecoord)
RETURNING local_ega_download.main.id INTO rid;

-- New entry, or reuse old entry
INSERT INTO local_ega_download.requests (file_id, user_info, client_ip, start_coordinate, end_coordinate)
VALUES (vault_rec.file_id, uinfo, cip, scoord, ecoord)
ON CONFLICT (id) DO NOTHING
RETURNING local_ega_download.requests.id INTO rid;

-- result
req.req_id := rid;
req.header := vault_rec.header;
req.vault_path := vault_rec.vault_file_reference;
req.vault_type := vault_rec.vault_file_type;
req.file_size := vault_rec.vault_file_size;
req.unencrypted_checksum := vault_rec.vault_file_checksum;
req.unencrypted_checksum_type := vault_rec.vault_file_checksum_type;
req.vault_path := vault_rec.vault_path;
req.vault_type := vault_rec.vault_type;
req.file_size := vault_rec.vault_filesize;
req.unencrypted_checksum := vault_rec.unencrypted_checksum;
req.unencrypted_checksum_type := vault_rec.unencrypted_checksum_type;
RETURN req;
END;
$make_request$ LANGUAGE plpgsql;


-- When there is an updated, remember the timestamp
CREATE FUNCTION download_updated()
RETURNS TRIGGER AS $download_updated$
BEGIN
NEW.last_modified = clock_timestamp();
RETURN NEW;
END;
$download_updated$ LANGUAGE plpgsql;
CREATE TABLE local_ega_download.success (
id SERIAL, PRIMARY KEY(id), UNIQUE (id),

CREATE TRIGGER download_updated AFTER UPDATE ON local_ega_download.main FOR EACH ROW EXECUTE PROCEDURE download_updated();
-- which requested file it was
req_id INTEGER NOT NULL REFERENCES local_ega_download.requests(id), -- No "ON DELETE CASCADE"

-- Stats
bytes BIGINT DEFAULT 0,
speed FLOAT DEFAULT 0.0, -- bytes per seconds

-- Mark a download as complete, and calculate the speed
CREATE FUNCTION download_complete(reqid local_ega_download.main.id%TYPE,
dlsize local_ega_download.main.bytes%TYPE)
RETURNS void AS $download_complete$
DECLARE
fid local_ega.main.id%TYPE;
curr_timestamp TIMESTAMP;
BEGIN
curr_timestamp := clock_timestamp();
UPDATE local_ega_download.main
SET status = 'DONE',
end_timestamp = curr_timestamp,
bytes = dlsize,
speed = dlsize / extract( epoch from (curr_timestamp - start_timestamp)) -- extract (epoch for interval) = elapsed seconds
-- now pray for no div by zero
WHERE id = reqid
RETURNING file_id INTO fid;

-- turn off active errors
UPDATE local_ega_download.main_errors SET active = FALSE WHERE file_id = fid;
END;
$download_complete$ LANGUAGE plpgsql;
-- table logs
occured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
);


-- Convenience utility. It hides the table name from the programmatic side
CREATE FUNCTION update_status(reqid local_ega_download.main.id%TYPE,
msg local_ega_download.main.status%TYPE)
RETURNS void AS $update_status$
#variable_conflict use_column
-- Mark a download as complete, and calculate the speed
CREATE FUNCTION download_complete(rid local_ega_download.requests.id%TYPE,
dlsize local_ega_download.success.bytes%TYPE,
s local_ega_download.success.speed%TYPE)
RETURNS void AS $insert_success$
BEGIN
UPDATE local_ega_download.main SET status = msg WHERE id = reqid;
INSERT INTO local_ega_download.success(req_id,bytes,speed)
VALUES(rid,dlsize,s);
END;
$update_status$ LANGUAGE plpgsql;
$insert_success$ LANGUAGE plpgsql;


-- ##################################################
-- ERRORS
-- ##################################################
CREATE TABLE local_ega_download.main_errors (
id SERIAL, PRIMARY KEY(id), UNIQUE (id),
active BOOLEAN NOT NULL DEFAULT TRUE,
file_id INTEGER NOT NULL REFERENCES local_ega.main(id), -- ON DELETE CASCADE,
req_id INTEGER NOT NULL REFERENCES local_ega_download.main(id), -- ON DELETE CASCADE,

code TEXT NOT NULL,
description TEXT NOT NULL,

hostname TEXT, -- where it happened

-- table logs
occured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
);
CREATE TABLE local_ega_download.errors (
id SERIAL, PRIMARY KEY(id), UNIQUE (id),
req_id INTEGER NOT NULL REFERENCES local_ega_download.requests(id), -- ON DELETE CASCADE,

-- Just showing the current/active errors
CREATE VIEW local_ega_download.errors AS
SELECT id, code, description, hostname, occured_at FROM local_ega_download.main_errors
WHERE active = TRUE;
code TEXT NOT NULL,
description TEXT NOT NULL,

CREATE FUNCTION insert_error(req_id local_ega_download.main.id%TYPE,
h local_ega_download.errors.hostname%TYPE,
etype local_ega_download.errors.code%TYPE,
msg local_ega_download.errors.description%TYPE)
RETURNS void AS $download_error$
DECLARE
fid local_ega_download.main.file_id%TYPE;
-- where it happened
hostname TEXT,

-- table logs
occured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()
);

CREATE FUNCTION insert_error(rid local_ega_download.requests.id%TYPE,
h local_ega_download.errors.hostname%TYPE,
etype local_ega_download.errors.code%TYPE,
msg local_ega_download.errors.description%TYPE)
RETURNS void AS $insert_error$
BEGIN
INSERT INTO local_ega_download.errors (req_id,hostname,code,description)
VALUES (rid, h, etype, msg);
END;
$insert_error$ LANGUAGE plpgsql;

UPDATE local_ega_download.main
SET status = 'ERROR'
WHERE id = req_id
RETURNING file_id INTO fid;

IF fid IS NULL THEN
RAISE EXCEPTION 'Request id not found: %', req_id;
END IF;

INSERT INTO local_ega_download.main_errors (file_id,req_id,hostname,code,description)
VALUES (fid,req_id, h, etype, msg);
-- ##################################################
-- EBI views
-- ##################################################

END;
$download_error$ LANGUAGE plpgsql;
CREATE VIEW local_ega_download.events AS
SELECT r.file_id AS file_id
, r.start_coordinate AS start_coordinate
, r.end_coordinate AS end_coordinate
, r.user_info AS email
, r.client_ip AS client_ip
, e.code AS event_type
, e.description AS event
FROM local_ega_download.errors e
INNER JOIN local_ega_download.requests r
ON r.id = e.req_id;
2 changes: 1 addition & 1 deletion deploy/images/db/grants.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA local_ega_download TO lega_out;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA local_ega_download TO lega_out; -- Don't forget the sequences

-- Read-Only access for lega_out
GRANT SELECT ON local_ega.main TO lega_out;
GRANT SELECT ON local_ega.vault_files TO lega_out;
GRANT USAGE ON SCHEMA local_ega TO lega_out;
15 changes: 14 additions & 1 deletion deploy/images/db/main.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ CREATE TABLE local_ega.main (

-- Encryption/Decryption
encryption_method VARCHAR REFERENCES local_ega.vault_encryption (mode), -- ON DELETE CASCADE,
version INTEGER , -- DEFAULT 1, -- Crypt4GH version
version INTEGER NULL, -- DEFAULT 1, -- Crypt4GH version
header TEXT, -- Crypt4GH header
session_key_checksum VARCHAR(128) NULL, -- NOT NULL, -- To check if session key already used
session_key_checksum_type checksum_algorithm,
Expand Down Expand Up @@ -268,6 +268,19 @@ CREATE TRIGGER mark_ready
FOR EACH ROW WHEN (NEW.status = 'READY')
EXECUTE PROCEDURE mark_ready();

-- View for Data-out
CREATE VIEW local_ega.vault_files AS
SELECT id AS file_id
, stable_id AS stable_id
, vault_file_reference AS vault_path
, vault_file_type AS vault_type
, vault_file_size AS vault_filesize
, vault_file_checksum AS unencrypted_checksum
, vault_file_checksum_type AS unencrypted_checksum_type
, header AS header
, version AS version
FROM local_ega.main
WHERE status = 'READY';

-- ##########################################################################
-- About the encryption
Expand Down
14 changes: 6 additions & 8 deletions lega/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sys
import os
import io
import time
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
Expand Down Expand Up @@ -99,6 +100,7 @@ async def wrapper(r):
pubkey = PublicKey(pubkey, KeyFormatter)

request_id = None
start_time = time.perf_counter()
try:

# Fetch information and Create request
Expand Down Expand Up @@ -130,13 +132,9 @@ async def wrapper(r):
LOG.error('Invalid storage method: %s', vault_type, extra={'correlation_id': correlation_id})
raise web.HTTPUnprocessableEntity(reason='Unsupported storage type')

async def db_update(message):
await db.update_status(request_id, message)

# Do the job
response, dlsize = await func(r,
correlation_id,
db_update,
pubkey,
r.app['private_key'],
r.app['signing_key'],
Expand All @@ -145,7 +143,9 @@ async def db_update(message):
mover,
chunk_size=r.app['chunk_size'])
# Mark as complete
await db.download_complete(request_id, dlsize)
elapsed_time = round(time.perf_counter() - start_time, 3) # rounded?
speed = dlsize / elapsed_time if elapsed_time else 0.0
await db.download_complete(request_id, dlsize, speed)
return response
#except web.HTTPError as err:
except Exception as err:
Expand All @@ -160,10 +160,9 @@ async def db_update(message):


@request_context
async def outgest(r, correlation_id, set_progress, pubkey, privkey, signing_key, header, vault_path, mover, chunk_size=1<<22):
async def outgest(r, correlation_id, pubkey, privkey, signing_key, header, vault_path, mover, chunk_size=1<<22):

# Crypt4GH encryption
await set_progress('REENCRYPTING')
LOG.info('Re-encrypting the header', extra={'correlation_id': correlation_id}) # in hex -> bytes, and take away 16 bytes
header_obj = Header.from_stream(io.BytesIO(bytes.fromhex(header)))
reencrypted_header = header_obj.reencrypt(pubkey, privkey, signing_key=signing_key)
Expand All @@ -172,7 +171,6 @@ async def outgest(r, correlation_id, set_progress, pubkey, privkey, signing_key,
LOG.debug('Reenc header %s', renc_header.hex(), extra={'correlation_id': correlation_id})

# Read the rest from the vault
await set_progress('STREAMING')
LOG.info('Opening vault file: %s', vault_path, extra={'correlation_id': correlation_id})
with mover.open(vault_path, 'rb') as vfile:

Expand Down
22 changes: 9 additions & 13 deletions lega/utils/async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,18 @@ async def close(self):

async def make_request(stable_id, user_info, client_ip, start_coordinate=0, end_coordinate=None):
async with connection.cursor() as cur:
await cur.execute('SELECT * FROM local_ega_download.make_request(%(stable_id)s,%(user_info)s,%(client_ip)s,%(start_coordinate)s,%(end_coordinate)s);', {'stable_id': stable_id,
'user_info': user_info,
'client_ip': client_ip,
'start_coordinate': start_coordinate,
'end_coordinate': end_coordinate})
await cur.execute('SELECT * FROM local_ega_download.make_request(%(sid)s,%(uinfo)s,%(ip)s,%(scoord)s,%(ecoord)s);',
{'sid': stable_id,
'uinfo': user_info,
'ip': client_ip,
'scoord': start_coordinate,
'ecoord': end_coordinate})
return await cur.fetchone()

async def download_complete(req_id, dlsize):
async def download_complete(req_id, dlsize, speed):
async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.download_complete(%(req_id)s,%(dlsize)s);',
{'req_id': req_id, 'dlsize': dlsize})
await cur.execute('SELECT local_ega_download.download_complete(%(req_id)s,%(dlsize)s,%(speed)s);',
{'req_id': req_id, 'dlsize': dlsize, 'speed': speed})

async def set_error(req_id, error):

Expand All @@ -149,10 +150,5 @@ async def set_error(req_id, error):
'msg': repr(error),
'req_id': req_id})

async def update_status(req_id, status):
async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.update_status(%(req_id)s,%(status)s);',
{'req_id': req_id, 'status': status})

async def close():
await connection.close()

0 comments on commit 5ea25e1

Please sign in to comment.