Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send task_received signal before receive log #8697

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,5 +295,6 @@ JoonHwan Kim, 2022/08/01
Kaustav Banerjee, 2022/11/10
Austin Snoeyink 2022/12/06
Jeremy Z. Othieno 2023/07/27
Nikolai Vidov 2023/12/12
Tomer Nosrati, 2022/17/07
Andy Zickler, 2024/01/18
Andy Zickler, 2024/01/18
24 changes: 13 additions & 11 deletions celery/worker/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,7 @@ def task_message_handler(message, body, ack, reject, callbacks,
eventer=eventer, task=task, connection_errors=connection_errors,
body=body, headers=headers, decoded=decoded, utc=utc,
)
if _does_info:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the sake of being “the first iteration of messing up the code”, try to get all of this code back here, except the info call itself.

# Similar to `app.trace.info()`, we pass the formatting args as the
# `extra` kwarg for custom log handlers
context = {
'id': req.id,
'name': req.name,
'args': req.argsrepr,
'kwargs': req.kwargsrepr,
'eta': req.eta,
}
info(_app_trace.LOG_RECEIVED, context, extra={'data': context})

if (req.expires or req.id in revoked_tasks) and req.revoked():
return

Expand All @@ -175,6 +165,18 @@ def task_message_handler(message, body, ack, reject, callbacks,
expires=req.expires and req.expires.isoformat(),
)

if _does_info:
# Similar to `app.trace.info()`, we pass the formatting args as the
# `extra` kwarg for custom log handlers
context = {
'id': req.id,
'name': req.name,
'args': req.argsrepr,
'kwargs': req.kwargsrepr,
'eta': req.eta,
}
info(_app_trace.LOG_RECEIVED, context, extra={'data': context})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leave just the if … info().

The purpose is mainly to reduce the changed lines. This is just to remove noise so we can, step by step, improve our focus on the changes that caused it.


Niccolum marked this conversation as resolved.
Show resolved Hide resolved
bucket = None
eta = None
if req.eta:
Expand Down
29 changes: 29 additions & 0 deletions t/unit/worker/test_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,35 @@ def test_signal_task_received(self):
request=ANY,
signal=signals.task_received)

def test_log_task_received_meta(self, caplog):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @pytest.mark.skip (double-check me for the syntax)

class Formatter:
def __init__(self, base_formatter):
self.base_formatter = base_formatter

def format(self, record):
record.foo = "bar"
return self.base_formatter.format(record)

old_formatter = caplog.handler.formatter

def callback(*args, **kwargs):
caplog.handler.setFormatter(Formatter(old_formatter))

caplog.set_level(logging.INFO, logger="celery.worker.strategy")

with self._context(
self.add.s(2, 2)
) as C:
signals.task_received.connect(callback)
C()
for record in caplog.records:
if record.msg == LOG_RECEIVED:
assert record.foo == "bar"
caplog.handler.setFormatter(old_formatter)
break
else:
Niccolum marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("Expected message not in captured log records")

def test_when_events_disabled(self):
with self._context(self.add.s(2, 2), events=False) as C:
C()
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ deps=

3.8,3.9,3.10,3.11,3.12: -r{toxinidir}/requirements/test-ci-default.txt
3.8,3.9,3.10,3.11,3.12: -r{toxinidir}/requirements/docs.txt
pypy3: -r{toxinidir}/requirements/test-ci-default.txt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this

pypy-3,pypy3: -r{toxinidir}/requirements/test-ci-default.txt

integration: -r{toxinidir}/requirements/test-integration.txt
smoke: pytest-xdist>=3.5
Expand Down