Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Celery worker with pickle serializer and elasticsearch backend throws SerializationError when writing to result backend #8853

Open
william-tremblay1 opened this issue Feb 15, 2024 Discussed in #8850 · 0 comments

Comments

@william-tremblay1
Copy link

Discussed in #8850

Originally posted by william-tremblay1 February 14, 2024
It seems that when using Celery with pickle serialization and Elasticsearch as a backend, the ElasticsearchBackend tries to set the field of a JSON body to a pickle-encoded bytes object directly, which leads to a serialization error. This is the throwing code:

def _set_with_state(self, key, value, state):
body = {
'result': value,
'@timestamp': '{}Z'.format(
datetime.now(timezone.utc).isoformat()[:-9]
),
}
try:
self._index(
id=key,
body=body,
)
except elasticsearch.exceptions.ConflictError:
# document already exists, update it
self._update(key, body, state)

Here are my Celery serialization configuration options.

celery_app.conf.event_serializer = 'pickle'
celery_app.conf.task_serializer = 'pickle'
celery_app.conf.result_serializer = 'pickle'
celery_app.conf.accept_content = ['application/json', 'application/x-python-serialize', 'pickle']

Is this a bug or a misconfiguration?

Logs:

2024-02-14 09:51:44 Traceback (most recent call last):
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/serializer.py", line 130, in dumps
2024-02-14 09:51:44     return json.dumps(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
2024-02-14 09:51:44     return cls(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/json/encoder.py", line 199, in encode
2024-02-14 09:51:44     chunks = self.iterencode(o, _one_shot=True)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/json/encoder.py", line 257, in iterencode
2024-02-14 09:51:44     return _iterencode(o, 0)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/serializer.py", line 116, in default
2024-02-14 09:51:44     raise TypeError("Unable to serialize %r (type: %s)" % (data, type(data)))
2024-02-14 09:51:44 TypeError: Unable to serialize b'\x80\x04\x95\xaf\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x06status\x94\x8c\x07SUCCESS\x94\x8c\x06result\x94\x8c\x17test task return string\x94\x8c\ttraceback\x94N\x8c\x08children\x94]\x94\x8c\tdate_done\x94\x8c\x1a2024-02-14T14:51:44.336272\x94\x8c\x07task_id\x94\x8c$2137139e-3864-4103-be17-a33639d8b45b\x94u.' (type: <class 'bytes'>)
2024-02-14 09:51:44 
2024-02-14 09:51:44 During handling of the above exception, another exception occurred:
2024-02-14 09:51:44 
2024-02-14 09:51:44 Traceback (most recent call last):
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/app/trace.py", line 544, in trace_task
2024-02-14 09:51:44     task.backend.mark_as_done(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 157, in mark_as_done
2024-02-14 09:51:44     self.store_result(task_id, result, state, request=request)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 526, in store_result
2024-02-14 09:51:44     self._store_result(task_id, result, state, traceback,
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/backends/base.py", line 979, in _store_result
2024-02-14 09:51:44     self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/backends/elasticsearch.py", line 136, in _set_with_state
2024-02-14 09:51:44     self._index(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/celery/backends/elasticsearch.py", line 159, in _index
2024-02-14 09:51:44     return self.server.index(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
2024-02-14 09:51:44     return func(*args, params=params, headers=headers, **kwargs)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 397, in index
2024-02-14 09:51:44     return self.transport.perform_request(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 373, in perform_request
2024-02-14 09:51:44     method, headers, params, body, ignore, timeout = self._resolve_request_args(
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/transport.py", line 439, in _resolve_request_args
2024-02-14 09:51:44     body = self.serializer.dumps(body)
2024-02-14 09:51:44   File "/usr/local/lib/python3.8/site-packages/elasticsearch/serializer.py", line 134, in dumps
2024-02-14 09:51:44     raise SerializationError(data, e)
2024-02-14 09:51:44 elasticsearch.exceptions.SerializationError: ({'result': b'\x80\x04\x95\xaf\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x06status\x94\x8c\x07SUCCESS\x94\x8c\x06result\x94\x8c\x17test task return string\x94\x8c\ttraceback\x94N\x8c\x08children\x94]\x94\x8c\tdate_done\x94\x8c\x1a2024-02-14T14:51:44.336272\x94\x8c\x07task_id\x94\x8c$2137139e-3864-4103-be17-a33639d8b45b\x94u.', '@timestamp': '2024-02-14T14:51:44.370Z'}, TypeError("Unable to serialize b'\\x80\\x04\\x95\\xaf\\x00\\x00\\x00\\x00\\x00\\x00\\x00}\\x94(\\x8c\\x06status\\x94\\x8c\\x07SUCCESS\\x94\\x8c\\x06result\\x94\\x8c\\x17test task return string\\x94\\x8c\\ttraceback\\x94N\\x8c\\x08children\\x94]\\x94\\x8c\\tdate_done\\x94\\x8c\\x1a2024-02-14T14:51:44.336272\\x94\\x8c\\x07task_id\\x94\\x8c$2137139e-3864-4103-be17-a33639d8b45b\\x94u.' (type: <class 'bytes'>)"))
```</div>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant