From 3e7edda12c43b64b8e9b3815b3dcb6bf5a3a5181 Mon Sep 17 00:00:00 2001 From: parkhojeong Date: Wed, 13 May 2026 15:46:11 +0900 Subject: [PATCH 1/2] Add airflow import smoke check script --- .../run_check_airflow_import_smoke.py | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 scripts/in_container/run_check_airflow_import_smoke.py diff --git a/scripts/in_container/run_check_airflow_import_smoke.py b/scripts/in_container/run_check_airflow_import_smoke.py new file mode 100644 index 0000000000000..4f1d8c5b5a871 --- /dev/null +++ b/scripts/in_container/run_check_airflow_import_smoke.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys +from dataclasses import dataclass, field +from importlib import import_module +from pkgutil import walk_packages + +from in_container_utils import console + +import airflow + +# Alembic env modules require a live DB context at import time. +MIGRATION_ENV_MODULES = { + "airflow.migrations.env", + "airflow.providers.edge3.migrations.env", + "airflow.providers.fab.migrations.env", +} + +# Heavy optional deps (e.g. apache-beam) absent from default dev env; validated by providers-tests matrix. +OPTIONAL_HEAVY_DEP_MODULES = { + "airflow.providers.google.cloud.hooks.dataflow", + "airflow.providers.google.cloud.operators.dataflow", + "airflow.providers.google.cloud.sensors.dataflow", + "airflow.providers.google.cloud.triggers.dataflow", + # Worker-side Beam pipeline template, not a regular Airflow module. + "airflow.providers.google.cloud.utils.mlengine_prediction_summary", +} + +EXCLUDED_MODULES = MIGRATION_ENV_MODULES | OPTIONAL_HEAVY_DEP_MODULES + + +@dataclass +class ImportCheckResult: + checked_count: int = 0 + failures: list[tuple[str, str, str]] = field(default_factory=list) + skipped_modules: list[str] = field(default_factory=list) + + +def check_imports() -> ImportCheckResult: + result = ImportCheckResult() + + def _on_walk_error(module_name: str) -> None: + # Keeps walk_packages from aborting on non-ImportError; main loop records the failure. + ex = sys.exc_info()[1] + if ex is not None: + console.print(f"[yellow][walk warning][/] {module_name}: {type(ex).__name__}: {ex}") + + modules = [ + module.name + for module in walk_packages( + airflow.__path__, + prefix=f"{airflow.__name__}.", + onerror=_on_walk_error, + ) + ] + console.print(f"Discovered {len(modules)} modules") + + for module_name in modules: + if module_name in EXCLUDED_MODULES: + result.skipped_modules.append(module_name) + continue + result.checked_count += 1 + try: + import_module(module_name) + except (Exception, SystemExit) as ex: + result.failures.append((module_name, type(ex).__name__, str(ex))) + + return result + + +def main() -> int: + result = check_imports() + + console.print( + f"\nSummary: checked={result.checked_count}, " + f"failed={len(result.failures)}, " + f"skipped={len(result.skipped_modules)}" + ) + + if result.skipped_modules: + console.print("\n[yellow]Skipped:[/]") + for name in sorted(result.skipped_modules): + console.print(f" - {name}") + + if result.failures: + console.print("\n[red]Failures:[/]") + for name, exc_type, msg in sorted(result.failures): + console.print(f" - {name} ({exc_type}): {msg}") + return 1 + + console.print("\n[green]All modules imported successfully[/]") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From fb5eae142b98498e9229409f6ad62e4c6a447f4b Mon Sep 17 00:00:00 2001 From: parkhojeong Date: Wed, 13 May 2026 16:06:56 +0900 Subject: [PATCH 2/2] Fix broken import in TaskHandlerWithCustomFormatter --- .../airflow/utils/log/task_handler_with_custom_formatter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow-core/src/airflow/utils/log/task_handler_with_custom_formatter.py index 6301a53682dc6..219d280ffaf67 100644 --- a/airflow-core/src/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow-core/src/airflow/utils/log/task_handler_with_custom_formatter.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING from airflow.configuration import conf -from airflow.utils.helpers import _render_template_to_string, parse_template_string +from airflow.utils.helpers import parse_template_string, render_template if TYPE_CHECKING: from jinja2 import Template @@ -62,6 +62,6 @@ def set_context(self, ti) -> None: def _render_prefix(self, ti: TaskInstance) -> str: if self.prefix_jinja_template: jinja_context = ti.get_template_context() - return _render_template_to_string(self.prefix_jinja_template, jinja_context) + return render_template(self.prefix_jinja_template, jinja_context, native=False) logger.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value") return ""