Skip to content

Commit

Permalink
async_db does now retry a connection
Browse files Browse the repository at this point in the history
  • Loading branch information
silverdaz committed Nov 6, 2018
1 parent 3328ae3 commit 8fd1b90
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 71 deletions.
4 changes: 1 addition & 3 deletions deploy/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ private/lega.yml private bootstrap bootstrap-dev:
egarchive/lega --prefix $(PROJECT_NAME) ${ARGS}

up: private/lega.yml
@docker-compose --log-level ERROR -f $< up -d db mq inbox vault
@sleep 6
@docker-compose --log-level ERROR -f $< up -d ingest verify finalize outgest streamer
@docker-compose --log-level ERROR -f $< up -d

down:
-@docker-compose --log-level ERROR down -v
Expand Down
11 changes: 4 additions & 7 deletions lega/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

async def init(app):
# Some settings
app['db'] = await db.create_pool(loop=app.loop) # db_out: read-only vault, read-write download logs
LOG.info('DB Connection pool created')

chunk_size = CONF.get_value('vault', 'chunk_size', conv=int, default=1<<22) # 4 MB
app['chunk_size'] = chunk_size
Expand Down Expand Up @@ -101,8 +99,7 @@ async def wrapper(r):
try:

# Fetch information and Create request
pool = r.app['db']
request_info = await db.make_request(pool, stable_id)
request_info = await db.make_request(stable_id)
if not request_info:
LOG.error('Unable to create a request entry')
raise web.HTTPServiceUnavailable(reason='Unable to process request')
Expand All @@ -121,7 +118,7 @@ async def wrapper(r):
raise web.HTTPUnprocessableEntity(reason='Unsupported storage type')

async def db_update(message):
await db.update(pool, request_id, status=message)
await db.update(request_id, status=message)

# Do the job
response, dlsize = await func(r,
Expand All @@ -134,7 +131,7 @@ async def db_update(message):
mover,
chunk_size=r.app['chunk_size'])
# Mark as complete
await db.download_complete(pool, request_id, dlsize)
await db.download_complete(request_id, dlsize)
return response
#except web.HTTPError as err:
except Exception as err:
Expand All @@ -143,7 +140,7 @@ async def db_update(message):
cause = err.__cause__ or err
LOG.error(f'{cause!r}') # repr = Technical
if request_id:
await db.set_error(pool, request_id, cause, client_ip=data.get('client_ip'))
await db.set_error(request_id, cause, client_ip=data.get('client_ip'))
raise web.HTTPServiceUnavailable(reason='Unable to process request')
return wrapper

Expand Down
173 changes: 119 additions & 54 deletions lega/utils/async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,64 +8,119 @@
import logging
import traceback
from socket import gethostname
#from contextlib import asynccontextmanager
from async_generator import asynccontextmanager

import aiopg

from .db import DBConnection
from ..conf import CONF

LOG = logging.getLogger(__name__)

class DBConnection():
pool = None
args = None

def __init__(self, conf_section='db', on_failure=None, loop=None):
self.on_failure = on_failure
self.conf_section = conf_section or 'db'
self.loop = loop

def fetch_args(self):
return { 'user': CONF.get_value(self.conf_section, 'user'),
'password': CONF.get_value(self.conf_section, 'password'),
'database': CONF.get_value(self.conf_section, 'database'),
'host': CONF.get_value(self.conf_section, 'host'),
'port': CONF.get_value(self.conf_section, 'port', conv=int),
'connect_timeout': CONF.get_value(self.conf_section, 'try_interval', conv=int, default=1),
'sslmode': CONF.get_value(self.conf_section, 'sslmode'),
}


async def connect(self, force=False):
'''Get the database connection (which encapsulates a database session)
Upon success, the connection is cached.
Before success, we try to connect ``try`` times every ``try_interval`` seconds (defined in CONF)
Executes ``on_failure`` after ``try`` attempts.
'''

if force:
self.close()

if self.pool:
return

if not self.args:
self.args = self.fetch_args()
LOG.info(f"Initializing a connection to: {self.args['host']}:{self.args['port']}/{self.args['database']}")

nb_try = CONF.get_value('postgres', 'try', conv=int, default=1)
assert nb_try > 0, "The number of reconnection should be >= 1"
LOG.debug(f"{nb_try} attempts")
count = 0
while count < nb_try:
try:
LOG.debug(f"Connection attempt {count+1}")
self.pool = await aiopg.create_pool(**self.args, loop=self.loop)
LOG.debug(f"Connection successful")
return
except psycopg2.OperationalError as e:
LOG.debug(f"Database connection error: {e!r}")
count += 1
except psycopg2.InterfaceError as e:
LOG.debug(f"Invalid connection parameters: {e!r}")
break

# fail to connect
if self.on_failure:
self.on_failure()

async def ping(self):
if self.pool is None:
await self.connect()
try:
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
cur.execute('SELECT 1;')
LOG.debug("Ping db successful")
except psycopg2.OperationalError as e:
LOG.debug('Ping failed: %s', e)
self.connect(force=True) # reconnect

@asynccontextmanager
async def cursor(self):
await self.ping()
async with self.pool.acquire() as conn:
async with conn.cursor() as cur:
yield cur
# closes cursor on exit
# transaction autocommit, but connection not closed

async def close(self):
LOG.debug("Closing the database")
if self.pool:
self.pool.close()
await self.pool.wait_closed()
self.pool = None

#############################################################
## Async code - Used by data-out
#############################################################
connection = DBConnection()

async def create_pool(loop=None):
db_args = DBConnection('db').fetch_args()
LOG.info(f"Initializing a connection to: {db_args['user']}@{db_args['host']}:{db_args['port']}/{db_args['database']}")
return await aiopg.create_pool(**db_args, loop=loop)

async def make_request(pool, stable_id):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('SELECT * FROM local_ega_download.make_request(%(stable_id)s);', {'stable_id': stable_id})
return await cur.fetchone()

async def download_complete(pool, req_id, dlsize):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('SELECT local_ega_download.download_complete(%(req_id)s,%(dlsize)s);',
{'req_id': req_id, 'dlsize': dlsize})

async def get(pool, req_id, fields, table='local_ega_download.main'):
"""SELECT *fields FROM table WHERE id = req_id"""
assert req_id, 'Eh? No req_id?'
table = table or 'local_ega_download.main'
LOG.debug(f'Select fields for {req_id} from {table}: {fields}')
if not fields:
return None
async with pool.acquire() as conn:
async with conn.cursor() as cur:
q = ', '.join(fields)
query = f'SELECT {q} FROM {table} WHERE id = %(req_id)s;' # no sql injection for {table}
await cur.execute(query, { 'file_id': file_id })
res = await cur.fetchone()
if res and len(fields) == 1: # deconstruct if only one field is requested
res = res[0]
return res

async def update(pool, req_id, **kwargs):
"""Updating information in database for ``req_id``."""
if not kwargs:
return
async with pool.acquire() as conn:
async with conn.cursor() as cur:
q = ', '.join(f'{k} = %({k})s' for k in kwargs) # keys
query = f'UPDATE local_ega_download.main SET {q} WHERE id = %(req_id)s;'
kwargs['req_id'] = req_id
await cur.execute(query, kwargs)
async def make_request(stable_id):
async with connection.cursor() as cur:
await cur.execute('SELECT * FROM local_ega_download.make_request(%(stable_id)s);', {'stable_id': stable_id})
return await cur.fetchone()

async def download_complete(req_id, dlsize):
async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.download_complete(%(req_id)s,%(dlsize)s);',
{'req_id': req_id, 'dlsize': dlsize})

async def set_error(pool, req_id, error, client_ip=None):
async def set_error(req_id, error, client_ip=None):

exc_type, _, exc_tb = sys.exc_info()
g = traceback.walk_tb(exc_tb)
Expand All @@ -81,11 +136,21 @@ async def set_error(pool, req_id, error, client_ip=None):

hostname = gethostname()

async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('SELECT local_ega_download.insert_error(%(req_id)s,%(h)s,%(etype)s,%(msg)s,%(client_ip)s);',
{'h':hostname,
'etype': error.__class__.__name__,
'msg': repr(error),
'req_id': req_id,
'client_ip': client_ip})
async with connection.cursor() as cur:
await cur.execute('SELECT local_ega_download.insert_error(%(req_id)s,%(h)s,%(etype)s,%(msg)s,%(client_ip)s);',
{'h':hostname,
'etype': error.__class__.__name__,
'msg': repr(error),
'req_id': req_id,
'client_ip': client_ip})


async def update(req_id, **kwargs):
"""Updating information in database for ``req_id``."""
if not kwargs:
return
async with connection.cursor() as cur:
q = ', '.join(f'{k} = %({k})s' for k in kwargs) # keys
query = f'UPDATE local_ega_download.main SET {q} WHERE id = %(req_id)s;'
kwargs['req_id'] = req_id
await cur.execute(query, kwargs)
7 changes: 0 additions & 7 deletions lega/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@

LOG = logging.getLogger(__name__)

######################################
## DB connection ##
######################################
def _do_exit():
LOG.error("Could not connect to the database: Exiting")
sys.exit(1)

class DBConnection():
conn = None
curr = None
Expand Down

0 comments on commit 8fd1b90

Please sign in to comment.