secrets_masker RecursionError with nested TriggerDagRunOperators #16473

Closed
tomaszZdunek opened this issue Jun 16, 2021 · 6 comments · Fixed by #16491
Labels
kind:bug This is a clearly a bug

Comments

tomaszZdunek commented Jun 16, 2021

Apache Airflow version: 2.1.0

Environment: tested on a Windows docker-compose environment and on k8s (both with the Celery executor).

What happened:

[2021-06-16 07:56:32,682] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/trigger_dagrun.py", line 134, in execute
    replace_microseconds=False,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 123, in trigger_dag
    replace_microseconds=replace_microseconds,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 48, in _trigger_dag
    dag = dag_bag.get_dag(dag_id)  # prefetch dag if it is stored serialized
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 252, in _add_dag_from_db
    dag = row.dag
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/serialized_dag.py", line 175, in dag
    dag = SerializedDAG.from_dict(self.data)  # type: Any
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 792, in from_dict
    return cls.deserialize_dag(serialized_obj['dag'])
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 716, in deserialize_dag
    v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 716, in <dictcomp>
    v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 493, in deserialize_operator
    op_predefined_extra_links = cls._deserialize_operator_extra_links(v)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 600, in _deserialize_operator_extra_links
    if _operator_link_class_path in get_operator_extra_links():
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 86, in get_operator_extra_links
    _OPERATOR_EXTRA_LINKS.update(ProvidersManager().extra_links_class_names)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers_manager.py", line 400, in extra_links_class_names
    self.initialize_providers_manager()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers_manager.py", line 129, in initialize_providers_manager
    self._discover_all_providers_from_packages()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers_manager.py", line 151, in _discover_all_providers_from_packages
    log.debug("Loading %s from package %s", entry_point, package_name)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1366, in debug
    self._log(DEBUG, msg, args, **kwargs)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1514, in _log
    self.handle(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1524, in handle
    self.callHandlers(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 1586, in callHandlers
    hdlr.handle(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 890, in handle
    rv = self.filter(record)
  File "/usr/local/lib/python3.7/logging/__init__.py", line 751, in filter
    result = f.filter(record)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 157, in filter
    record.__dict__[k] = self.redact(v)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
    return tuple(self.redact(subval) for subval in item)
  ....
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in redact
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 203, in <genexpr>
    return tuple(self.redact(subval) for subval in item)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/secrets_masker.py", line 201, in redact
    elif isinstance(item, (tuple, set)):
RecursionError: maximum recursion depth exceeded in __instancecheck__ 

What you expected to happen:
I think the new secrets masker is not able to handle a TriggerDagRunOperator that triggers a DAG which itself contains a TriggerDagRunOperator.

How to reproduce it:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def pprint(**kwargs):
    print(1)

with DAG("test",
         catchup=False,
         max_active_runs=1,
         start_date=datetime(2021, 1, 1),
         is_paused_upon_creation=False,
         schedule_interval=None) as dag:

    task_observe_pea_data = PythonOperator(
        task_id="test_task",
        python_callable=pprint,
        provide_context=True
    )

with DAG("test_1",
         catchup=False,
         max_active_runs=1,
         start_date=datetime(2021, 1, 1),
         is_paused_upon_creation=False,
         schedule_interval=None) as dag:

    task_observe_pea_data = TriggerDagRunOperator(
        task_id="test_trigger_1",
        trigger_dag_id="test"
    )

with DAG("test_2",
         catchup=False,
         max_active_runs=1,
         start_date=datetime(2021, 1, 1),
         is_paused_upon_creation=False,
         schedule_interval=None) as dag:

    task_observe_pea_data = TriggerDagRunOperator(
        task_id="test_trigger_2",
        trigger_dag_id="test_1"
    )

Anything else we need to know:

How often does this problem occur? Every time
I have tried hide_sensitive_var_conn_fields=False, but the error still occurs.

tomaszZdunek added the kind:bug label on Jun 16, 2021
boring-cyborg bot commented Jun 16, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

tomaszZdunek changed the title from "secrets_masker RecursionError with TriggerDagRunOperator" to "secrets_masker RecursionError with nested TriggerDagRunOperators" on Jun 16, 2021
uranusjr (Member) commented

We should implement some kind of cycle detection in the redaction logic.
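
A minimal sketch of what cycle detection in a recursive redactor could look like. This is not Airflow's `secrets_masker` code: the `redact` function, the `SECRET` value, and the `"<cycle>"` placeholder are all hypothetical names chosen for illustration. The idea is to remember the `id()` of every container currently being walked and stop when one is met again.

```python
from typing import Any, Optional, Set

SECRET = "hunter2"  # hypothetical secret value, for illustration only


def redact(item: Any, _seen: Optional[Set[int]] = None) -> Any:
    """Mask SECRET in strings, recursing into containers without looping forever."""
    seen = _seen if _seen is not None else set()
    if isinstance(item, str):
        return item.replace(SECRET, "***")
    if isinstance(item, (dict, list, tuple, set)):
        if id(item) in seen:
            # This container is already being redacted further up the call
            # stack, so descending again would never terminate.
            return "<cycle>"
        seen.add(id(item))
        try:
            if isinstance(item, dict):
                return {k: redact(v, seen) for k, v in item.items()}
            if isinstance(item, list):
                return [redact(v, seen) for v in item]
            # Tuples and sets both come back as tuples, mirroring the
            # `tuple(self.redact(subval) ...)` line in the traceback.
            return tuple(redact(v, seen) for v in item)
        finally:
            # Only containers on the current path count as a cycle; the same
            # object appearing twice side by side is still redacted normally.
            seen.discard(id(item))
    return item
```

For example, a list that contains itself is redacted once and the inner reference is replaced by the placeholder instead of recursing until the interpreter's limit is hit.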

potiuk (Member) commented Jun 16, 2021

Maybe a simple maximum recursion depth. It is somewhat arbitrary, but trying to solve it 'properly' might be, first, unnecessary and, second, too costly for the logging case. Setting a max recursion depth of something like 2 should cover the most common cases (a dict of lists, for example).
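
A rough sketch of the depth-limit idea, under stated assumptions: `redact`, `SECRET`, and `MAX_DEPTH` are hypothetical names, and this is not the code that was eventually merged. Anything nested deeper than the limit is returned untouched, which is the trade-off of this approach: it cannot loop forever, but a secret buried deeper than the limit is not masked.

```python
from typing import Any

SECRET = "hunter2"  # hypothetical secret value, for illustration only
MAX_DEPTH = 2       # the depth suggested in the comment above


def redact(item: Any, depth: int = 0) -> Any:
    """Mask SECRET in strings, descending at most MAX_DEPTH container levels."""
    if depth > MAX_DEPTH:
        # Too deep: give up and return the value as-is instead of recursing.
        return item
    if isinstance(item, str):
        return item.replace(SECRET, "***")
    if isinstance(item, dict):
        return {k: redact(v, depth + 1) for k, v in item.items()}
    if isinstance(item, list):
        return [redact(v, depth + 1) for v in item]
    if isinstance(item, (tuple, set)):
        return tuple(redact(v, depth + 1) for v in item)
    return item
```

With a limit of 2, a dict of lists of strings is fully redacted, while a string three containers deep is left alone. A self-referential structure simply stops being walked once the limit is reached.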

potiuk (Member) commented Jun 16, 2021

However, this particular problem should be fixed by the just-merged #16424, so it is more of a 'just in case' protection.

UPDATE: or maybe not. It seems that iterating over a tuple is what causes this particular problem.
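
To illustrate how iterating over a tuple can recurse without end, here is a small sketch. It rests on an assumption that is not confirmed in this thread: that the logged argument is a tuple-like object whose iterator yields the object itself. `SelfYielding` and `naive_redact` are hypothetical stand-ins, not Airflow code.

```python
from typing import Any


class SelfYielding(tuple):
    """Hypothetical tuple subclass whose iterator yields the object itself."""

    def __iter__(self):
        # Yields the first field, then the whole object again.
        return iter((self[0], self))


def naive_redact(item: Any) -> Any:
    """Recursive walk shaped like the `redact` frames in the traceback."""
    if isinstance(item, str):
        return item
    if isinstance(item, (tuple, set)):
        return tuple(naive_redact(sub) for sub in item)
    return item


def hits_recursion_limit(obj: Any) -> bool:
    """Return True if walking obj exhausts the interpreter's recursion limit."""
    try:
        naive_redact(obj)
    except RecursionError:
        return True
    return False
```

An ordinary nested tuple terminates because each level is smaller than the one above it. The self-yielding one never gets smaller, so every call to `naive_redact` sees the same object again.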

tomaszZdunek (Author) commented

Any idea for a temporary workaround while waiting for 2.1.1?

potiuk (Member) commented Jun 17, 2021

Disable debug logging.
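
Why this helps, sketched with the standard `logging` module only: the traceback shows the failure inside a filter attached to a log handler, reached from a `log.debug(...)` call. When the logger's level is above DEBUG, the record is never created, so the handler's filter never runs. The logger name and `CountingFilter` below are hypothetical. In Airflow this would presumably mean setting `logging_level` in the `[logging]` section to `INFO` or higher, but treat that as an assumption to check against your own configuration.

```python
import io
import logging


class CountingFilter(logging.Filter):
    """Stand-in for a handler filter; it only counts how often it is invoked."""

    def __init__(self) -> None:
        super().__init__()
        self.calls = 0

    def filter(self, record: logging.LogRecord) -> bool:
        self.calls += 1
        return True


def filter_calls_at(level: int) -> int:
    """Emit one debug message at the given logger level and count filter calls."""
    logger = logging.getLogger("demo.debug_workaround")  # hypothetical name
    logger.propagate = False
    logger.setLevel(level)
    handler = logging.StreamHandler(io.StringIO())
    counting = CountingFilter()
    handler.addFilter(counting)
    logger.addHandler(handler)
    try:
        logger.debug("Loading %s from package %s", "entry_point", "package_name")
    finally:
        logger.removeHandler(handler)
    return counting.calls
```

At `logging.DEBUG` the filter is called once for the message. At `logging.INFO` it is not called at all, which is why raising the level sidesteps the failing filter for debug messages.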

ashb pushed a commit that referenced this issue Jun 22, 2021
kaxil pushed a commit to astronomer/airflow that referenced this issue Jun 22, 2021
Fix apache#16473

(cherry picked from commit 7453d3e)
(cherry picked from commit 6a5e676)
kaxil pushed a commit to astronomer/airflow that referenced this issue Jun 23, 2021
Fix apache#16473

(cherry picked from commit 7453d3e)
(cherry picked from commit 6a5e676)
(cherry picked from commit 5398eb5)
kaxil pushed a commit to astronomer/airflow that referenced this issue Jun 23, 2021
Fix apache#16473

(cherry picked from commit 7453d3e)
(cherry picked from commit 6a5e676)
(cherry picked from commit 5398eb5)
(cherry picked from commit de563e3)