Steps to reproduce
Add the following to tasks.py:

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
import psycopg2
import time

app = Celery('tasks', broker='memory://')


@app.task(soft_time_limit=5, time_limit=8)
def check_table():
    print("Starting")
    c = psycopg2.connect(
        host='localhost',
        user='db_user',
        password='db_password',
        dbname='db_name'
    ).cursor()
    try:
        # Blocks for as long as celery_table is locked
        c.execute("select * from celery_table")
        print("Done")
    except SoftTimeLimitExceeded:
        print("Caught soft time limit")
        time.sleep(10)


check_table.delay()
There are two problems with soft_time_limit:
Problem 1: under certain scenarios in which the worker is frozen, the soft time limit is unable to stop the worker.
Problem 2: the hard time_limit doesn't take effect if the soft_time_limit has already been exceeded (e.g. the table is locked and the soft_time_limit is exceeded, but nothing happens, and the hard time_limit is never triggered).
Lock the table: begin; lock table celery_table IN ACCESS EXCLUSIVE MODE;
Start celery: celery -A tasks worker --loglevel=info
Expected behavior
The soft_time_limit should stop the frozen worker. When using time_limit alone, it works fine. The hard time_limit should also stop the worker, but it doesn't.
Actual behavior
The worker remains frozen.
Celery uses a signal (SIGUSR1) to raise the exception in the worker, and libpq, the C library underneath psycopg2, doesn't react to Python signals.
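As a rough illustration of that mechanism, the sketch below installs a SIGUSR1 handler that raises an exception in the main thread. It is not Celery's code: `SoftLimit` and `run_with_soft_limit` are hypothetical names, and it assumes Python 3 on a Unix system. `time.sleep` gets interrupted because control returns to the interpreter when the signal arrives; a call blocked inside a C library that never hands control back would not see the exception until it returns on its own.

```python
import signal
import threading
import time


class SoftLimit(Exception):
    """Hypothetical stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    # Python runs this handler in the main thread, and only once the
    # interpreter regains control from whatever call was blocking.
    raise SoftLimit()


def run_with_soft_limit(func, seconds):
    """Run func() and send SIGUSR1 to the main thread after `seconds`."""
    previous = signal.signal(signal.SIGUSR1, _raise_soft_limit)
    timer = threading.Timer(
        seconds, signal.pthread_kill,
        args=(threading.main_thread().ident, signal.SIGUSR1))
    timer.start()
    try:
        func()
        return "finished"
    except SoftLimit:
        return "interrupted"
    finally:
        # Stop the timer and restore the previous SIGUSR1 handler.
        timer.cancel()
        signal.signal(signal.SIGUSR1, previous)


if __name__ == "__main__":
    # A Python-level sleep is interruptible, so the limit takes effect.
    print(run_with_soft_limit(lambda: time.sleep(5), 0.2))
```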
According to the article "Cancelling PostgreSQL statements from Python", calling the patch function below in your Celery worker initialization code makes the soft time limit exception get raised during a long-running PostgreSQL query.
from select import select
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
import psycopg2


def wait_select_inter(conn):
    while True:
        try:
            state = conn.poll()
            if state == POLL_OK:
                break
            elif state == POLL_READ:
                select([conn.fileno()], [], [])
            elif state == POLL_WRITE:
                select([], [conn.fileno()], [])
            else:
                raise conn.OperationalError(
                    "bad state from poll: %s" % state)
        except Exception as e:
            del e
            conn.cancel()
            # the loop will be broken by a server error
            continue


def patch():
    psycopg2.extensions.set_wait_callback(wait_select_inter)
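The cancel-and-keep-waiting pattern in that callback can be tried out without a database. The sketch below is only an analogy built on the standard library, assuming Python 3 on Unix: a socket pair plays the role of the connection, and `Cancelled`, `wait_readable`, and `demo` are hypothetical names that are not part of psycopg2 or Celery. The wait happens in Python's `select`, so an exception raised by a signal handler reaches the loop, which then asks the other side to cancel and continues waiting for its reply.

```python
import select
import signal
import socket
import threading


class Cancelled(Exception):
    """Hypothetical stand-in for the exception raised by a time limit."""


def _raise_cancelled(signum, frame):
    raise Cancelled()


def wait_readable(sock, cancel):
    """Wait in select() until sock is readable.

    If a signal handler raises Cancelled while waiting, call cancel() and
    keep waiting; the peer's reply is what finally ends the loop.
    Returns True if the wait was interrupted at least once.
    """
    interrupted = False
    while True:
        try:
            select.select([sock], [], [])
            return interrupted
        except Cancelled:
            interrupted = True
            cancel()


def demo(delay=0.2):
    """Simulate a stuck query that only ends once it is cancelled."""
    client, server = socket.socketpair()
    previous = signal.signal(signal.SIGUSR1, _raise_cancelled)
    timer = threading.Timer(
        delay, signal.pthread_kill,
        args=(threading.main_thread().ident, signal.SIGUSR1))
    timer.start()
    try:
        # cancel() makes the fake server answer, so client becomes readable.
        return wait_readable(client, lambda: server.send(b"cancelled"))
    finally:
        timer.cancel()
        signal.signal(signal.SIGUSR1, previous)
        client.close()
        server.close()


if __name__ == "__main__":
    print(demo())
```

Unlike this sketch, the psycopg2 callback above calls `conn.cancel()` and relies on the server's error response to end the wait.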