Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Make the system self-recoverable after connection errors (#319)
Browse files Browse the repository at this point in the history
* Added database rollback on unhandled worker SQL error

* Added celery config: task_soft_time_limit

* Implemented cleanup_session for rollback/fix stale jobs

* Fixed linting
  • Loading branch information
roll committed Sep 12, 2018
1 parent c054966 commit 36782b8
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
6 changes: 2 additions & 4 deletions goodtablesio/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from goodtablesio.integrations.github.blueprint import github
from goodtablesio.integrations.s3.blueprint import s3
from goodtablesio.utils.frontend import render_component
from goodtablesio.utils.database import cleanup_session
from goodtablesio.services import database
log = logging.getLogger(__name__)

Expand Down Expand Up @@ -67,10 +68,7 @@ def server_error(err):

@app.errorhandler(sqlalchemy.exc.SQLAlchemyError)
def error_handler(err):
    """Clean up the shared database session after a SQLAlchemy error.

    Registered on ``app`` for ``sqlalchemy.exc.SQLAlchemyError``. The handler
    rolls back and cleans up ``database['session']`` and then re-raises
    ``err`` unchanged, so it does not turn the error into a response itself.
    """
    # Keep the session from staying broken after an unhandled error that was
    # never rolled back.
    # https://github.com/frictionlessdata/goodtables.io/issues/97
    log.info('Database session rollback by server error handler')
    database['session'].rollback()
    # NOTE(review): cleanup_session() starts with its own session.rollback(),
    # so the explicit rollback (and log line) above look redundant -- they may
    # be the pre-change lines of this diff; confirm before relying on both.
    cleanup_session(database['session'])
    # Re-raise the original error after the session has been cleaned up.
    raise err


Expand Down
1 change: 1 addition & 0 deletions goodtablesio/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

broker_url = os.environ['BROKER_URL']
result_backend = os.environ['RESULT_BACKEND']
task_soft_time_limit = 10 * 60 # seconds = 10 minutes
task_default_queue = 'default'
task_serializer = 'json'
result_serializer = 'json'
Expand Down
2 changes: 2 additions & 0 deletions goodtablesio/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from goodtablesio.models.job import Job
from goodtablesio.services import database
from goodtablesio.models.internal_job import InternalJob
from goodtablesio.utils.database import cleanup_session
log = logging.getLogger(__name__)


Expand Down Expand Up @@ -56,6 +57,7 @@ def _on_failure(exception, job_class, job_id):
elif isinstance(exception, SoftTimeLimitExceeded):
message = 'Time limit exceeded'
else:
cleanup_session(database['session'])
message = 'Internal error'
log.exception(exception)

Expand Down
24 changes: 24 additions & 0 deletions goodtablesio/utils/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from goodtablesio import settings
log = logging.getLogger(__name__)


# Module API
Expand All @@ -9,3 +12,24 @@ def create_session(**params):
engine = create_engine(settings.DATABASE_URL, **params)
session = scoped_session(sessionmaker(bind=engine))
return session


def cleanup_session(session):
    """Roll back ``session`` and mark stale jobs as failed.

    Intended to be called after an unhandled database error so the session
    does not stay broken because nobody rolled it back. See:
    https://github.com/frictionlessdata/goodtables.io/issues/97
    https://github.com/frictionlessdata/goodtables.io/issues/317

    After the rollback, every ``Job`` and ``InternalJob`` row that has no
    ``finished`` value and whose ``created`` is older than
    ``settings.task_soft_time_limit`` seconds is updated to
    ``status='error'`` with a 'Time limit exceeded' error, and the change is
    committed.

    Args:
        session: the SQLAlchemy session to clean up.

    Raises:
        Any exception raised while fixing stale jobs is re-raised unchanged,
        but only after the session has been rolled back again so that it is
        not left inside a failed transaction.
    """
    # Kept as function-level imports, as in the original code
    # (presumably to avoid a circular import with the models -- confirm).
    from goodtablesio.models.internal_job import InternalJob
    from goodtablesio.models.job import Job
    log.info('Database session cleanup: rollback and fix stale jobs')

    # Rollback session
    session.rollback()

    # Fix stale jobs. The timestamps are computed once so both models share
    # exactly the same cutoff and `finished` value.
    utcnow = datetime.datetime.utcnow()
    since = utcnow - datetime.timedelta(seconds=settings.task_soft_time_limit)
    error = {'message': 'Time limit exceeded'}
    try:
        for model in [Job, InternalJob]:
            (session.query(model)
                .filter(model.finished == None, model.created < since)  # noqa
                .update({'finished': utcnow, 'status': 'error', 'error': error},
                        synchronize_session='fetch'))
        session.commit()
    except Exception:
        # A failed update/commit would otherwise leave the session in the
        # very state this function exists to repair; reset it and let the
        # caller see the original exception.
        session.rollback()
        raise

0 comments on commit 36782b8

Please sign in to comment.