From a8fd3e02e014a5f0a4aad656650ac8511f52f82e Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Mon, 27 Jan 2025 21:29:43 -0700 Subject: [PATCH 1/8] Minor cleanup and better docstrings/comments for DAG parsing There are no real functional changes in this PR - just comments, docstrings, moving an import, and some logging cleanup. --- airflow/dag_processing/bundles/base.py | 17 ++++++++++++----- airflow/dag_processing/bundles/manager.py | 1 + airflow/dag_processing/manager.py | 10 ++++++---- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index 04acc324f4ff7..c3da00cbf4494 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -40,7 +40,8 @@ class BaseDagBundle(ABC): multiple versions of the same bundle in use at the same time. The DAG processor will always use the latest version. :param name: String identifier for the DAG bundle - :param refresh_interval: How often the bundle should be refreshed from the source (in seconds) + :param refresh_interval: How often the bundle should be refreshed from the source in seconds + (Optional - defaults to [dag_processor] refresh_interval) :param version: Version of the DAG bundle (Optional) """ @@ -62,8 +63,10 @@ def initialize(self) -> None: """ Initialize the bundle. - This method is called by the DAG processor before the bundle is used, - and allows for deferring expensive operations until that point in time. + This method is called by the DAG processor and worker before the bundle is used, + and allows for deferring expensive operations until that point in time. This will + only be called when Airflow needs the bundle files on disk - some uses only need + to call the `view_url` method, which can run without initializing the bundle. 
""" self.is_initialized = True @@ -72,7 +75,7 @@ def _dag_bundle_root_storage_path(self) -> Path: """ Where bundles can store DAGs on disk (if local disk is required). - This is the root path, shared by various bundles. Each bundle should have its own subdirectory. + This is the root path, shared by various bundles. Each bundle should use its own subdirectory. """ if configured_location := conf.get("dag_processor", "dag_bundle_storage_path", fallback=None): return Path(configured_location) @@ -84,7 +87,9 @@ def path(self) -> Path: """ Path for this bundle. - Airflow will use this path to load/execute the DAGs from the bundle. + Airflow will use this path to find/load/execute the DAGs from the bundle. + You can change this path during refreshes, but it cannot change between refreshes. + It is also should be retrievable once the bundle has been initialized. """ @abstractmethod @@ -103,6 +108,8 @@ def view_url(self, version: str | None = None) -> str | None: """ URL to view the bundle. + This needs to function without `initialize` being called. 
+ :param version: Version to view :return: URL to view the bundle """ diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 574f287f5b361..09767281438e8 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -99,6 +99,7 @@ def parse_config(self) -> None: @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: + self.log.debug("Syncing DAG bundles to the database") stored = {b.name: b for b in session.query(DagBundleModel).all()} for name in self._bundle_config.keys(): if bundle := stored.pop(name, None): diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 6eb432cbeb2fe..04fb15f0f9933 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -47,6 +47,7 @@ import airflow.models from airflow.configuration import conf +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.dag_processing.collection import update_dag_parsing_results_in_db from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess from airflow.exceptions import AirflowException @@ -232,11 +233,7 @@ def run(self): # "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval # ) - from airflow.dag_processing.bundles.manager import DagBundlesManager - DagBundlesManager().sync_bundles_to_db() - - self.log.info("Getting all DAG bundles") self._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) self._symlink_latest_log_directory() @@ -494,7 +491,12 @@ def _refresh_dag_bundles(self): for p in self._find_files_in_bundle(bundle) ] + # Now that we have the files present in the latest bundle, + # we need to update the file paths to include any new ones + # and remove any that are no longer in the bundle. 
+ # We do this by removing all existing files that are in this bundle new_files = [f for f in self._files if f.bundle_name != bundle.name] + # And then add all the current files in new_files.extend(found_files) self.set_files(new_files) From e93da5c4250be6c269526bed995009c7cad9f084 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 29 Jan 2025 12:28:47 -0700 Subject: [PATCH 2/8] Update airflow/dag_processing/manager.py --- airflow/dag_processing/manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 04fb15f0f9933..9c7af83673abe 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -492,11 +492,11 @@ def _refresh_dag_bundles(self): ] # Now that we have the files present in the latest bundle, - # we need to update the file paths to include any new ones - # and remove any that are no longer in the bundle. + # we need to update file_paths to include any new files + # and remove any files that are no longer in the bundle. # We do this by removing all existing files that are in this bundle + # and then adding all the current files back in. 
new_files = [f for f in self._files if f.bundle_name != bundle.name] - # And then add all the current files in new_files.extend(found_files) self.set_files(new_files) From 146fc817267f9fee641d3e84b8addf1ac99dec05 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 30 Jan 2025 14:49:07 -0700 Subject: [PATCH 3/8] Update airflow/dag_processing/bundles/base.py --- airflow/dag_processing/bundles/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index c3da00cbf4494..2583620e27b84 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -88,7 +88,7 @@ def path(self) -> Path: Path for this bundle. Airflow will use this path to find/load/execute the DAGs from the bundle. - You can change this path during refreshes, but it cannot change between refreshes. + The path can change when `refresh` is called, but it cannot change between refreshes. It is also should be retrievable once the bundle has been initialized. """ From a7471b119448e6c470ac94b72250b397fde135da Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 30 Jan 2025 14:49:41 -0700 Subject: [PATCH 4/8] Apply suggestions from code review Co-authored-by: Daniel Standish <15932138+dstandish@users.noreply.github.com> --- airflow/dag_processing/bundles/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index 2583620e27b84..ca843a49e7883 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -75,7 +75,7 @@ def _dag_bundle_root_storage_path(self) -> Path: """ Where bundles can store DAGs on disk (if local disk is required). - This is the root path, shared by various bundles. Each bundle should use its own subdirectory. 
+ This is the root bundle storage path, common to all bundles. Each bundle should use a subdirectory of this path. """ if configured_location := conf.get("dag_processor", "dag_bundle_storage_path", fallback=None): return Path(configured_location) @@ -89,7 +89,7 @@ def path(self) -> Path: Airflow will use this path to find/load/execute the DAGs from the bundle. The path can change when `refresh` is called, but it cannot change between refreshes. - It is also should be retrievable once the bundle has been initialized. + After `initialize` has been called, all dag files in the bundle should be accessible from this path. """ @abstractmethod From e8c6c46961362adaa59452e6b8a92cd7bfa35808 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 30 Jan 2025 14:58:31 -0700 Subject: [PATCH 5/8] Update airflow/dag_processing/bundles/base.py --- airflow/dag_processing/bundles/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index ca843a49e7883..ab43691748367 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -106,7 +106,7 @@ def refresh(self) -> None: def view_url(self, version: str | None = None) -> str | None: """ - URL to view the bundle. + URL to view the bundle on an external website. This is shown to users in the Airflow UI, allowing them to navigate to this url for more details about that version of the bundle. This needs to function without `initialize` being called. 
From 5986c4ea3b00f49e79388f49ccbb8fa22893f401 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 30 Jan 2025 15:08:40 -0700 Subject: [PATCH 6/8] Update airflow/dag_processing/bundles/base.py --- airflow/dag_processing/bundles/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index ab43691748367..81bf38d373677 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -88,7 +88,6 @@ def path(self) -> Path: Path for this bundle. Airflow will use this path to find/load/execute the DAGs from the bundle. - The path can change when `refresh` is called, but it cannot change between refreshes. After `initialize` has been called, all dag files in the bundle should be accessible from this path. """ From a902152282955ccc1cf0984f5d5e2216dafeaa62 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Thu, 30 Jan 2025 19:14:19 -0700 Subject: [PATCH 7/8] Fix mock --- tests/dag_processing/test_manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index c90921ac3c1be..a0a9e05a3dca3 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -832,9 +832,7 @@ def test_bundles_are_refreshed(self): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() - with mock.patch( - "airflow.dag_processing.bundles.manager.DagBundlesManager" - ) as mock_bundle_manager: + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"bundleone": None, "bundletwo": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone, bundletwo] From bd8e7e2d51ea06e22fce4462817972de243d9314 Mon Sep 17 00:00:00 2001 From: 
Jed Cunningham Date: Thu, 30 Jan 2025 22:13:09 -0700 Subject: [PATCH 8/8] Fix another mock - missed one :) --- tests/dag_processing/test_manager.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py index a0a9e05a3dca3..d2dacd3587087 100644 --- a/tests/dag_processing/test_manager.py +++ b/tests/dag_processing/test_manager.py @@ -884,9 +884,7 @@ def test_bundles_versions_are_stored(self, session): with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): DagBundlesManager().sync_bundles_to_db() - with mock.patch( - "airflow.dag_processing.bundles.manager.DagBundlesManager" - ) as mock_bundle_manager: + with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: mock_bundle_manager.return_value._bundle_config = {"mybundle": None} mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] manager = DagFileProcessorManager(max_runs=1)