Permalink
Browse files

Retry sqlalchemy backend operations on DatabaseError/OperationalError…

…. Closes #634
  • Loading branch information...
1 parent a412d3e commit 74410c98494492272b86fed8dab8be94f3168767 @ask committed May 14, 2012
Showing with 29 additions and 2 deletions.
  1. +29 −2 celery/backends/database/__init__.py
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
+from functools import wraps
+
from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.utils.timeutils import maybe_timedelta
@@ -21,6 +23,24 @@ def _sqlalchemy_installed():
return sqlalchemy
_sqlalchemy_installed()
+from sqlalchemy.exc import DatabaseError, OperationalError
+
+
+def retry(fun):
+
+ @wraps(fun)
+ def _inner(*args, **kwargs):
+ max_retries = kwargs.pop("max_retries", 3)
+
+ for retries in xrange(max_retries + 1):
+ try:
+ return fun(*args, **kwargs)
+ except (DatabaseError, OperationalError):
+ if retries + 1 > max_retries:
+ raise
+
+ return _inner
+
class DatabaseBackend(BaseDictBackend):
"""The database result backend."""
@@ -49,7 +69,9 @@ def ResultSession(self):
short_lived_sessions=self.short_lived_sessions,
**self.engine_options)
- def _store_result(self, task_id, result, status, traceback=None):
+ @retry
+ def _store_result(self, task_id, result, status, traceback=None,
+ max_retries=3):
"""Store return value and status of an executed task."""
session = self.ResultSession()
try:
@@ -62,10 +84,11 @@ def _store_result(self, task_id, result, status, traceback=None):
task.status = status
task.traceback = traceback
session.commit()
+ return result
finally:
session.close()
- return result
+ @retry
def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
session = self.ResultSession()
@@ -79,6 +102,7 @@ def _get_task_meta_for(self, task_id):
finally:
session.close()
+ @retry
def _save_taskset(self, taskset_id, result):
"""Store the result of an executed taskset."""
session = self.ResultSession()
@@ -91,6 +115,7 @@ def _save_taskset(self, taskset_id, result):
finally:
session.close()
+ @retry
def _restore_taskset(self, taskset_id):
"""Get metadata for taskset by id."""
session = self.ResultSession()
@@ -102,6 +127,7 @@ def _restore_taskset(self, taskset_id):
finally:
session.close()
+ @retry
def _delete_taskset(self, taskset_id):
"""Delete metadata for taskset by id."""
session = self.ResultSession()
@@ -113,6 +139,7 @@ def _delete_taskset(self, taskset_id):
finally:
session.close()
+ @retry
def _forget(self, task_id):
"""Forget about result."""
session = self.ResultSession()

0 comments on commit 74410c9

Please sign in to comment.