
Add function parameters to the task attributes #5363

Open
tlinhart opened this issue Feb 26, 2019 · 6 comments

Comments


tlinhart commented Feb 26, 2019

I'm building a web application that lets users schedule jobs defined as Celery tasks. I'm trying to decouple the components as much as possible -- the application doesn't have access to the tasks' code and uses send_task() to execute them. The only shared piece should be the broker (and possibly the result backend). When adding a new job, users have to pick a task and provide argument values for it. I would like to render a form that contains the list of possible parameters. I'm able to get the list of tasks registered with the workers via a broadcast call:

celery.control.broadcast('registered', reply=True)
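For reference, the reply from this broadcast is a list with one mapping per responding worker, each mapping the worker's node name to its list of registered task names. A small helper can flatten it (the node and task names below are made up for illustration):

```python
def flatten_registered(reply):
    """Collect the union of task names reported by all workers.

    `reply` is assumed to have the shape returned by
    celery.control.broadcast('registered', reply=True):
    a list of {node_name: [task_name, ...]} mappings.
    """
    return {task for node in reply for tasks in node.values() for task in tasks}

# Hypothetical reply from two workers:
reply = [
    {'celery@worker1': ['add_numbers', 'send_email']},
    {'celery@worker2': ['add_numbers']},
]
print(sorted(flatten_registered(reply)))  # → ['add_numbers', 'send_email']
```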

What I'm not able to get, though, is the list of parameters the task accepts. I'm aware that I can call the broadcast() method like this:

celery.control.broadcast(
    'registered', reply=True, arguments={
        'taskinfoitems': [...]
    })

to also return task attributes, but there is no task attribute that holds the parameters of the original function (I'm referring to app/base.py). If there were such an attribute, I would be able to parse it out of the broadcast reply (which returns a list of strings, as per the code).

Or maybe there's another way to achieve my goal?

@tlinhart (Author)

I found a solution, so I'm sharing it in case someone runs into the same problem. I defined a custom base class for my tasks that adds a __parameters__ attribute:

import inspect
from celery import Celery, Task

celery = Celery('tasks', broker='pyamqp://guest@localhost//')

class BaseTask(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # __wrapped__ holds the original decorated function, so its
        # signature gives us the parameter names in declaration order.
        signature = inspect.signature(self.__wrapped__)
        self.__parameters__ = list(signature.parameters)

@celery.task(base=BaseTask, name='add_numbers')
def add(x, y):
    return x + y
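The inspection step itself can be checked without a worker; for a plain function, inspect.signature yields the parameter names in declaration order:

```python
import inspect

def add(x, y):
    return x + y

# This mirrors what BaseTask.__init__ stores in __parameters__.
parameters = list(inspect.signature(add).parameters)
print(parameters)  # → ['x', 'y']
```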

In my application, I query the workers for registered tasks using a broadcast and parse the results:

import ast
import re
from celery import Celery

celery = Celery(broker='pyamqp://guest@localhost//')

def get_available_tasks():
    reply = celery.control.broadcast(
        'registered', reply=True,
        arguments={'taskinfoitems': ['__parameters__']})
    # reply is a list of {node_name: [task_string, ...]} mappings;
    # collect the union of task strings across all workers.
    reg_tasks = {t for node in reply for tasks in node.values() for t in tasks}

    tasks = []
    for task in reg_tasks:
        # Each entry looks like "task_name [__parameters__=['x', 'y']]".
        name = re.search(r'[\w\.]+', task).group(0)
        match = re.search(r'\[__parameters__=(\[.*\])\]', task)
        # ast.literal_eval only accepts Python literals, unlike eval().
        parameters = ast.literal_eval(match.group(1)) if match else []
        tasks.append({'name': name, 'parameters': parameters})

    return tasks

It's probably not the cleanest solution, but it works perfectly for my use case.
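The parsing step can be exercised in isolation on a sample reply string (the entry below mirrors the `name [attr=value]` rendering assumed by the regex above; using ast.literal_eval instead of eval() means only Python literals are accepted):

```python
import ast
import re

# Hypothetical entry as rendered in the 'registered' reply:
task = "add_numbers [__parameters__=['x', 'y']]"

name = re.search(r'[\w\.]+', task).group(0)
match = re.search(r'\[__parameters__=(\[.*\])\]', task)
parameters = ast.literal_eval(match.group(1)) if match else []

print(name, parameters)  # → add_numbers ['x', 'y']
```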

auvipy (Member) commented Feb 21, 2021


If you are able to come up with a possible proof-of-concept improvement request, you are more than welcome.

@tlinhart (Author)

I'm afraid I won't have time to look into this anytime soon.

@thedrow thedrow added this to the Future milestone Feb 28, 2021
woutdenolf (Contributor) commented Jan 14, 2022

I was about to open a new issue, but as far as I understand it is related to this one: "What is the Celery way to decouple client and worker code?"

@tlinhart I was trying to understand your solution but couldn't figure it out (I'm new to Celery).

Worker node

Suppose I have installed a python project on the worker node with this celery application:

# myproject.heavy_app

from celery import Celery
from heavy_project import heavy_function

app = Celery()
app.config_from_object("celeryconfig", force=True)

@app.task
def heavy_task(a, b):
    return heavy_function(a, b)

On the worker node I can then launch a worker that serves this application like this:

celery -A myproject.heavy_app worker

Client node

To use the Celery app on the client node I have to import it just to get the task signature! This is not what I want, because I don't want my client to depend on the worker's requirements (heavy_project in this case):

# myjob.py

from myproject.heavy_app import heavy_task

future = heavy_task.delay(1, 1)
assert future.get(timeout=10) == 2

and run it with

python myjob.py

Solutions

Solutions to decouple client and worker code that I'm not satisfied with:

  • import heavy_function inside heavy_task
  • define the Celery app twice:
    1. once for the client (not including the implementation, only the task signature)
    2. once for the worker (including the implementation)

@woutdenolf (Contributor)

Ok sorry, I found the solution in the docs. On the client side I can do this:

# myjob.py

from celery.execute import send_task

future = send_task("myproject.heavy_app.heavy_task", args=(1, 1))
future.get(timeout=10)

and run it with

python myjob.py

@woutdenolf (Contributor)

Maybe this should be used in the "getting started" docs? It was not obvious to me.
