Skip to content

Commit

Permalink
Adding client_ip and user_info to DB. Adding update_status function
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 12, 2018
1 parent cc8df7c commit a5d7773
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 66 deletions.
56 changes: 32 additions & 24 deletions deploy/images/db/download.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ CREATE TABLE local_ega_download.main (

-- which files was downloaded
file_id INTEGER NOT NULL REFERENCES local_ega.main(id), -- No "ON DELETE CASCADE"
start_coordinate INTEGER DEFAULT 0,
end_coordinate INTEGER NULL, -- might be missing

-- Status/Progress
status VARCHAR NOT NULL REFERENCES local_ega_download.status (code), -- No "ON DELETE CASCADE"
-- DEFAULT 'INIT' ?

-- user info
user_info TEXT NULL,
client_ip TEXT NULL,

-- Stats
start_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(),
Expand All @@ -41,19 +47,6 @@ CREATE TABLE local_ega_download.main (
);


-- View on the vault files
CREATE VIEW local_ega_download.files AS
SELECT id,
stable_id,
vault_file_reference,
vault_file_type,
vault_file_size,
header,
vault_file_checksum,
vault_file_checksum_type
FROM local_ega.main
WHERE status = 'READY';

-- Insert new request, and return some vault information
CREATE TYPE request_type AS (req_id INTEGER,
header TEXT,
Expand All @@ -64,23 +57,28 @@ CREATE TYPE request_type AS (req_id INTEGER,
unencrypted_checksum_type local_ega.checksum_algorithm);


CREATE FUNCTION make_request(sid local_ega.main.stable_id%TYPE)
CREATE FUNCTION make_request(sid local_ega.main.stable_id%TYPE,
uinfo local_ega_download.main.user_info%TYPE,
cip local_ega_download.main.client_ip%TYPE,
scoord local_ega_download.main.start_coordinate%TYPE DEFAULT 0,
ecoord local_ega_download.main.end_coordinate%TYPE DEFAULT NULL)
RETURNS request_type AS $make_request$
#variable_conflict use_column
DECLARE
req local_ega_download.request_type;
vault_rec local_ega_download.files%ROWTYPE;
vault_rec local_ega.main%ROWTYPE;
rid INTEGER;
BEGIN

SELECT * INTO vault_rec FROM local_ega_download.files WHERE stable_id = sid LIMIT 1;
-- Reading from main directly
SELECT * INTO vault_rec FROM local_ega.main WHERE status = 'READY' AND stable_id = sid LIMIT 1;

IF vault_rec IS NULL THEN
RAISE EXCEPTION 'Vault file not found for stable_id: % ', sid;
END IF;

INSERT INTO local_ega_download.main (file_id, status)
VALUES (vault_rec.id, 'INIT')
INSERT INTO local_ega_download.main (file_id, status, user_info, client_ip, start_coordinate, end_coordinate)
VALUES (vault_rec.id, 'INIT', uinfo, cip, scoord, ecoord)
RETURNING local_ega_download.main.id INTO rid;

req.req_id := rid;
Expand All @@ -94,6 +92,7 @@ BEGIN
END;
$make_request$ LANGUAGE plpgsql;


-- When there is an updated, remember the timestamp
CREATE FUNCTION download_updated()
RETURNS TRIGGER AS $download_updated$
Expand Down Expand Up @@ -130,6 +129,17 @@ END;
$download_complete$ LANGUAGE plpgsql;


-- Convenience utility. It hides the table name from the programmatic side
CREATE FUNCTION update_status(reqid local_ega_download.main.id%TYPE,
msg local_ega_download.main.status%TYPE)
RETURNS void AS $update_status$
#variable_conflict use_column
BEGIN
UPDATE local_ega_download.main SET status = msg WHERE id = reqid;
END;
$update_status$ LANGUAGE plpgsql;


-- ##################################################
-- ERRORS
-- ##################################################
Expand All @@ -142,7 +152,6 @@ CREATE TABLE local_ega_download.main_errors (
code TEXT NOT NULL,
description TEXT NOT NULL,

client_ip TEXT, -- where from
hostname TEXT, -- where it happened

-- table logs
Expand All @@ -151,14 +160,13 @@ CREATE TABLE local_ega_download.main_errors (

-- Just showing the current/active errors
CREATE VIEW local_ega_download.errors AS
SELECT id, code, description, client_ip, hostname, occured_at FROM local_ega_download.main_errors
SELECT id, code, description, hostname, occured_at FROM local_ega_download.main_errors
WHERE active = TRUE;

CREATE FUNCTION insert_error(req_id local_ega_download.main.id%TYPE,
h local_ega_download.errors.hostname%TYPE,
etype local_ega_download.errors.code%TYPE,
msg local_ega_download.errors.description%TYPE,
client_ip local_ega_download.errors.client_ip%TYPE)
msg local_ega_download.errors.description%TYPE)
RETURNS void AS $download_error$
DECLARE
fid local_ega_download.main.file_id%TYPE;
Expand All @@ -173,8 +181,8 @@ BEGIN
RAISE EXCEPTION 'Request id not found: %', req_id;
END IF;

INSERT INTO local_ega_download.main_errors (file_id,req_id,hostname,code,description,client_ip)
VALUES (fid,req_id, h, etype, msg, client_ip);
INSERT INTO local_ega_download.main_errors (file_id,req_id,hostname,code,description)
VALUES (fid,req_id, h, etype, msg);

END;
$download_error$ LANGUAGE plpgsql;
15 changes: 0 additions & 15 deletions deploy/images/db/ebi.sql

This file was deleted.

1 change: 0 additions & 1 deletion deploy/images/db/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ EOSQL
# Run sql commands (in order!)
DB_FILES=(/docker-entrypoint-initdb.d/main.sql
/docker-entrypoint-initdb.d/download.sql
/docker-entrypoint-initdb.d/ebi.sql
/docker-entrypoint-initdb.d/qc.sql
/docker-entrypoint-initdb.d/grants.sql)

Expand Down
4 changes: 3 additions & 1 deletion deploy/images/db/grants.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ GRANT USAGE ON SCHEMA local_ega_download TO lega_out;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA local_ega_download TO lega_out; -- Read/Write on local_ega_download.* for lega_out
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA local_ega_download TO lega_out; -- Don't forget the sequences

GRANT SELECT ON local_ega.main TO lega_out; -- Read-Only access for lega_out, through the views inside local_ega_download
-- Read-Only access for lega_out
GRANT SELECT ON local_ega.main TO lega_out;
GRANT USAGE ON SCHEMA local_ega TO lega_out;
23 changes: 15 additions & 8 deletions lega/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,15 @@ async def init(app):
LOG.info(f'Retrieving the Signing Key from {signing_key_location}')
if signing_key_location:
with open(signing_key_location, 'rt') as k: # hex file
key_content = bytes.fromhex(k.read())
print(key_content, len(key_content))
app['signing_key'] = ed25519.SigningKey(key_content)
app['signing_key'] = ed25519.SigningKey(bytes.fromhex(k.read()))
else:
app['signing_key'] = None


async def shutdown(app):
'''Function run after a KeyboardInterrupt. After that: cleanup'''
LOG.info('Shutting down the database engine')
app['db'].close()
await app['db'].wait_closed()
await db.close()

####################################

Expand Down Expand Up @@ -100,7 +97,17 @@ async def wrapper(r):
try:

# Fetch information and Create request
request_info = await db.make_request(stable_id)
user_info = ''
client_ip = data.get('client_ip') or ''
startCoordinate = int(r.query.get('startCoordinate', 0))
endCoordinate = r.query.get('endCoordinate', None)
endCoordinate = int(endCoordinate) if endCoordinate is not None else None

request_info = await db.make_request(stable_id,
user_info,
client_ip,
start_coordinate=startCoordinate,
end_coordinate=endCoordinate)
if not request_info:
LOG.error('Unable to create a request entry')
raise web.HTTPServiceUnavailable(reason='Unable to process request')
Expand All @@ -119,7 +126,7 @@ async def wrapper(r):
raise web.HTTPUnprocessableEntity(reason='Unsupported storage type')

async def db_update(message):
await db.update(request_id, status=message)
await db.update_status(request_id, message)

# Do the job
response, dlsize = await func(r,
Expand All @@ -141,7 +148,7 @@ async def db_update(message):
cause = err.__cause__ or err
LOG.error(f'{cause!r}') # repr = Technical
if request_id:
await db.set_error(request_id, cause, client_ip=data.get('client_ip'))
await db.set_error(request_id, cause)
raise web.HTTPServiceUnavailable(reason='Unable to process request')
return wrapper

Expand Down
33 changes: 17 additions & 16 deletions lega/utils/async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#from contextlib import asynccontextmanager
from async_generator import asynccontextmanager

import psycopg2
import aiopg

from ..conf import CONF
Expand Down Expand Up @@ -110,17 +111,21 @@ async def close(self):
#############################################################
connection = DBConnection()

async def make_request(stable_id):
async def make_request(stable_id, user_info, client_ip, start_coordinate=0, end_coordinate=None):
async with connection.cursor() as cur:
await cur.execute('SELECT * FROM local_ega_download.make_request(%(stable_id)s);', {'stable_id': stable_id})
await cur.execute('SELECT * FROM local_ega_download.make_request(%(stable_id)s,%(user_info)s,%(client_ip)s,%(start_coordinate)s,%(end_coordinate)s);', {'stable_id': stable_id,
'user_info': user_info,
'client_ip': client_ip,
'start_coordinate': start_coordinate,
'end_coordinate': end_coordinate})
return await cur.fetchone()

async def download_complete(req_id, dlsize):
async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.download_complete(%(req_id)s,%(dlsize)s);',
{'req_id': req_id, 'dlsize': dlsize})

async def set_error(req_id, error, client_ip=None):
async def set_error(req_id, error):

exc_type, _, exc_tb = sys.exc_info()
g = traceback.walk_tb(exc_tb)
Expand All @@ -137,20 +142,16 @@ async def set_error(req_id, error, client_ip=None):
hostname = gethostname()

async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.insert_error(%(req_id)s,%(h)s,%(etype)s,%(msg)s,%(client_ip)s);',
await cur.execute('SELECT local_ega_download.insert_error(%(req_id)s,%(h)s,%(etype)s,%(msg)s);',
{'h':hostname,
'etype': error.__class__.__name__,
'msg': repr(error),
'req_id': req_id,
'client_ip': client_ip})


async def update(req_id, **kwargs):
"""Updating information in database for ``req_id``."""
if not kwargs:
return
'req_id': req_id})

async def update_status(req_id, status):
async with connection.cursor() as cur:
q = ', '.join(f'{k} = %({k})s' for k in kwargs) # keys
query = f'UPDATE local_ega_download.main SET {q} WHERE id = %(req_id)s;'
kwargs['req_id'] = req_id
await cur.execute(query, kwargs)
await cur.execute('SELECT local_ega_download.update_status(%(req_id)s,%(status)s);',
{'req_id': req_id, 'status': status})

async def close():
await connection.close()
2 changes: 1 addition & 1 deletion lega/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def ping(self):
with self.conn.cursor() as cur: # does not commit if error raised
cur.execute('SELECT 1;')
LOG.debug("Ping db successful")
except psycopg2.OperationalError as e:
except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
LOG.debug('Ping failed: %s', e)
self.connect(force=True) # reconnect

Expand Down

0 comments on commit a5d7773

Please sign in to comment.