PENDING Tasks not added to database #286

Open
urzbs opened this issue Feb 8, 2022 · 15 comments

Comments

@urzbs

urzbs commented Feb 8, 2022

Hi,

New tasks that have been queued with .delay(*args) do not show up as PENDING in the django-celery-results database while all workers are offline.

(SUCCESS / FAILURE work just fine.)

Is this a misconfiguration on my side? I am rather confused by this.

settings.py

'django_celery_results' is in INSTALLED_APPS

##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("myapp.tasks", )
CELERY_TASK_TRACK_STARTED = True

# celery setting.
CELERY_CACHE_BACKEND = 'default'

# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

celery.py

import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

tasks.py

from celery import shared_task

@shared_task(queue='queue1')
def add(x, y):
    return x + y

push task

from myapp.tasks import add
add.delay(1,2)

start worker:

celery -A myproject worker -l INFO -Q queue1
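
For context on the behavior described above: Celery's documentation describes PENDING as the state of a task that is waiting for execution or is unknown, so any task id the result backend has no record of is reported as PENDING. Nothing is written when a task is merely published, which would explain why no row appears until a worker picks the task up. The toy model below is not Celery's code; `ToyResultBackend` and its methods are hypothetical names used only to illustrate that idea.

```python
# Toy model only: ToyResultBackend is a hypothetical stand-in, not a Celery class.
PENDING = "PENDING"
SUCCESS = "SUCCESS"


class ToyResultBackend:
    """Stores a row only when a worker reports a state."""

    def __init__(self):
        self.rows = {}  # task_id -> state, like rows in a results table

    def store_result(self, task_id, state):
        # In this model only a worker calls this, never the publisher.
        self.rows[task_id] = state

    def get_state(self, task_id):
        # Unknown ids fall back to PENDING without creating a row.
        return self.rows.get(task_id, PENDING)


backend = ToyResultBackend()
queued_id = "task-published-while-workers-offline"

state_before = backend.get_state(queued_id)  # nothing stored yet
rows_before = dict(backend.rows)             # snapshot: still empty

backend.store_result(queued_id, SUCCESS)     # a worker finishes the task
state_after = backend.get_state(queued_id)
```

In this model the PENDING answer comes from the fallback, not from a stored row, which matches the symptom of an empty results table while workers are offline.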
@urzbs
Author

urzbs commented Feb 16, 2022

As a workaround, I tried writing a wrapper so that when a task is pushed to the broker, we manually create the TaskResult object with status=PENDING.

This is "kinda" a workaround, but it is problematic to get the task's name from the AsyncResult.

For this I have seen that there is a Celery option, result_extended = True, that should save more information to AsyncResult._meta.

However, the task name is still nowhere to be found in the AsyncResult object.

I added it to settings.py as CELERY_RESULT_EXTENDED = True:

##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("demoapp.tasks", )
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXTENDED = True

# celery setting.
CELERY_CACHE_BACKEND = 'default'

# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

To make sure, I also added it in myproject/celery.py:

app = Celery('myproject', result_extended=True) 

myapp/views.py

from myapp import tasks
from django.views.generic import TemplateView
from django_celery_results.models import TaskResult

# Create your views here.

def call_task(task_name):
    return getattr(tasks, task_name)

class CreateTaskView(TemplateView):
    template_name = 'myapp/index.html'
    extra_context = {}

    def get(self, request, *args, **kwargs):
        result = call_task('add').delay(1, 2)
        result_meta = result._get_task_meta()
        print(result_meta)
        TaskResult.objects.create(task_id=result.id, task_name=result_meta.get("task_name"), status=result.status)
        self.extra_context['result'] = result
        return super(CreateTaskView, self).get(request)
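
One way around the missing name is to read it from the task object before publishing: a Celery task exposes its registered name as `task.name`, so a wrapper does not need to recover it from the AsyncResult. The sketch below is an outline under assumptions, not something tested against django-celery-results: `delay_and_record` and the `record` callback are hypothetical, and the stub classes exist only so the snippet runs without a broker. In a real project `record` might wrap something like `TaskResult.objects.create(...)`.

```python
def delay_and_record(task, record, *args, **kwargs):
    """Publish `task` and record a PENDING entry under its registered name.

    `task` is anything with `.name` and `.delay()` (a Celery task has both);
    `record` is a hypothetical callback that persists the entry.
    """
    result = task.delay(*args, **kwargs)
    record(task_id=result.id, task_name=task.name, status="PENDING")
    return result


# Stubs standing in for a Celery task and its AsyncResult.
class StubResult:
    def __init__(self, task_id):
        self.id = task_id


class StubTask:
    name = "myapp.tasks.add"

    def delay(self, *args, **kwargs):
        return StubResult("stub-id-1")


recorded = []
result = delay_and_record(StubTask(), lambda **row: recorded.append(row), 1, 2)
```

Because the name is taken from the task itself, it stays consistent regardless of what the result backend has stored so far.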

@jezeniel

jezeniel commented Mar 9, 2022

@urzbs I'm curious: why isn't creating the TaskResult with PENDING status the default behavior once the delay() call has succeeded?

@urzbs
Author

urzbs commented Mar 9, 2022

That's exactly what we did in my second comment; however, getting the task's name from the AsyncResult and saving it to the TaskResult is a problem.

Therefore I hoped that result_extended would work properly; then I could do it as described.

Using inspect or building the task name manually is not suitable and feels extremely dirty when you have a complex task hierarchy.

For us it is important that certain tasks can never be put in the queue twice; for example, imagine a task that creates a SHA-512 checksum of 100 TB of data. We need absolute consistency for the task's name as well.
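
The "never queued twice" requirement can be sketched as a uniqueness check on a key derived from the task name and its arguments. This only illustrates the idea under stated assumptions: `task_key`, `try_enqueue`, the task name `myapp.tasks.checksum` and the in-memory `in_flight` set are all hypothetical, and a real deployment would need a shared store with an atomic check, which this snippet does not provide.

```python
import hashlib
import json


def task_key(task_name, args=(), kwargs=None):
    """Build a stable key from the task name and JSON-serialisable arguments."""
    payload = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,  # the same kwargs in any order give the same key
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def try_enqueue(in_flight, task_name, args=(), kwargs=None):
    """Return True if the task may be queued, False if it is already queued."""
    key = task_key(task_name, args, kwargs)
    if key in in_flight:
        return False
    in_flight.add(key)
    return True


in_flight = set()  # hypothetical in-memory registry, for illustration only
first = try_enqueue(in_flight, "myapp.tasks.checksum", ("/data/archive",))
second = try_enqueue(in_flight, "myapp.tasks.checksum", ("/data/archive",))
other = try_enqueue(in_flight, "myapp.tasks.checksum", ("/data/other",))
```

The key depends on the exact task name, which is why a reliable name at publish time matters for this kind of check.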

@amirhoseinbidar

I have the same issue.

@barik94

barik94 commented Sep 28, 2022

I have same issue

@amirhoseinbidar how did you solve this problem?

@amirhoseinbidar

amirhoseinbidar commented Sep 28, 2022

@barik94
I didn't solve it; instead I used a Redis cache to log task progress, with the task id as the key. You can have a dedicated model for the log if you want to observe progress from the admin.
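
A rough sketch of that approach, assuming a cache object with Django-style `set(key, value, timeout)` and `get(key, default)` methods. The key format, the helper names and the `DictCache` stand-in are hypothetical; in a Django project the cache would typically be `django.core.cache.cache` configured with a Redis backend.

```python
class DictCache:
    """In-memory stand-in with the same set/get shape as Django's cache API."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, timeout=None):
        # timeout is accepted for API compatibility but ignored here
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)


def progress_key(task_id):
    # Hypothetical key format: one cache entry per task id.
    return f"task-progress:{task_id}"


def record_progress(cache, task_id, state, percent=0):
    """Store the latest state of a task under its task id."""
    cache.set(progress_key(task_id), {"state": state, "percent": percent}, timeout=3600)


def read_progress(cache, task_id):
    """Return the stored state, or a PENDING placeholder if none exists yet."""
    return cache.get(progress_key(task_id), {"state": "PENDING", "percent": 0})


cache = DictCache()
before = read_progress(cache, "abc123")
record_progress(cache, "abc123", "STARTED", percent=40)
after = read_progress(cache, "abc123")
```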

@gustavo-sdo

The filters in the Django Admin listing allow you to select pending tasks.
However, tasks that are still waiting in the execution queue do not actually show up under that filter.

Is this the expected behavior with django-celery-results?

@rubimpassos

I solved it using celery signals.

from celery import states
from celery.signals import before_task_publish
from django_celery_results.models import TaskResult


@before_task_publish.connect
def create_task_result_on_publish(sender=None, headers=None, body=None, **kwargs):
    if "task" not in headers:
        return

    TaskResult.objects.store_result(
        "application/json",
        "utf-8",
        headers["id"],
        None,
        states.PENDING,
        task_name=headers["task"],
        task_args=headers["argsrepr"],
        task_kwargs=headers["kwargsrepr"],
    )
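
To make a handler like this easier to test without a broker or database, the field extraction can be separated from the write. The sketch below assumes the message headers carry `id`, `task`, `argsrepr` and `kwargsrepr`, as the snippet above does; `pending_fields` is a hypothetical helper, and using `.get()` for the repr fields is a defensive assumption rather than something confirmed to be needed.

```python
def pending_fields(headers):
    """Return keyword fields for a PENDING row, or None if headers lack a task."""
    if not headers or "task" not in headers:
        return None
    return {
        "task_id": headers["id"],
        "task_name": headers["task"],
        "task_args": headers.get("argsrepr"),
        "task_kwargs": headers.get("kwargsrepr"),
        "status": "PENDING",
    }


# Example headers shaped like the ones the signal handler above reads.
sample = {"id": "abc123", "task": "myapp.tasks.add", "argsrepr": "(1, 2)", "kwargsrepr": "{}"}
fields = pending_fields(sample)
skipped = pending_fields({"id": "no-task-header"})
```

The signal handler would then only need to call the helper and pass the result on to whatever performs the database write.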

@auvipy
Member

auvipy commented Oct 15, 2022

I solved it using celery signals.

should we consider adding to this package?

@gustavo-sdo

I solved it using celery signals.

A great solution. Thanks for it!
Working perfectly with celery==5.2.7 django-celery-results==2.4.0

@itisnotyourenv

I solved it using celery signals.

Thank you. It's a useful solution.

@ottorei

ottorei commented Nov 25, 2022

I solved it using celery signals.

should we consider adding to this package?

Yes, this would be a really useful feature.

@MohammedAliD

MohammedAliD commented Jun 2, 2023

Hi guys,

Problem:
There seems to be a documentation error. According to the link below, the variable that enables tracking of the started state is CELERY_TASK_TRACK_STARTED = True
https://docs.celeryq.dev/en/v5.2.7/userguide/configuration.html#std-setting-task_track_started

Solution:
But the actual variable should be
CELERY_TRACK_STARTED = True

You can also verify the applied config values with the snippet below:

from project.celery import app
print(app.conf)

Thanks @sandeep7410

@jjorissen52

jjorissen52 commented Jul 27, 2023

The before_task_publish signal does not fire when a task is invoked to run once from the django-celery-beat admin. In addition, trying to import TaskResult directly resulted in an import error indicating that apps haven't finished loading yet. I had better luck with this:

# celery.py
from celery import signals, states


def get_task_result_model():
    # Import lazily: importing models at the top of the celery file causes
    # an import error in my app because apps haven't finished loading yet.
    from django_celery_results.models import TaskResult

    return TaskResult


@signals.task_prerun.connect
def create_task_result_on_task_prerun(task_id, task, args=(), kwargs=None, **other):
    TaskResult = get_task_result_model()
    # Look up by task_id only, so an existing row is reused instead of
    # attempting a second insert for the same task.
    TaskResult.objects.get_or_create(
        task_id=task_id,
        defaults={
            "content_type": "application/json",
            "content_encoding": "utf-8",
            "status": states.PENDING,
            "task_name": task.name,
            "task_args": args,
            "task_kwargs": kwargs if kwargs else {},
        },
    )

@mghantous

I solved it using celery signals.

should we consider adding to this package?

Thanks for this solution. It got me off to a great start, but I noticed task_args and task_kwargs weren't being encoded. I ended up calling store_result via DatabaseBackend rather than TaskResult so that the encoding logic runs (along with other pieces of logic that were missing). Below is my tweaked solution (imported in my application's ready() function).

I agree that something like this should be added to the package, or at least be available behind a feature toggle, so we don't all have to implement our own varied solutions.

from celery import states
from celery.signals import before_task_publish
from django.conf import settings

from my_app_foo import celery_app


db_result_backend = None
registered_task_names = celery_app.tasks.keys()


def create_task_result_on_publish(sender=None, headers=None, **kwargs):  # noqa: ARG001
    """
    This is a workaround for an issue where django-celery-results
    is not adding PENDING tasks to the database.

    # ref: https://github.com/celery/django-celery-results/issues/286
    """

    if "task" not in headers or not db_result_backend or sender not in registered_task_names:
        return

    # essentially transforms a single-level of the headers dictionary
    # into an object with properties
    request = type('request', (object,), headers)

    db_result_backend.store_result(
        headers["id"],
        None,
        states.PENDING,
        traceback=None,
        request=request,
    )


celery_backend = getattr(settings, 'CELERY_RESULT_BACKEND', '')
is_django_celery_installed = 'django_celery_results' in getattr(settings, 'INSTALLED_APPS', [])

if is_django_celery_installed and celery_backend == 'django-db':
    # We are good to import DatabaseBackend
    from django_celery_results.backends.database import DatabaseBackend

    db_result_backend = DatabaseBackend(celery_app)
    # And now register the signal
    before_task_publish.connect(create_task_result_on_publish, dispatch_uid='create_task_result_on_publish')
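
The `type('request', (object,), headers)` call above builds a throwaway class whose class attributes are the header entries, so header values can be read with attribute access. A small standalone illustration follows, using made-up header values. Whether a `types.SimpleNamespace` would serve equally well as the `request` argument is not confirmed anywhere in this thread, so it appears here only to compare attribute access.

```python
from types import SimpleNamespace

# Made-up header values, for illustration only.
headers = {"id": "abc123", "task": "myapp.tasks.add", "argsrepr": "(1, 2)"}

# Same construction as in the snippet above: this is a class, not an instance.
request = type("request", (object,), headers)

# Alternative offering the same attribute access, but as an instance.
request_ns = SimpleNamespace(**headers)

task_id = request.id
task_name = getattr(request, "task", None)
missing = getattr(request, "kwargsrepr", None)  # absent keys need a default
```

Only the top level of the dictionary is converted; nested dictionaries stay dictionaries and are still accessed with keys.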
