Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 107 additions & 58 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ class DagParsingStat(NamedTuple):
all_files_processed: bool


class BundleState(NamedTuple):
"""Persisted refresh state for a DAG bundle."""

last_refreshed: datetime | None
version: str | None


@attrs.define
class DagFileStat:
"""Information about single processing of one file."""
Expand Down Expand Up @@ -591,6 +598,43 @@ def _add_callback_to_queue(self, request: CallbackRequest):
self._add_files_to_queue([file_info], mode="front")
Stats.incr("dag_processing.other_callback_count")

@provide_session
def get_bundle_state(self, bundle_name: str, *, session: Session = NEW_SESSION) -> BundleState | None:
"""
Return the persisted refresh state for a bundle.

Returns ``None`` if the bundle has no database record.
"""
row = session.scalar(
select(DagBundleModel)
.where(DagBundleModel.name == bundle_name)
.options(load_only(DagBundleModel.last_refreshed, DagBundleModel.version))
)
if row is None:
return None
return BundleState(last_refreshed=row.last_refreshed, version=row.version)

@provide_session
def update_bundle_state(
self,
bundle_name: str,
*,
last_refreshed: datetime,
version: str | None,
session: Session = NEW_SESSION,
) -> None:
"""
Persist the post-refresh state for a bundle.

Always updates ``last_refreshed``. Updates ``version`` only when ``version`` is not
``None`` — pass ``None`` to leave the stored version unchanged (e.g. for non-versioned
bundles or when the version did not change after a refresh).
"""
values: dict[str, Any] = {"last_refreshed": last_refreshed}
if version is not None:
values["version"] = version
session.execute(update(DagBundleModel).where(DagBundleModel.name == bundle_name).values(**values))

def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
"""Refresh DAG bundles, if required."""
now = timezone.utcnow()
Expand Down Expand Up @@ -619,69 +663,74 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
continue
# TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
with create_session() as session:
bundle_model = session.get(DagBundleModel, bundle.name)
if bundle_model is None:
self.log.warning("Bundle model not found for %s", bundle.name)
continue
elapsed_time_since_refresh = (
now - (bundle_model.last_refreshed or utc_epoch())
).total_seconds()
if bundle.supports_versioning:
# we will also check the version of the bundle to see if another DAG processor has seen
# a new version
pre_refresh_version = (
self._bundle_versions.get(bundle.name) or bundle.get_current_version()
)
current_version_matches_db = pre_refresh_version == bundle_model.version
else:
# With no versioning, it always "matches"
current_version_matches_db = True

previously_seen = bundle.name in self._bundle_versions
if self.should_skip_refresh(
bundle=bundle,
elapsed_time_since_refresh=elapsed_time_since_refresh,
current_version_matches_db=current_version_matches_db,
previously_seen=previously_seen,
):
self.log.info("Not time to refresh bundle %s", bundle.name)
continue

self.log.info("Refreshing bundle %s", bundle.name)

try:
bundle.refresh()
any_refreshed = True
except Exception:
self.log.exception("Error refreshing bundle %s", bundle.name)
continue

bundle_model.last_refreshed = now
self._force_refresh_bundles.discard(bundle.name)
try:
bundle_state = self.get_bundle_state(bundle.name)
except Exception:
self.log.exception("Error fetching state for bundle %s", bundle.name)
continue
if bundle_state is None:
self.log.warning("Bundle model not found for %s", bundle.name)
continue
elapsed_time_since_refresh = (now - (bundle_state.last_refreshed or utc_epoch())).total_seconds()
if bundle.supports_versioning:
# we will also check the version of the bundle to see if another DAG processor has seen
# a new version
pre_refresh_version = self._bundle_versions.get(bundle.name) or bundle.get_current_version()
current_version_matches_db = pre_refresh_version == bundle_state.version
else:
# With no versioning, it always "matches"
current_version_matches_db = True

previously_seen = bundle.name in self._bundle_versions
if self.should_skip_refresh(
bundle=bundle,
elapsed_time_since_refresh=elapsed_time_since_refresh,
current_version_matches_db=current_version_matches_db,
previously_seen=previously_seen,
):
self.log.info("Not time to refresh bundle %s", bundle.name)
continue

if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
continue
self.log.info("Refreshing bundle %s", bundle.name)

bundle_model.version = version_after_refresh
try:
bundle.refresh()
any_refreshed = True
except Exception:
self.log.exception("Error refreshing bundle %s", bundle.name)
continue

self.log.info(
"Version changed for %s, new version: %s", bundle.name, version_after_refresh
self._force_refresh_bundles.discard(bundle.name)

if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was seen before by
# this dag processor and (2) the version of the bundle did not change
# after refreshing it
version_after_refresh = bundle.get_current_version()
if previously_seen and pre_refresh_version == version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
bundle.name,
version_after_refresh,
)
else:
version_after_refresh = None
try:
self.update_bundle_state(bundle.name, last_refreshed=now, version=None)
except Exception:
self.log.exception("Error persisting state for bundle %s", bundle.name)
continue

self._bundle_versions[bundle.name] = version_after_refresh
self.log.info("Version changed for %s, new version: %s", bundle.name, version_after_refresh)
else:
version_after_refresh = None

# Persistence failure must not skip file scanning (bundle is already refreshed locally).
# _bundle_versions is only advanced on success to stay consistent with the DB.
try:
self.update_bundle_state(bundle.name, last_refreshed=now, version=version_after_refresh)
except Exception:
self.log.exception("Error persisting state for bundle %s", bundle.name)
else:
self._bundle_versions[bundle.name] = version_after_refresh

found_files = {
DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
Expand Down
Loading
Loading