From e523e890299fe7146f1cdbd13b6767effb9b1a91 Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Mon, 6 Apr 2026 22:42:25 +0800 Subject: [PATCH 1/3] add changes for os provider --- .../airflow_local_settings.py | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 48f14b0f9a9ee..0c54c55573575 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -334,34 +334,36 @@ def _default_conn_name_from(mod_path, hook_name): ) elif OPENSEARCH_HOST: - OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK") - OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT") + from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO + + OPENSEARCH_PORT = conf.getint("opensearch", "PORT", fallback=9200) OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", "USERNAME") OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", "PASSWORD") OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", "WRITE_STDOUT") + OPENSEARCH_WRITE_TO_OS: bool = conf.getboolean("opensearch", "WRITE_TO_OS") OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", "JSON_FORMAT") - OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", "JSON_FIELDS") + OPENSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("opensearch", "TARGET_INDEX") OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", "HOST_FIELD") OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", "OFFSET_FIELD") + OPENSEARCH_LOG_ID_TEMPLATE: str = conf.get("opensearch", "LOG_ID_TEMPLATE", fallback="") or ( + "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}" + ) - OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { - "task": { - "class": "airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler", - "formatter": "airflow", - "base_log_folder": BASE_LOG_FOLDER, - "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK, - "host": OPENSEARCH_HOST, - "port": OPENSEARCH_PORT, - "username": OPENSEARCH_USERNAME, - "password": OPENSEARCH_PASSWORD, - "write_stdout": OPENSEARCH_WRITE_STDOUT, - "json_format": OPENSEARCH_JSON_FORMAT, - "json_fields": OPENSEARCH_JSON_FIELDS, - "host_field": OPENSEARCH_HOST_FIELD, - "offset_field": OPENSEARCH_OFFSET_FIELD, - }, - } - DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS) + REMOTE_TASK_LOG = OpensearchRemoteLogIO( + host=OPENSEARCH_HOST, + port=OPENSEARCH_PORT, + username=OPENSEARCH_USERNAME, + password=OPENSEARCH_PASSWORD, + target_index=OPENSEARCH_TARGET_INDEX, + write_stdout=OPENSEARCH_WRITE_STDOUT, + write_to_opensearch=OPENSEARCH_WRITE_TO_OS, + offset_field=OPENSEARCH_OFFSET_FIELD, + host_field=OPENSEARCH_HOST_FIELD, + base_log_folder=BASE_LOG_FOLDER, + delete_local_copy=delete_local_copy, + json_format=OPENSEARCH_JSON_FORMAT, + log_id_template=OPENSEARCH_LOG_ID_TEMPLATE, + ) else: raise AirflowException( "Incorrect remote log configuration. Please check the configuration of option 'host' in " From 1eba4e997997907de126742981905d952ded011a Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Wed, 8 Apr 2026 21:46:58 +0800 Subject: [PATCH 2/3] bump pyproject.toml provider lib version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3bebe69019cb3..436c1e9415f8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -297,7 +297,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opensearch" = [ - "apache-airflow-providers-opensearch>=1.5.0" + "apache-airflow-providers-opensearch>=1.9.0" ] "opsgenie" = [ "apache-airflow-providers-opsgenie>=5.8.0" From b1b39ac4f2f6b26b2ad7c7b176b77a7efdc70e7e Mon Sep 17 00:00:00 2001 From: Owen Leung Date: Thu, 9 Apr 2026 21:35:40 +0800 Subject: [PATCH 3/3] Fix failing CI --- pyproject.toml | 4 ++-- scripts/ci/prek/update_airflow_pyproject_toml.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 436c1e9415f8e..b7b3a444a41ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -297,7 +297,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opensearch" = [ - "apache-airflow-providers-opensearch>=1.9.0" + "apache-airflow-providers-opensearch>=1.9.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opsgenie" = [ "apache-airflow-providers-opsgenie>=5.8.0" @@ -464,7 +464,7 @@ apache-airflow = "airflow.__main__:main" "apache-airflow-providers-openai>=1.5.0", "apache-airflow-providers-openfaas>=3.7.0", "apache-airflow-providers-openlineage>=2.3.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py - "apache-airflow-providers-opensearch>=1.5.0", + "apache-airflow-providers-opensearch>=1.9.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-opsgenie>=5.8.0", "apache-airflow-providers-oracle>=3.12.0", "apache-airflow-providers-pagerduty>=3.8.1", diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py b/scripts/ci/prek/update_airflow_pyproject_toml.py index 2143432dedda7..e912f3aee592c 100755 --- a/scripts/ci/prek/update_airflow_pyproject_toml.py +++ b/scripts/ci/prek/update_airflow_pyproject_toml.py @@ -89,6 +89,7 @@ "git": parse_version("0.0.2"), "common.messaging": parse_version("2.0.0"), "elasticsearch": parse_version("6.5.0"), + "opensearch": parse_version("1.9.0"), }