
Dag.test causes Logging Recursion when run as a Python Task (Celery)  #30029

@JonnyWaffles

Description

Apache Airflow version

2.5.1

What happened

Executor: Celery
Airflow V 2.5.1
Platform: Python 3.9
OS: ubuntu20.04

Hi team, this one is pretty wild.

To integrate with our CI/CD, we run pytest as a task inside a DAG, like so:

import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, Union

import pytest
from airflow.decorators import task
from airflow.models import Variable

# JUNIT_XML_EXCEPTION_STRING, PYTEST_RET_CODE_XCOM_KEY, and
# JUNIT_XML_XCOM_KEY are module-level string constants (definitions omitted).

@task(multiple_outputs=True)
def run_pytest(params=None) -> Dict[str, Union[int, str]]:
    with tempfile.NamedTemporaryFile() as f:
        pytest_args = (params or {}).get("pytest_args") or Variable.get(
            "pytest_args"
        ).split()
        args = [*pytest_args, "--ignore", __file__, f"--junitxml={f.name}"]
        code = pytest.main(args)
        xml_text = Path(f.name).read_text()
        try:
            ET.fromstring(xml_text)
        except ET.ParseError as err:
            xml_text = f"{JUNIT_XML_EXCEPTION_STRING} Original exception: {err}"

    return {PYTEST_RET_CODE_XCOM_KEY: code, JUNIT_XML_XCOM_KEY: xml_text}
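The junit-XML sanity check in the task can be exercised on its own. This is a standalone sketch of the same pattern; the constant's value here is illustrative, not the one from our module:

```python
import xml.etree.ElementTree as ET

# Illustrative value -- our module defines its own marker string.
JUNIT_XML_EXCEPTION_STRING = "Could not parse junit XML."


def validate_junit_xml(xml_text: str) -> str:
    """Return the report text if it is well-formed XML, else a marker string."""
    try:
        ET.fromstring(xml_text)
        return xml_text
    except ET.ParseError as err:
        return f"{JUNIT_XML_EXCEPTION_STRING} Original exception: {err}"
```

This guards the XCom payload: a truncated or empty report file (e.g. if pytest crashed mid-run) is replaced by a parse-error message instead of propagating broken XML downstream.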

This has been working great, and I was excited to use the new dag.test() method to run our test DAGs inside the pipeline. Unfortunately, if you call dag.test() on any DAG with dynamically mapped tasks, the logging system triggers a recursive exception.

You can easily recreate this behavior by creating the following test case:

from airflow.example_dags.example_dynamic_task_mapping import dag

def test_map_dag():
    dag.test()

Then execute it as a task using the run_pytest task above. You'll see a stack trace like:

  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
    self.flush()
  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
    self._propagate_log(buf)
  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log
    self.logger.log(self.level, remove_escape_codes(message))
  File "/usr/lib/python3.9/logging/__init__.py", line 1512, in log
    self._log(level, msg, args, **kwargs)
  File "/usr/lib/python3.9/logging/__init__.py", line 1589, in _log
    self.handle(record)
  File "/usr/lib/python3.9/logging/__init__.py", line 1599, in handle
    self.callHandlers(record)
  File "/usr/lib/python3.9/logging/__init__.py", line 1661, in callHandlers
    hdlr.handle(record)
  File "/usr/lib/python3.9/logging/__init__.py", line 952, in handle
    self.emit(record)
  File "/usr/lib/python3.9/logging/__init__.py", line 1091, in emit
    self.handleError(record)
  File "/usr/lib/python3.9/logging/__init__.py", line 1004, in handleError
    sys.stderr.write('--- Logging error ---\n')
  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 149, in write
    self.flush()
  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 156, in flush
    self._propagate_log(buf)
  File "/app/.local/lib/python3.9/site-packages/airflow/utils/log/logging_mixin.py", line 137, in _propagate_log

and so on. You can see the cycle between flush() and _propagate_log().
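The general shape of the failure can be reproduced without Airflow. This is a minimal, self-contained illustration (not Airflow's code): if a stream is replaced by a file-like wrapper that forwards writes to a logger, and that logger's handler writes back into the same wrapper, every log call re-enters write(), mirroring the flush -> _propagate_log loop in the trace above:

```python
import logging

class StreamToLogger:
    """File-like object that forwards every write to a logger."""

    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level
        self.write_calls = 0  # instrumentation so the demo can stop itself

    def write(self, message):
        self.write_calls += 1
        if self.write_calls > 50:
            # Bail out well before hitting Python's real recursion limit.
            raise RuntimeError("recursive logging detected")
        if message.strip():
            self.logger.log(self.level, message.rstrip())

    def flush(self):
        pass


logging.raiseExceptions = False  # silence handleError noise for the demo

logger = logging.getLogger("recursion-demo")
logger.setLevel(logging.INFO)
logger.propagate = False

wrapper = StreamToLogger(logger)
# The handler's target is the wrapper itself -- this closes the cycle.
logger.addHandler(logging.StreamHandler(wrapper))

logger.info("hello")  # write -> log -> emit -> write -> ...
print(f"write() was entered {wrapper.write_calls} times for one log call")
```

In Airflow's case the cycle is worse because stderr is also redirected, so even logging's own handleError() fallback (which writes to sys.stderr) feeds back into the loop, as the trace shows.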

What you think should happen instead

It should be possible to run dag.test() inside a PythonOperator task, so we can easily run pytest as a remotely triggered DAG.
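As a possible user-side workaround (a sketch only, untested against Airflow's internals), one could restore the interpreter's original stdout/stderr for the duration of the nested run, so its log output is not fed back into the outer task's log-redirection machinery:

```python
import contextlib
import sys

@contextlib.contextmanager
def real_std_streams():
    """Temporarily restore the interpreter's original stdout/stderr."""
    saved_out, saved_err = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
    try:
        yield
    finally:
        sys.stdout, sys.stderr = saved_out, saved_err
```

Usage would be `with real_std_streams(): dag.test()`. The trade-off is that the nested run's output bypasses the outer task's log, so it would land on the worker's raw stdout instead.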

How to reproduce

  1. Create a test case
    from airflow.example_dags.example_dynamic_task_mapping import dag
    
    def test_map_dag():
        dag.test()
  2. Run the test case via pytest inside a PythonOperator task (using the CeleryExecutor)

Operating System

ubuntu20.04

Versions of Apache Airflow Providers

You don't need any providers to reproduce

Deployment

Docker-Compose

Deployment details

The scheduler, worker, webserver, and Redis all run inside a local Docker Compose setup, with a single worker container using default Celery settings.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct


Labels

area:core, kind:bug, needs-triage
