Large string logs from trigger are not written to log file intermittently. #66158

@tirkarthi

Description

Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.1.8

What happened and how to reproduce it?

We noticed intermittent reports of large string logs from triggers not being displayed in the UI. The lines are also missing from the log file on disk. We have custom Spark- and Kubernetes-based triggers that download the container log and pod log and log them with self.log.info(f"Output : {pod_log}"). The pod_log is sometimes as large as 4-5 MB, passed through the socket as a single string. As we understand it, in Airflow 3 logs pass through sockets and then reach the structlog logger in process_log_messages_from_subprocess, whereas in Airflow 2 the trigger wrote directly to the file through a file handler. When a large string is being processed while the triggerer is also busy, messages appear to get lost and are never logged by the triggerer. Sometimes the large string is broken across reads and msgspec.json.decode(line) raises an exception while decoding, causing those lines to be skipped. This is not consistently reproducible, as it depends on triggerer load and on the particular string.

In Airflow 3, is there a way to skip the socket layer and write directly to the trigger.log file for the trigger, similar to Airflow 2?
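For context, a minimal sketch of the mitigation we are considering on our side: splitting the payload into bounded chunks before logging, so each serialized record sent over the socket stays small. This uses only the stdlib logger; CHUNK_SIZE and the helper names are illustrative, not Airflow APIs:

```python
import logging

log = logging.getLogger(__name__)

# Hypothetical chunk size, tuned per deployment; not an Airflow setting.
CHUNK_SIZE = 64 * 1024

def iter_chunks(payload: str, size: int = CHUNK_SIZE) -> list[str]:
    """Split the payload into pieces of at most `size` characters."""
    return [payload[i : i + size] for i in range(0, len(payload), size)]

def log_large_string(pod_log: str) -> None:
    # Each chunk becomes its own log record, so no single socket
    # message carries the full multi-MB string.
    parts = iter_chunks(pod_log)
    for idx, part in enumerate(parts, start=1):
        log.info("Output [%d/%d]: %s", idx, len(parts), part)
```

This only reduces the per-message size; it does not address the underlying loss in the socket path.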

def process_log_messages_from_subprocess(
    loggers: tuple[FilteringBoundLogger, ...],
) -> Generator[None, bytes | bytearray, None]:
    from structlog.stdlib import NAME_TO_LEVEL

    loggers = tuple(
        reconfigure_logger(
            log,
            structlog.processors.CallsiteParameterAdder,
            # We need these loggers to print _everything_ they are given. The subprocess itself does the level
            # filtering.
            level_override=logging.NOTSET,
        )
        for log in loggers
    )
    while True:
        # Generator receive syntax, values are "sent" in by the `make_buffered_socket_reader` and returned to
        # the yield.
        line = yield
        try:
            event = msgspec.json.decode(line)
        except Exception:
            log.exception("Malformed json log line", line=line)
            continue
        if ts := event.get("timestamp"):
            # We use msgspec to decode the timestamp as it does it orders of magnitude quicker than
            # datetime.strptime
            event["timestamp"] = msgspec.json.decode(f'"{ts}"', type=datetime)
        if exc := event.pop("exception", None):
            # TODO: convert the dict back to a pretty stack trace
            event["error_detail"] = exc
        if level := NAME_TO_LEVEL.get(event.pop("level")):
            msg = event.pop("event", None)
            for target in loggers:
                target.log(level, msg, **event)

What you think should happen instead?

No response

Operating System

No response

Deployment

Virtualenv installation

Apache Airflow Provider(s)

No response

Versions of Apache Airflow Providers

No response

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
