Skip to content

Commit

Permalink
Merge c690516 into 44c3ba3
Browse files Browse the repository at this point in the history
  • Loading branch information
ionelmc committed Mar 3, 2014
2 parents 44c3ba3 + c690516 commit a8e1b94
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
67 changes: 38 additions & 29 deletions celery/backends/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@
"""
from __future__ import absolute_import

import logging
from contextlib import contextmanager
from functools import wraps

from celery import states
from celery.backends.base import BaseBackend
from celery.exceptions import ImproperlyConfigured
from celery.five import range
from celery.utils.timeutils import maybe_timedelta

from celery.backends.base import BaseBackend

from .models import Task, TaskSet
from .models import Task
from .models import TaskSet
from .session import ResultSession

logger = logging.getLogger(__name__)

__all__ = ['DatabaseBackend']


Expand All @@ -33,7 +37,19 @@ def _sqlalchemy_installed():
return sqlalchemy
_sqlalchemy_installed()

from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.exc import DatabaseError, OperationalError, ResourceClosedError, InvalidRequestError
from sqlalchemy.orm.exc import StaleDataError


@contextmanager
def session_cleanup(session):
try:
yield
except Exception:
session.rollback()
raise
finally:
session.close()


def retry(fun):
Expand All @@ -45,7 +61,12 @@ def _inner(*args, **kwargs):
for retries in range(max_retries):
try:
return fun(*args, **kwargs)
except (DatabaseError, OperationalError):
except (DatabaseError, OperationalError, ResourceClosedError, StaleDataError, InvalidRequestError):
logger.warning(
"Failed operation %s. Retrying %s more times.",
fun.__name__, max_retries - retries - 1,
exc_info=True,
)
if retries + 1 >= max_retries:
raise

Expand Down Expand Up @@ -95,8 +116,9 @@ def _store_result(self, task_id, result, status,
traceback=None, max_retries=3, **kwargs):
"""Store return value and status of an executed task."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
with session_cleanup(session):
task = list(session.query(Task).filter(Task.task_id == task_id))
task = task and task[0]
if not task:
task = Task(task_id)
session.add(task)
Expand All @@ -106,83 +128,70 @@ def _store_result(self, task_id, result, status,
task.traceback = traceback
session.commit()
return result
finally:
session.close()

@retry
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
session = self.ResultSession()
try:
task = session.query(Task).filter(Task.task_id == task_id).first()
if task is None:
with session_cleanup(session):
task = list(session.query(Task).filter(Task.task_id == task_id))
task = task and task[0]
if not task:
task = Task(task_id)
task.status = states.PENDING
task.result = None
return task.to_dict()
finally:
session.close()

@retry
def _save_group(self, group_id, result):
"""Store the result of an executed group."""
session = self.ResultSession()
try:
with session_cleanup(session):
group = TaskSet(group_id, result)
session.add(group)
session.flush()
session.commit()
return result
finally:
session.close()

@retry
def _restore_group(self, group_id):
"""Get metadata for group by id."""
session = self.ResultSession()
try:
with session_cleanup(session):
group = session.query(TaskSet).filter(
TaskSet.taskset_id == group_id).first()
if group:
return group.to_dict()
finally:
session.close()

@retry
def _delete_group(self, group_id):
"""Delete metadata for group by id."""
session = self.ResultSession()
try:
with session_cleanup(session):
session.query(TaskSet).filter(
TaskSet.taskset_id == group_id).delete()
session.flush()
session.commit()
finally:
session.close()

@retry
def _forget(self, task_id):
"""Forget about result."""
session = self.ResultSession()
try:
with session_cleanup(session):
session.query(Task).filter(Task.task_id == task_id).delete()
session.commit()
finally:
session.close()

def cleanup(self):
"""Delete expired metadata."""
session = self.ResultSession()
expires = self.expires
now = self.app.now()
try:
with session_cleanup(session):
session.query(Task).filter(
Task.date_done < (now - expires)).delete()
session.query(TaskSet).filter(
TaskSet.date_done < (now - expires)).delete()
session.commit()
finally:
session.close()

def __reduce__(self, args=(), kwargs={}):
kwargs.update(
Expand Down
4 changes: 3 additions & 1 deletion celery/backends/database/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ class _after_fork(object):

def __call__(self):
self.registered = False # child must reregister
for session in _SESSIONS:
session.close()
_SESSIONS.clear()
for engine in list(_ENGINES.values()):
engine.dispose()
_ENGINES.clear()
_SESSIONS.clear()
after_fork = _after_fork()


Expand Down

0 comments on commit a8e1b94

Please sign in to comment.