From 05261ed5322eddaa8e3c311f828537786eae8f2d Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Wed, 24 Apr 2024 16:11:58 +0200 Subject: [PATCH] Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. Signed-off-by: Jakub Dardzinski Make `max_workers` configurable. Signed-off-by: Jakub Dardzinski --- airflow/providers/openlineage/conf.py | 7 +++++++ .../providers/openlineage/plugins/listener.py | 5 +++-- .../openlineage/plugins/test_listener.py | 21 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index 4ca42eedfdfc13..5d215a6754918e 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -101,3 +101,10 @@ def _is_true(val): # Check if both 'transport' and 'config_path' are not present and also # if legacy 'OPENLINEAGE_URL' environment variables is not set return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == "" + + +@cache +def dag_state_change_process_pool_size() -> int: + """[openlineage] dag_state_change_process_pool_size.""" + option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1) + return option diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 25ded6d7f43618..4fd87ad0972107 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -17,13 +17,14 @@ from __future__ import annotations import logging -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor from datetime import datetime from typing import TYPE_CHECKING from openlineage.client.serde import Serde from airflow.listeners import hookimpl +from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( @@ -250,7 +251,7 @@ def on_failure(): @property def executor(self): if not self._executor: - self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_") + self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size()) return self._executor @hookimpl diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index c37892c1f3dbe3..b747da3827504f 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -523,6 +523,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope listener.adapter.complete_task.assert_not_called() +@pytest.mark.parametrize( + "max_workers,expected", + [ + (None, 1), + ("8", 8), + ], +) +@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True) +def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected): + """mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers""" + listener = OpenLineageListener() + # mock ProcessPoolExecutor class + try: + with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}): + listener.on_dag_run_running(mock.MagicMock(), None) + mock_executor.assert_called_once_with(max_workers=expected) + mock_executor.return_value.submit.assert_called_once() + finally: + conf.dag_state_change_process_pool_size.cache_clear() + + class TestOpenLineageSelectiveEnable: def setup_method(self): self.dag = DAG(