Reload configuration for standalone dag file processor #35725

Merged
4 changes: 2 additions & 2 deletions airflow/cli/commands/dag_processor_command.py
@@ -23,7 +23,7 @@

from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import Job, run_job
from airflow.utils import cli as cli_utils
@@ -36,7 +36,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
"""Create DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)

return DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
@@ -62,6 +61,7 @@ def dag_processor(args):

    job_runner = _create_dag_processor_job_runner(args)

+    reload_configuration_for_dag_processing()
    run_command_with_daemon_option(
        args=args,
        process_name="dag-processor",
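
The net effect in the CLI entrypoint is that the reload now happens once, in the standalone dag-processor process itself, before anything is daemonized. A minimal sketch of that call order follows; start_daemonized is a placeholder standing in for run_command_with_daemon_option, and only reload_configuration_for_dag_processing is the real helper added by this PR:

    from airflow.dag_processing.manager import reload_configuration_for_dag_processing

    def start_daemonized(run) -> None:
        # Placeholder for run_command_with_daemon_option(...); here it just
        # invokes the callback in the current process.
        run()

    def main() -> None:
        # Reload logging config and recreate the SQLAlchemy pool in *this*
        # process, before any daemonizing/forking, so neither is shared with
        # a parent process.
        reload_configuration_for_dag_processing()
        start_daemonized(lambda: print("start the DagProcessorJobRunner here"))

    if __name__ == "__main__":
        main()
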
46 changes: 24 additions & 22 deletions airflow/dag_processing/manager.py
@@ -227,28 +227,7 @@ def _run_processor_manager(
        set_new_process_group()

        setproctitle("airflow scheduler -- DagFileProcessorManager")
-        # Reload configurations and settings to avoid collision with parent process.
-        # Because this process may need custom configurations that cannot be shared,
-        # e.g. RotatingFileHandler. And it can cause connection corruption if we
-        # do not recreate the SQLA connection pool.
-        os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
-        os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
-        # Replicating the behavior of how logging module was loaded
-        # in logging_config.py
-
-        # TODO: This reloading should be removed when we fix our logging behaviour
-        # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the
-        # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded
-        # in a parent process (likely via resources shared in memory by the ORM libraries).
-        # This caused flaky tests in our CI for many months and has been discovered while
-        # iterating on https://github.com/apache/airflow/pull/19860
-        # The issue that describes the problem and possible remediation is
-        # at https://github.com/apache/airflow/issues/19934
-
-        importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0]))  # type: ignore
-        importlib.reload(airflow.settings)
-        airflow.settings.initialize()
-        del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
+        reload_configuration_for_dag_processing()
        processor_manager = DagFileProcessorManager(
            dag_directory=dag_directory,
            max_runs=max_runs,
@@ -1292,3 +1271,26 @@ def emit_metrics(self):
    @property
    def file_paths(self):
        return self._file_paths
+
+
+def reload_configuration_for_dag_processing():
+    # Reload configurations and settings to avoid collision with parent process.
+    # Because this process may need custom configurations that cannot be shared,
+    # e.g. RotatingFileHandler. And it can cause connection corruption if we
+    # do not recreate the SQLA connection pool.
+    os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
+    os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
+    # Replicating the behavior of how logging module was loaded
+    # in logging_config.py
+    # TODO: This reloading should be removed when we fix our logging behaviour
+    # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the
+    # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded
+    # in a parent process (likely via resources shared in memory by the ORM libraries).
+    # This caused flaky tests in our CI for many months and has been discovered while
+    # iterating on https://github.com/apache/airflow/pull/19860
+    # The issue that describes the problem and possible remediation is
+    # at https://github.com/apache/airflow/issues/19934
+    importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0]))  # type: ignore
+    importlib.reload(airflow.settings)
+    airflow.settings.initialize()
+    del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
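
The last four lines of the helper are the core trick: split the dotted LOGGING_CLASS_PATH into module and attribute, fetch the already-imported module, and re-execute it so it re-reads the environment. Below is a self-contained sketch of that pattern, using a stand-in stdlib path rather than Airflow's real LOGGING_CLASS_PATH value (an assumption for illustration only):

    import importlib
    from importlib import import_module

    # Stand-in "<module>.<attr>" dotted path; mirrors the *shape* of
    # airflow.settings.LOGGING_CLASS_PATH, not its actual value.
    logging_class_path = "logging.config.dictConfig"

    module_name, attr_name = logging_class_path.rsplit(".", 1)
    module = import_module(module_name)      # returns the cached module from sys.modules
    importlib.reload(module)                 # re-executes the module's top-level code
    refreshed = getattr(module, attr_name)   # attributes must be looked up again after reload
    print(module_name, attr_name, refreshed)

Re-executing the module's top-level code is what lets the CONFIG_PROCESSOR_MANAGER_LOGGER flag take effect: the logging configuration module reads it at import time, so a plain import (which hits the sys.modules cache) would never see the new value.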