Labels
area:DAG-processing, area:core, area:task-sdk, kind:meta (high-level information important to the community)
Description
Right now during test setup, we bootstrap dagbag this way:
https://github.com/apache/airflow/blob/main/devel-common/src/tests_common/test_utils/db.py#L64-L84
This leads to insertion of the wrong kind of "DAG" if you try to write a DAG using airflow.sdk.DAG. It fails with errors like:
type of dag is <class 'airflow.sdk.definitions.dag.DAG'> test_dag
test setup failed
../../devel-common/src/tests_common/pytest_plugin.py:1541: in _clear_db
initial_db_init()
../../devel-common/src/tests_common/test_utils/db.py:110: in initial_db_init
_bootstrap_dagbag()
../../devel-common/src/tests_common/test_utils/db.py:79: in _bootstrap_dagbag
dagbag.sync_to_db(bundle_name="dags-folder", bundle_version=None, session=session)
../src/airflow/utils/session.py:98: in wrapper
return func(*args, **kwargs)
../src/airflow/models/dagbag.py:649: in sync_to_db
update_dag_parsing_results_in_db(
../src/airflow/dag_processing/collection.py:326: in update_dag_parsing_results_in_db
for attempt in run_with_db_retries(logger=log):
../../.venv/lib/python3.12/site-packages/tenacity/__init__.py:443: in __iter__
do = self.iter(retry_state=retry_state)
../../.venv/lib/python3.12/site-packages/tenacity/__init__.py:376: in iter
result = action(retry_state)
../../.venv/lib/python3.12/site-packages/tenacity/__init__.py:398: in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/_base.py:449: in result
return self.__get_result()
/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/_base.py:401: in __get_result
raise self._exception
../src/airflow/dag_processing/collection.py:336: in update_dag_parsing_results_in_db
DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
../src/airflow/utils/session.py:98: in wrapper
return func(*args, **kwargs)
../src/airflow/models/dag.py:1872: in bulk_write_to_db
dag_op.update_dags(orm_dags, session=session)
../src/airflow/dag_processing/collection.py:475: in update_dags
dm.calculate_dagrun_date_fields(dag, last_automated_data_interval) # type: ignore[arg-type]
../src/airflow/models/dag.py:2409: in calculate_dagrun_date_fields
next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
E AttributeError: 'DAG' object has no attribute 'next_dagrun_info'
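The failure can be reproduced in miniature. The stand-in classes below are hypothetical, not the real Airflow classes; they only show why the sync path blows up: update_dags assumes the scheduler-side DAG API (next_dagrun_info etc.), which the task-sdk definition class does not carry.

```python
class SchedulerDAG:
    """Stand-in for airflow.models.dag.DAG, which owns scheduling logic."""

    def next_dagrun_info(self, last_automated_data_interval):
        # The real implementation computes the next run from the timetable.
        return None


class SdkDAG:
    """Stand-in for airflow.sdk.definitions.dag.DAG: a pure DAG
    definition with no scheduler-only methods."""


def calculate_dagrun_date_fields(dag, last_interval):
    # Mirrors the failing call chain in collection.py / dag.py.
    return dag.next_dagrun_info(last_interval)


calculate_dagrun_date_fields(SchedulerDAG(), None)  # works

try:
    calculate_dagrun_date_fields(SdkDAG(), None)
except AttributeError as exc:
    # Same shape of error as in the traceback above:
    # 'SdkDAG' object has no attribute 'next_dagrun_info'
    print(exc)
```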
This is just one of the many issues it can run into. Things like next_dagrun_info are handled by the scheduler at runtime; instead, we should insert a serialised DAG(?) into the DB, I think.
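In rough pseudocode, the proposed direction might look like the sketch below: rather than handing the live airflow.sdk DAG object to sync_to_db, serialise it first and write only the serialised representation, so the DB layer never needs scheduler-only methods. serialize_dag and write_serialized_dag here are hypothetical stand-ins, not Airflow APIs; the real fix would go through Airflow's existing DAG serialisation layer.

```python
import json


def serialize_dag(dag: dict) -> str:
    # Hypothetical: the real fix would use Airflow's DAG serialisation,
    # not raw JSON over a plain dict.
    return json.dumps({"dag_id": dag["dag_id"], "tasks": dag["tasks"]})


def write_serialized_dag(db: dict, payload: str) -> None:
    # The "DB" only ever sees serialised data, so it cannot depend on
    # scheduler-only attributes of the live DAG object.
    data = json.loads(payload)
    db[data["dag_id"]] = payload


db = {}
write_serialized_dag(db, serialize_dag({"dag_id": "test_dag", "tasks": []}))
```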
Committer
- I acknowledge that I am a maintainer/committer of the Apache Airflow project.
Status: Done