
Chaining Chords produces enormously big messages causing OOM on workers #5000

Closed
1 of 2 tasks
brabiega opened this issue Aug 22, 2018 · 25 comments

Comments

@brabiega
Contributor

Checklist

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:2.7.6
            billiard:3.5.0.4 redis:2.10.6
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:db+mysql://celery:**@127.0.0.1/celery

task_serializer: 'json'
broker_heartbeat: 360
result_serializer: 'json'
event_queue_ttl: 60
prefetch_multiplier: 1
broker_url: u'redis://:********@localhost:6379/0'
task_acks_late: True
worker_redirect_stdouts_level: 'INFO'
database_result_engine_options: {
    'isolation_level': 'READ_COMMITTED'}
task_time_limit: 910
task_soft_time_limit: 900
accept_content: ['json']
task_track_started: True
result_backend: u'db+mysql://celery:********@127.0.0.1/celery'
max_tasks_per_child: 100

Steps to reproduce

Tasks file

@app.task()
def noop(*results):
    return

Example workflow

from tasks import noop
                                                    
workflow = (                                        
    noop.s() |                                      
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    noop.s()                                        
)                                                   
                                                    
workflow.apply_async()                              

Patch kombu to see serialized message size

diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index b2466a4..dee3476 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -65,8 +65,11 @@ _default_encoder = JSONEncoder
 def dumps(s, _dumps=json.dumps, cls=None,
           default_kwargs=_json_extra_kwargs, **kwargs):
     """Serialize object to json string."""
-    return _dumps(s, cls=cls or _default_encoder,
+    res = _dumps(s, cls=cls or _default_encoder,
                   **dict(default_kwargs, **kwargs))
+    msg_size = len(res)
+    print("Serialized data size {:,} bytes".format(msg_size))
+    return res
 
 
 def loads(s, _loads=json.loads, decode_bytes=PY3):

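An alternative that avoids editing site-packages (a sketch, not part of the original patch above; it assumes that re-registering under an existing name replaces the previous entry in kombu's serializer registry): re-register the 'json' serializer with a wrapping encoder before calling apply_async.

from kombu.serialization import register
from kombu.utils import json as kjson

def measuring_dumps(obj, **kwargs):
    """Encode with kombu's json dumps and report the payload size."""
    res = kjson.dumps(obj, **kwargs)
    print("Serialized data size {:,} bytes".format(len(res)))
    return res

# Replace the registered 'json' encoder with the measuring wrapper.
register('json', measuring_dumps, kjson.loads,
         content_type='application/json', content_encoding='utf-8')
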
Expected behavior

The workflow contains roughly 57 tasks; they shouldn't be serialized into such large messages.

Actual behavior

apply_async serializes the workflow, producing 2 messages with the following sizes:

Serialized data size 235,441,375 bytes
Serialized data size 313,922,589 bytes

As a result, the celery worker on my machine eats all available memory.

@brabiega
Contributor Author

brabiega commented Aug 23, 2018

I'm not sure if it's related, but doing

workflow = (
    noop.s() |
    chord([noop.s() for i in range(2)], noop.s())
)

workflow.apply_async(serializer='yaml')

Causes

kombu.exceptions.EncodeError: cannot represent an object: %tasks.noop(group([noop(), noop()]))

The data passed to the encoder looks as follows:

-> env/local/lib/python2.7/site-packages/kombu/serialization.py(222)dumps()
-> payload = encoder(data)
(Pdb) pp data
((),
 {},
 {u'callbacks': None,
  u'chain': [%tasks.noop(group([noop(), noop()]))],
  u'chord': None,
  u'errbacks': None})

(Pdb) type(data[2]['chain'][0])
<class 'celery.canvas.chord'>

Calling the encoder on the data:

(Pdb) encoder
<function safe_dump at 0x7f17243b7e60>
(Pdb) encoder(data)
*** RepresenterError: cannot represent an object: %tasks.noop(group([noop(), noop()]))
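
This looks like plain PyYAML behaviour rather than anything Celery-specific (my assumption): safe_dump only has representers for exact built-in types, and canvas signatures are dict subclasses, so the chord embedded in the chain options can't be represented. A minimal standalone check:

import yaml

class DictSubclass(dict):  # stand-in for celery.canvas.Signature / chord
    pass

yaml.safe_dump({'chain': [DictSubclass(task='noop')]})
# raises yaml.representer.RepresenterError: cannot represent an object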

@brabiega
Contributor Author

brabiega commented Aug 23, 2018

More digging...
I've simplified the case to find the root cause:

workflow = (
    noop.s() |
    chord([noop.s() for i in range(2)], noop.s()) |
    chord([noop.s() for i in range(2)], noop.s()) |
    noop.s()
)

What I've found is that the nested chord seems to contain an options entry with another nested chain:

[
    [ ], 
    { }, 
    {
      "chord": null, 
      "callbacks": null, 
      "errbacks": null, 
      "chain": [
        {
          "chord_size": null, 
          "task": "celery.chord", 
          "subtask_type": "chord", 
          "kwargs": {
            "body": {}, 
            "header": {
              "chord_size": null, 
              "task": "celery.group", 
              "subtask_type": "group", 
              "kwargs": {
                "tasks": [
                  {
                    "chord_size": null, 
                    "task": "tasks.noop", 
                    "subtask_type": null, 
                    "kwargs": { }, 
                    "args": [ ], 
                    "immutable": false, 
                    "options": {
                      "reply_to": "a7b2268e-e9f8-3d93-b1d2-28c7b934e727", 
                      "chord": {    # <---------------------- HERE
                        "chord_size": null, 
                        "task": "celery.chain", 
                        "subtask_type": "chain", 
                        "kwargs": {
                          "tasks": [
                            {}, 
                            {}
                          ]
                        }, 
                        "args": [ ], 
                        "options": { }, 
                        "immutable": false
                      }, 
                      "task_id": "0b94ed8a-20d3-4e9c-9fd7-b4dea0306c4d"
                    }
                  }, 
                  {}
                ]
              }, 
              "args": [ ], 
              "options": {}, 
              "immutable": false
            }, 
            "kwargs": {}
          }, 
          "args": [ ], 
          "options": {}, 
          "immutable": false
        }
      ]
    }
  ]
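
To make that duplication easier to spot in a real message, here is a small helper (a sketch, not part of the report) that walks a decoded body and counts every non-null 'chord' entry, each of which carries its own copy of the remaining chain:

import json

def count_embedded_chords(node):
    """Count non-null 'chord' values anywhere in a decoded message body."""
    if isinstance(node, dict):
        hits = 1 if node.get('chord') else 0
        return hits + sum(count_embedded_chords(v) for v in node.values())
    if isinstance(node, list):
        return sum(count_embedded_chords(v) for v in node)
    return 0

# 'payload' is whatever serialized body was captured, e.g. via the kombu patch above
print(count_embedded_chords(json.loads(payload)))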

@brabiega
Contributor Author

brabiega commented Aug 24, 2018

It seems that the nested task structures are created somewhere around task.freeze:
https://github.com/celery/celery/blob/v4.2.1/celery/canvas.py#L664

Got something: the huge message size actually comes from the 'chord' entry inside the task options, produced by the following line:
https://github.com/celery/celery/blob/v4.2.1/celery/canvas.py#L278
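
To get a feel for the numbers, here is a rough back-of-the-envelope model (purely illustrative, with made-up byte counts): if every header task of a chord carries a copy of the rest of the chain in its options['chord'], and that rest of the chain contains the next chord whose header tasks do the same, the serialized size grows roughly geometrically with the number of chained chords.

HEADER_TASKS = 10       # header tasks per chord, as in the reproduction above
BASE_TASK_BYTES = 300   # assumed size of one serialized noop signature

def approx_message_bytes(chords_left):
    """Very rough model of the serialized size of the remaining chain."""
    if chords_left == 0:
        return BASE_TASK_BYTES
    rest_of_chain = approx_message_bytes(chords_left - 1)
    # every header task embeds its own copy of the rest of the chain in
    # options['chord'], and the rest of the chain appears once more itself
    return HEADER_TASKS * (BASE_TASK_BYTES + rest_of_chain) + rest_of_chain

for n in range(1, 6):
    print("{} chained chords -> ~{:,} bytes".format(n, approx_message_bytes(n)))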

But I have absolutely NO idea how to fix it.
It would be nice if someone familiar with this code could take a look.

@brabiega
Contributor Author

Does anybody know the purpose of putting 'chord' inside the task options?

I've commented out the line at https://github.com/celery/celery/blob/v4.2.1/celery/canvas.py#L278

Chords seem to work fine and the messages are much smaller.

@thedrow
Member

thedrow commented Aug 27, 2018

According to git blame the problem comes from #1921.
Not sure how to fix this though.

@brabiega
Contributor Author

@thedrow
Interesting...

I've executed

    workflow = group(                                           
        chord(group(noop.s('a1'), noop.s('a2')), noop.s('cb1')),
        chord(group(noop.s('b1'), noop.s('b2')), noop.s('cb2')),
    )                                                           
    res = workflow.apply_async(serializer='json')

It seems that the problem no longer occurs:

[2018-08-28 06:54:03,816: INFO/MainProcess] Received task: noop[73a54a23-96a3-43fa-b11b-ad300f9e702b]  
[2018-08-28 06:54:03,818: INFO/ForkPoolWorker-5] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:03,822: INFO/MainProcess] Received task: noop[6cd6d419-9482-473c-95ab-1d2c28a8159d]  
[2018-08-28 06:54:03,824: INFO/ForkPoolWorker-3] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:03,824: INFO/ForkPoolWorker-5] noop[73a54a23-96a3-43fa-b11b-ad300f9e702b]: ('a1',)
[2018-08-28 06:54:03,830: INFO/ForkPoolWorker-5] noop[73a54a23-96a3-43fa-b11b-ad300f9e702b]: Task noop[73a54a23-96a3-43fa-b11b-ad300f9e702b] succeeded in 0.0117540680221s: None
[2018-08-28 06:54:03,837: INFO/ForkPoolWorker-3] noop[6cd6d419-9482-473c-95ab-1d2c28a8159d]: ('a2',)
[2018-08-28 06:54:03,843: INFO/MainProcess] Received task: celery.chord_unlock[ae3d2494-9ea3-40e5-ab71-fffc173515db]  ETA:[2018-08-28 06:54:04.840605+00:00] 
[2018-08-28 06:54:03,848: INFO/ForkPoolWorker-3] noop[6cd6d419-9482-473c-95ab-1d2c28a8159d]: Task noop[6cd6d419-9482-473c-95ab-1d2c28a8159d] succeeded in 0.0230618439964s: None
[2018-08-28 06:54:03,852: INFO/MainProcess] Received task: noop[ffba7d42-73e9-488c-93d0-3d587173ad23]  
[2018-08-28 06:54:03,853: INFO/ForkPoolWorker-9] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:03,854: INFO/MainProcess] Received task: noop[cce9dd73-2451-43bd-984b-3d7b073fd84b]  
[2018-08-28 06:54:03,856: INFO/ForkPoolWorker-7] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:03,859: INFO/MainProcess] Received task: celery.chord_unlock[247efc23-98d5-493d-9b1f-61752862975c]  ETA:[2018-08-28 06:54:04.853322+00:00] 
[2018-08-28 06:54:03,865: INFO/ForkPoolWorker-7] noop[cce9dd73-2451-43bd-984b-3d7b073fd84b]: ('b2',)
[2018-08-28 06:54:03,868: INFO/ForkPoolWorker-9] noop[ffba7d42-73e9-488c-93d0-3d587173ad23]: ('b1',)
[2018-08-28 06:54:03,872: INFO/ForkPoolWorker-7] noop[cce9dd73-2451-43bd-984b-3d7b073fd84b]: Task noop[cce9dd73-2451-43bd-984b-3d7b073fd84b] succeeded in 0.015936093987s: None
[2018-08-28 06:54:03,874: INFO/ForkPoolWorker-9] noop[ffba7d42-73e9-488c-93d0-3d587173ad23]: Task noop[ffba7d42-73e9-488c-93d0-3d587173ad23] succeeded in 0.0205110319657s: None
[2018-08-28 06:54:04,853: INFO/ForkPoolWorker-8] (u'Fast trace - ', <class 'celery.app.builtins.unlock_chord'>)
[2018-08-28 06:54:04,860: INFO/ForkPoolWorker-5] (u'Fast trace - ', <class 'celery.app.builtins.unlock_chord'>)
[2018-08-28 06:54:04,881: INFO/ForkPoolWorker-8] celery.chord_unlock[ae3d2494-9ea3-40e5-ab71-fffc173515db]: Task celery.chord_unlock[ae3d2494-9ea3-40e5-ab71-fffc173515db] succeeded in 0.0264335739776s: None
[2018-08-28 06:54:04,882: INFO/MainProcess] Received task: noop[5f1fc8b8-4589-4a3d-af3b-944b6239ecbb]  
[2018-08-28 06:54:04,883: INFO/ForkPoolWorker-3] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:04,892: INFO/ForkPoolWorker-3] noop[5f1fc8b8-4589-4a3d-af3b-944b6239ecbb]: ([None, None], 'cb1')
[2018-08-28 06:54:04,898: INFO/ForkPoolWorker-3] noop[5f1fc8b8-4589-4a3d-af3b-944b6239ecbb]: Task noop[5f1fc8b8-4589-4a3d-af3b-944b6239ecbb] succeeded in 0.0147304679849s: None
[2018-08-28 06:54:04,908: INFO/ForkPoolWorker-5] celery.chord_unlock[247efc23-98d5-493d-9b1f-61752862975c]: Task celery.chord_unlock[247efc23-98d5-493d-9b1f-61752862975c] succeeded in 0.0484056730056s: None
[2018-08-28 06:54:04,909: INFO/MainProcess] Received task: noop[1d0545d2-0619-47a3-acb3-ac83f3b5e72c]  
[2018-08-28 06:54:04,910: INFO/ForkPoolWorker-9] (u'Fast trace - ', <class 'celerytest.noop'>)
[2018-08-28 06:54:04,918: INFO/ForkPoolWorker-9] noop[1d0545d2-0619-47a3-acb3-ac83f3b5e72c]: ([None, None], 'cb2')
[2018-08-28 06:54:04,922: INFO/ForkPoolWorker-9] noop[1d0545d2-0619-47a3-acb3-ac83f3b5e72c]: Task noop[1d0545d2-0619-47a3-acb3-ac83f3b5e72c] succeeded in 0.0118885629927s: None

@brabiega
Contributor Author

Ugh, CI fails after my patch, but...

        c.apply_async(chord='some_chord_id')
>       assert c.tasks[-1].options['chord'] == 'some_chord_id'
E       KeyError: u'chord'

According to the tests, the 'chord' value inside options should be the chord id?

@thedrow
Member

thedrow commented Aug 28, 2018

I believe the string is simply a sentinel for the actual value.

@brabiega
Contributor Author

brabiega commented Sep 3, 2018

@thedrow yep, you're right.

No idea how to fix it; I don't have that much understanding of Celery internals 😞 and the different pieces of code aren't really easy to read.

@brabiega
Contributor Author

So I've created a small extension-like project. It unifies the way all complex canvases are processed.
https://github.com/ovh/celery-dyrygent

@auvipy auvipy modified the milestones: 4.5, 4.4.x Dec 16, 2019
@auvipy
Member

auvipy commented Jun 15, 2020

So I've created a small extension-like project. It unifies the way all complex canvases are processed.
https://github.com/ovh/celery-dyrygent

How about merging this back into Celery?

@brabiega
Contributor Author

Well... I'd love to have it in Celery, but unfortunately I don't have the time at the moment to do it on my own.
Also, I think a long-time Celery dev would be better suited for it.

I have a feeling that Celery is getting too complex.

@auvipy
Member

auvipy commented Jun 16, 2020

OK, will try to handle this.

@auvipy auvipy self-assigned this Jun 16, 2020
@thedrow
Member

thedrow commented Jun 22, 2020

We're going to refactor Canvases in Celery 5 because of this issue.
I am going to take your design into account but it is possible that the entire protocol will be different.

@brabiega
Contributor Author

Are you going to draft some design docs? I'd be happy to participate.

@pySilver

@thedrow is this still an issue in Celery 5?

@auvipy auvipy removed their assignment Nov 1, 2020
@auvipy auvipy modified the milestones: 4.4.x, 5.1.0 Nov 1, 2020
@yifanw80

yifanw80 commented Nov 5, 2020

Is there any workaround for this bug? I constructed a complex canvas and the message size exceeds the Redis limits, and I have no time to wait for 5.1.0.

Thanks

@pySilver

pySilver commented Nov 5, 2020

@yifanw80 It's hardly a bug; it's a design problem. What you can do at the moment is either use something else, like Kafka, or simply redesign your app (which might be pretty doable, or not).

@fcollman

fcollman commented Nov 9, 2020

@pySilver How is this not a bug? Is your position that all hierarchical workflows are design problems?

@pySilver

pySilver commented Nov 9, 2020

@fcollman Kind of, yes. As much as I'd love it to work the way you'd expect, there are limitations that I guess are very hard to overcome. A complex workflow has to keep its state somehow, so I'm not surprised it uses a lot of memory. Changing the workflow to something that is stateless (or at least doesn't hold state at runtime) is the answer, I guess.

@pySilver

@fcollman You might want to look at Airflow, which plays nicely with Celery (you'd be able to define complex workflows in Airflow and execute them using Celery workers).

@thedrow
Member

thedrow commented Nov 16, 2020

@thedrow is this still an issue in Celery 5?

Yes, unfortunately.

@thedrow
Member

thedrow commented Nov 16, 2020

Is there any workaround for this bug? I constructed a complex canvas and the message size exceeds the Redis limits, and I have no time to wait for 5.1.0.

Thanks

Try to use compression.
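
For example, a minimal sketch (the lowercase setting names follow the Celery 4+ configuration style used in the report above). Note that compression shrinks the payload stored on the broker, but the worker still has to hold the decompressed structure in memory, so it mitigates the Redis size limit rather than the OOM itself:

app.conf.task_compression = 'gzip'     # compress task messages sent to the broker
app.conf.result_compression = 'gzip'   # compress results stored in the result backend

# or per call:
workflow.apply_async(compression='zlib')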

@yifanw80

@thedrow How do I enable compression?

@thedrow
Member

thedrow commented Mar 18, 2021

@auvipy auvipy modified the milestones: 5.1.0, 5.2 Mar 28, 2021
@auvipy auvipy removed this from the 5.2 milestone Oct 30, 2021
@celery celery locked and limited conversation to collaborators Oct 30, 2021
@auvipy auvipy closed this as completed Oct 30, 2021

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →
