Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class BaseDagBundle(ABC):
multiple versions of the same bundle in use at the same time. The DAG processor will always use the latest version.

:param name: String identifier for the DAG bundle
:param refresh_interval: How often the bundle should be refreshed from the source (in seconds)
:param refresh_interval: How often the bundle should be refreshed from the source in seconds
(Optional - defaults to [dag_processor] refresh_interval)
:param version: Version of the DAG bundle (Optional)
"""

Expand All @@ -62,8 +63,10 @@ def initialize(self) -> None:
"""
Initialize the bundle.

This method is called by the DAG processor before the bundle is used,
and allows for deferring expensive operations until that point in time.
This method is called by the DAG processor and worker before the bundle is used,
and allows for deferring expensive operations until that point in time. This will
only be called when Airflow needs the bundle files on disk - some uses only need
to call the `view_url` method, which can run without initializing the bundle.
"""
self.is_initialized = True

Expand All @@ -72,7 +75,7 @@ def _dag_bundle_root_storage_path(self) -> Path:
"""
Where bundles can store DAGs on disk (if local disk is required).

This is the root path, shared by various bundles. Each bundle should have its own subdirectory.
This is the root bundle storage path, common to all bundles. Each bundle should use a subdirectory of this path.
"""
if configured_location := conf.get("dag_processor", "dag_bundle_storage_path", fallback=None):
return Path(configured_location)
Expand All @@ -84,7 +87,8 @@ def path(self) -> Path:
"""
Path for this bundle.

Airflow will use this path to load/execute the DAGs from the bundle.
Airflow will use this path to find/load/execute the DAGs from the bundle.
After `initialize` has been called, all DAG files in the bundle should be accessible from this path.
"""

@abstractmethod
Expand All @@ -101,7 +105,9 @@ def refresh(self) -> None:

def view_url(self, version: str | None = None) -> str | None:
"""
URL to view the bundle.
URL to view the bundle on an external website. This is shown to users in the Airflow UI, allowing them to navigate to this URL for more details about that version of the bundle.

This needs to function without `initialize` being called.

:param version: Version to view
:return: URL to view the bundle
Expand Down
1 change: 1 addition & 0 deletions airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def parse_config(self) -> None:

@provide_session
def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None:
self.log.debug("Syncing DAG bundles to the database")
Comment thread
jedcunningham marked this conversation as resolved.
stored = {b.name: b for b in session.query(DagBundleModel).all()}
for name in self._bundle_config.keys():
if bundle := stored.pop(name, None):
Expand Down
10 changes: 6 additions & 4 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import airflow.models
from airflow.configuration import conf
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -232,11 +233,7 @@ def run(self):
# "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
# )

from airflow.dag_processing.bundles.manager import DagBundlesManager

DagBundlesManager().sync_bundles_to_db()

self.log.info("Getting all DAG bundles")
self._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
self._symlink_latest_log_directory()

Expand Down Expand Up @@ -494,6 +491,11 @@ def _refresh_dag_bundles(self):
for p in self._find_files_in_bundle(bundle)
]

# Now that we have the files present in the latest bundle,
# we need to update file_paths to include any new files
# and remove any files that are no longer in the bundle.
# We do this by removing all existing files that are in this bundle
# and then adding all the current files back in.
new_files = [f for f in self._files if f.bundle_name != bundle.name]
new_files.extend(found_files)
self.set_files(new_files)
Expand Down
8 changes: 2 additions & 6 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,7 @@ def test_bundles_are_refreshed(self):

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"bundleone": None, "bundletwo": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone, bundletwo]

Expand Down Expand Up @@ -886,9 +884,7 @@ def test_bundles_versions_are_stored(self, session):

with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}):
DagBundlesManager().sync_bundles_to_db()
with mock.patch(
"airflow.dag_processing.bundles.manager.DagBundlesManager"
) as mock_bundle_manager:
with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager:
mock_bundle_manager.return_value._bundle_config = {"mybundle": None}
mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle]
manager = DagFileProcessorManager(max_runs=1)
Expand Down