DAG processor crashes on a DAG scheduled with Asset.ref & Asset.ref #47872

@vatsrahul1001

Description

Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

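The DAG processor crashes while processing a DAG that is scheduled on Asset.ref(name=...) & Asset.ref(name=...) and whose task reads triggering_asset_events (see "How to reproduce" below). The error is raised by the DELETE on dag_schedule_asset_name_reference issued from _add_dag_asset_references, as the traceback below shows.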

LOGS


Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
  File "/opt/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 54, in dag_processor
    run_command_with_daemon_option(
  File "/opt/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 57, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/opt/airflow/airflow/dag_processing/manager.py", line 252, in run
    return self._run_parsing_loop()
  File "/opt/airflow/airflow/dag_processing/manager.py", line 341, in _run_parsing_loop
    self._collect_results()
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/dag_processing/manager.py", line 778, in _collect_results
    self._file_stats[file] = process_parse_results(
  File "/opt/airflow/airflow/dag_processing/manager.py", line 1099, in process_parse_results
    update_dag_parsing_results_in_db(
  File "/opt/airflow/airflow/dag_processing/collection.py", line 326, in update_dag_parsing_results_in_db
    for attempt in run_with_db_retries(logger=log):
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 443, in __iter__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/airflow/airflow/dag_processing/collection.py", line 336, in update_dag_parsing_results_in_db
    DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
  File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dag.py", line 1888, in bulk_write_to_db
    asset_op.add_dag_asset_name_uri_references(session=session)
  File "/opt/airflow/airflow/dag_processing/collection.py", line 685, in add_dag_asset_name_uri_references
    self._add_dag_asset_references(
  File "/opt/airflow/airflow/dag_processing/collection.py", line 680, in _add_dag_asset_references
    session.execute(delete(model).where(tuple_(model.dag_id, getattr(model, attr)).in_(old_refs)))
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
    context = constructor(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 1037, in _init_compiled
    expanded_state = compiled._process_parameters_for_postcompile(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py", line 1257, in _process_parameters_for_postcompile
    new_processors.update(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py", line 1265, in <genexpr>
    and processors[name][j - 1] is not None
sqlalchemy.exc.StatementError: (builtins.IndexError) tuple index out of range
[SQL: DELETE FROM dag_schedule_asset_name_reference WHERE (dag_schedule_asset_name_reference.dag_id, dag_schedule_asset_name_reference.name) IN (__[POSTCOMPILE_param_1])]
[parameters: [{}]]
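The failing statement is a (dag_id, name) IN (...) DELETE, and the logged parameters ([{}]) suggest that the expanded list of stale references was empty. Below is a minimal sketch of the kind of guard that avoids sending such a DELETE, assuming the empty list is the trigger. It is not Airflow's code: sqlite3 stands in for SQLAlchemy, the table is a simplified hypothetical copy of dag_schedule_asset_name_reference, and sync_name_refs is a made-up helper. It deletes only when there are stale names and inserts only the missing ones.

```python
from __future__ import annotations

import sqlite3

# Hypothetical, simplified stand-in for the dag_schedule_asset_name_reference
# table seen in the traceback: one row per (dag_id, asset name) reference.
SCHEMA = """
CREATE TABLE dag_schedule_asset_name_reference (
    dag_id TEXT NOT NULL,
    name   TEXT NOT NULL,
    PRIMARY KEY (dag_id, name)
)
"""


def sync_name_refs(conn: sqlite3.Connection, dag_id: str, wanted: set[str]) -> None:
    """Make the stored name references of dag_id equal to `wanted` (illustrative only)."""
    existing = {
        name
        for (name,) in conn.execute(
            "SELECT name FROM dag_schedule_asset_name_reference WHERE dag_id = ?",
            (dag_id,),
        )
    }
    stale = sorted(existing - wanted)
    missing = [(dag_id, name) for name in sorted(wanted - existing)]

    # Guard: only issue the DELETE when there is something to remove, so an
    # IN (...) with an empty list is never sent to the database.
    if stale:
        placeholders = ", ".join("?" for _ in stale)
        conn.execute(
            "DELETE FROM dag_schedule_asset_name_reference "
            f"WHERE dag_id = ? AND name IN ({placeholders})",
            [dag_id, *stale],
        )
    if missing:
        conn.executemany(
            "INSERT INTO dag_schedule_asset_name_reference (dag_id, name) VALUES (?, ?)",
            missing,
        )


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
refs = {"asset1_producer", "asset2_producer"}
sync_name_refs(conn, "consumer", refs)  # first parse: inserts both names
sync_name_refs(conn, "consumer", refs)  # re-parse: nothing stale, so no DELETE
rows = conn.execute(
    "SELECT dag_id, name FROM dag_schedule_asset_name_reference ORDER BY name"
).fetchall()
print(rows)  # [('consumer', 'asset1_producer'), ('consumer', 'asset2_producer')]
```

The sketch filters by a single dag_id with a scalar IN so that it stays portable to SQLite; the statement in the traceback instead uses a (dag_id, name) tuple IN that can span several DAGs, and the same "skip when empty" check would apply to it.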

What you think should happen instead?

No response

How to reproduce

  1. Use the DAG below:
from __future__ import annotations

from airflow.decorators import dag, task
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.asset.decorators import asset


@asset(uri="s3://bucket/asset1_producer", schedule=None)
def producer1():
    pass


@asset(uri="s3://bucket/asset2_producer", schedule=None)
def producer2():
    pass


@dag(
    schedule=Asset.ref(name="asset1_producer") & Asset.ref(name="asset2_producer"),
    catchup=False,
    tags=["asset"],
)
def consumer():
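    @task()
    def process_nothing(triggering_asset_events=None):
        # Airflow passes the triggering_asset_events context value in by name at runtime.
        for a, events in triggering_asset_events.items():
            print(a.name, events)

    # Instantiate the task so the DAG actually contains it.
    process_nothing()


consumer()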


Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Assignees

Labels

area:core, kind:bug (This is clearly a bug), priority:critical (Showstopper bug that should be patched immediately)

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests