
Any way to track a celery group? #58

Closed
petoor opened this issue Oct 12, 2020 · 11 comments · Fixed by #115
Assignees
Labels
question Further information is requested

Comments

@petoor

petoor commented Oct 12, 2020

Hello.

Is there any way to track the progress of a Celery group? That is, when spawning a lot of asynchronous tasks, can we keep track of how many subtasks have completed as they run? The Celery group has a completed_count() method that does exactly that, but my understanding of celery-progress is not good enough to know whether it can be incorporated:
https://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.GroupResult

If our group code looks something like this:

group(do_something_to_file.si(file) for file in file_list)

then I'm not sure where to put the observer, since every subtask has a unique task id. We could also assign an id to the group itself, but then again I'm not sure where to put the observer.

Best regards,
Peter

@czue
Owner

czue commented Oct 13, 2020

I've never attempted this. A couple of options that might work (just some quick thoughts):

  1. Show a separate progress bar per task using the normal observer.
  2. Create some other observer object that can monitor all the child tasks and report progress on everything. I'm not sure whether you want to track just the number of tasks that have completed or the progress of each individual task in the group, but the latter would require passing information from the spawned tasks back up to the main thread somehow (which sounds a bit tricky).
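A minimal sketch of that second option, assuming each child task reports a (current, total) pair into some shared store keyed by task id (the store and the reporting mechanism are hypothetical here, not part of celery-progress):

```python
def aggregate_progress(child_progress):
    """Combine (current, total) pairs from child tasks into one percentage."""
    done = sum(current for current, _ in child_progress.values())
    total = sum(total for _, total in child_progress.values())
    if total == 0:
        return 0.0
    return round(100 * done / total, 1)

# Illustrative snapshot of three children at different stages.
progress = {
    "task-a": (10, 10),  # finished
    "task-b": (5, 10),   # halfway
    "task-c": (0, 20),   # not started
}
print(aggregate_progress(progress))  # 37.5
```

The hard part, as noted above, is getting those per-child pairs into one place; the arithmetic itself is trivial.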

@jjrugui

jjrugui commented Oct 13, 2020

Hi @czue, I really appreciate your commitment :). I'm working with @petoor, which is why I'm also commenting on this issue. One issue with the current workflow for this project (if I understand correctly) is that the view is expected to return the task id, so if multiple tasks are spawned from a group, there's no way to return the ids of all of them. Is that right?

I'm working on #50, which I think would solve this by including the id of each subtask within a group in a table that links it to a particular user. That table can then be queried at any time.

I currently have a very simple implementation of #50 that I'm testing. I'll be able to share it shortly in case it's useful for this project.

@EJH2
Collaborator

EJH2 commented Oct 14, 2020

The package currently does support tracking tasks in a group, although it does not have persistence, as you've noted. An example view to show off group support would look as follows:

from celery import group
from django.shortcuts import render

from .tasks import task  # the example task defined below


def task_view(request):
    # Spawn a group of five tasks, each counting to 1000.
    result = group(task.s(i) for i in [1000 for _ in range(5)])()
    task_ids = [
        task_id
        for parents in result.children
        for task_id in parents.as_list()[::-1]
    ]
    return render(request, 'http.html', context={'task_ids': task_ids})

The task_ids context collects all of the tasks currently in the group. The list comprehension flattens the list that each parents.as_list() generates and reverses it, so any children of a task are displayed in the right order (otherwise you'd have the first tasks at the bottom of the list!). An accompanying task would be:

from random import random

from celery import shared_task
from celery_progress.backend import ProgressRecorder


@shared_task(bind=True)
def task(self, number):
    progress_recorder = ProgressRecorder(self)
    for i in range(number):
        progress_recorder.set_progress(i + 1, number)
    return int(random() * 1000)
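The flattening comprehension in the view above can be illustrated with stand-in objects. FakeResult below mimics only the AsyncResult.as_list() behaviour the view relies on (the result's own id first, followed by its parents' ids); it is not part of Celery:

```python
class FakeResult:
    """Stand-in for an AsyncResult, exposing only as_list()."""

    def __init__(self, ids):
        self._ids = ids

    def as_list(self):
        # Own id first, then parent ids, like AsyncResult.as_list().
        return list(self._ids)


children = [
    FakeResult(["child-1", "root-1"]),
    FakeResult(["child-2", "root-2"]),
]

# Same comprehension as the view: flatten, reversing each chain so
# parents come before their children.
task_ids = [tid for parents in children for tid in parents.as_list()[::-1]]
print(task_ids)  # ['root-1', 'child-1', 'root-2', 'child-2']
```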

For the html, the significant part of the page would have:

{% for task_id in task_ids %}
<div class='progress-wrapper-{{ forloop.counter0 }}'>
    <div id='progress-bar-{{ forloop.counter0 }}' class='progress-bar-{{ forloop.counter0 }}' style="background-color: #68a9ef; width: 0%;">&nbsp;</div>
    <div id="progress-bar-message-{{ forloop.counter0 }}">Waiting for progress to start...</div>
    <div id="progress-result-{{ forloop.counter0 }}"></div>
</div>
{% endfor %}
<script>
    document.addEventListener("DOMContentLoaded", function () {
        const task_urls = [
            {% for task_id in task_ids %}"{% url 'celery_progress:task_status' task_id %}",
        {% endfor %}];
        for (var i = 0; i < task_urls.length; i++) {
            CeleryProgressBar.initProgressBar(task_urls[i], {
                progressBarId: "progress-bar-" + i,
                progressBarMessageId: "progress-bar-message-" + i,
                resultElementId: "progress-result-" + i
            });
        }
    });
</script>

The for loop ensures that each task ID is assigned its own bar and result element, and when the page finishes loading, the JavaScript starts a bar instance for each one to begin pulling data. In total, this produces something to the tune of the image below.
[screenshot: five progress bars mid-run]
When the tasks finish, the end result will look like the image below.
[screenshot: five completed progress bars with their results]

If you have a callback that is supposed to run after the tasks finish, it would be fairly easy to add an if statement on the last task in the JavaScript for loop and modify onSuccess to suit your needs. A native solution for this would be interesting to see, and any suggestions are welcome!

@EJH2
Collaborator

EJH2 commented Oct 14, 2020

Alternatively, if a single progress bar encompassing all tasks is what you're after, that is something we don't currently support. As @czue noted, it would require passing information from all tasks into a centralized location. From there, this "super observer" would have to work out how "done" every task is, and then how to display those results. From what I can see in GroupResult's source, a GroupResult is only returned once the group is started, and it is coincidentally also the object that holds the information needed to get the task IDs. Alternatively, for a rather crazy idea, it would be interesting if each child task could be spawned with a "subserver" that shouts out its status, with a "super observer" capturing this and doing generally the same as above. If either of these could be accomplished, it could be pretty useful.
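A rough sketch of the coarsest version of that "super observer" idea: poll each child result and report how many have finished, much like GroupResult.completed_count() does. FakeChild here stands in for an AsyncResult (only its state attribute is used); a real observer would hold the AsyncResult objects recovered from the stored task ids:

```python
class FakeChild:
    """Stand-in for an AsyncResult, exposing only .state."""

    def __init__(self, state):
        self.state = state


def group_progress(children):
    """Return (completed, total) across all child results."""
    completed = sum(1 for child in children if child.state == "SUCCESS")
    return completed, len(children)


children = [FakeChild("SUCCESS"), FakeChild("STARTED"), FakeChild("SUCCESS")]
print(group_progress(children))  # (2, 3)
```

This only gives completed-vs-total granularity; per-child percentages would still need the children to report their own progress somewhere.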

@czue czue added the question Further information is requested label Feb 27, 2021
@safhac

safhac commented Mar 8, 2021

Hi, could the main ProgressRecorder be serialized somehow to the subtasks?

Edit: when I try to call set_progress on the deserialized recorder I get:

[2021-03-08 18:59:39,920: ERROR/ForkPoolWorker-1] Task cluster123.tasks.task_progress_update[5e1e9d5a-54c6-4a89-b062-fcbbcbc283dd] raised unexpected: AttributeError("'dict' object has no attribute 'update_state'")
Traceback (most recent call last):
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery/app/trace.py", line 405, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery/app/trace.py", line 697, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/arubico/PycharmProjects/djangoProject/cluster123/tasks.py", line 22, in task_progress_update
    prog_update.set_progress(i, t, m)
  File "/home/arubico/PycharmProjects/djangoProject/venv/lib/python3.9/site-packages/celery_progress/backend.py", line 50, in set_progress
    self.task.update_state(
AttributeError: 'dict' object has no attribute 'update_state'

which is logical: deserialization doesn't go through the constructor, and this is a bad idea to begin with.

@WassawRoki

Was this ever solved? I ran into the same problem as @safhac, trying to pass the ProgressRecorder from a main_task down to the sub_tasks it spawns, but this does not seem doable. The only solution I've come up with is for the main task to save the sub_task ids that are generated when spawning them, and then building a system to iterate over the list of ids and calculate a progress.
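The id-saving workaround described above can be sketched without Celery at all. Everything here is illustrative: spawn() stands in for a .delay() call, and STATUS_STORE stands in for the Celery result backend:

```python
import uuid

STATUS_STORE = {}  # hypothetical stand-in for the Celery result backend


def spawn():
    """Pretend to start a subtask and return its id."""
    task_id = str(uuid.uuid4())
    STATUS_STORE[task_id] = "PENDING"
    return task_id


def main_task(n_subtasks):
    """Spawn subtasks and keep their ids for later polling."""
    return [spawn() for _ in range(n_subtasks)]


def poll(task_ids):
    """Count how many of the saved subtasks have finished."""
    done = sum(1 for tid in task_ids if STATUS_STORE.get(tid) == "SUCCESS")
    return done, len(task_ids)


ids = main_task(3)
STATUS_STORE[ids[0]] = "SUCCESS"  # pretend the first subtask finished
print(poll(ids))  # (1, 3)
```

In a real setup the ids would be persisted (e.g. in the main task's own state or a database row) so a view can poll them later.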

@czue
Owner

czue commented Feb 28, 2024

Apologies for the radio silence. We can likely add serialization to the ProgressRecorder class, if that's all it takes. If someone who has run into this can provide a dummy example of the code that uses groups and/or spawns subtasks, I'd be happy to try to get it working and then submit an updated version with support for this. Alternatively, I'd happily accept contributions on this.

@WassawRoki

WassawRoki commented Mar 1, 2024

import time

from celery import shared_task
from celery_progress.backend import ProgressRecorder


@shared_task(bind=True)
def main_task(self, seconds):
    progress_recorder = ProgressRecorder(self)
    result = 0
    for i in range(seconds):
        time.sleep(1)
        # Desired: spawn a subtask that shares this recorder
        # (ProgressRecorder is not currently serializable, which is
        # exactly the problem being discussed).
        sub_task.delay(seconds, progress_recorder)
        result += i
        progress_recorder.set_progress(i + 1, seconds)
    return result


@shared_task(bind=True)
def sub_task(self, seconds, progress_recorder):
    time.sleep(1)
    # Hypothetical API: read the shared recorder's current progress
    # and bump it by one.
    task_status = progress_recorder.get_status()
    progress_recorder.set_progress(task_status + 1, seconds)

This is a quick, rough idea. The reason for needing subtasks that share a progress_recorder is that if a group of tasks is to be executed asynchronously, each iteration of the main task must be created as its own task. As of right now I have gone with an implementation based on django_celery_results.models.GroupResult. However, this only provides me with a count of completed vs. total tasks in a group, and I have not been able to use it with your library. This also means that I am not able to track the progress of the sub_tasks, since I only track completion.

@czue
Owner

czue commented Mar 1, 2024

thanks - I'll take a look soon

@czue czue self-assigned this Mar 4, 2024
@czue
Owner

czue commented Mar 4, 2024

I was able to get something working. See #115. I'd love any feedback/testing, as I didn't explore the edge cases.

Note, I explicitly used GroupResult instead of working with this example. @WassawRoki, does this work for your use case?

@WassawRoki

Interesting, I will look into this implementation ASAP :D

@czue czue closed this as completed in #115 Jun 12, 2024