diff --git a/providers/openlineage/docs/configurations-ref.rst b/providers/openlineage/docs/configurations-ref.rst index 258c9761d560a..fb78ad2424fca 100644 --- a/providers/openlineage/docs/configurations-ref.rst +++ b/providers/openlineage/docs/configurations-ref.rst @@ -56,6 +56,36 @@ If you want to look at OpenLineage events without sending them anywhere, you can [openlineage] transport = {"type": "console"} +OpenLineage config stored in an Airflow connection +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can store OpenLineage client configuration in a Generic Airflow connection instead of putting the full JSON +configuration directly in ``airflow.cfg``. Set ``config_conn_id`` to the connection ID and store the OpenLineage +configuration in the connection extra as JSON. + +.. code-block:: ini + + [openlineage] + config_conn_id = openlineage_default + +Connection extra should contain the OpenLineage client configuration: + +.. code-block:: json + + { + "transport": { + "type": "http", + "url": "http://example.com:5000", + "auth": {"type": "airflow_connection_api_key"} + } + } + +For HTTP transports that require API key authentication, you can keep the token in the Airflow connection password. +Set ``auth.type`` to ``airflow_connection_api_key``. When the config is loaded from ``config_conn_id``, the provider +reads the API key from the same connection password by default. You can also set ``auth.conn_id`` to read the token +from another Airflow connection. The provider resolves ``airflow_connection_api_key`` to standard OpenLineage +``api_key`` auth before creating the OpenLineage client. + .. note:: For full list of built-in transport types, specific transport's options or instructions on how to implement your custom transport, refer to `Python client documentation `_. 
@@ -100,9 +130,10 @@ Primary, and recommended method of configuring OpenLineage Airflow Provider is A As there are multiple possible ways of configuring OpenLineage, it's important to keep in mind the precedence of different configurations. OpenLineage Airflow Provider looks for the configuration in the following order: -1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable) -2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__TRANSPORT environment variable) -3. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in `this `_ documentation. Please note that **using Airflow configuration is encouraged** and is the only future proof solution. +1. Check ``config_conn_id`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_CONN_ID environment variable) +2. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable) +3. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__TRANSPORT environment variable) +4. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in `this `_ documentation. Please note that **using Airflow configuration is encouraged** and is the only future proof solution. .. _configuration_selective_enable:openlineage: diff --git a/providers/openlineage/provider.yaml b/providers/openlineage/provider.yaml index 387eec20ca6ee..d04fa32d55b38 100644 --- a/providers/openlineage/provider.yaml +++ b/providers/openlineage/provider.yaml @@ -104,6 +104,15 @@ config: This section applies settings for OpenLineage integration. options: + config_conn_id: + description: | + Specify a Generic Airflow connection ID that contains OpenLineage configuration in connection + extra.
@cache
def config_conn_id() -> str:
    """
    Return the ``[openlineage] config_conn_id`` option.

    The value is the ID of the Airflow connection whose extra holds the OpenLineage
    client configuration. An empty string is returned when the option is not set.
    """
    configured_conn_id = conf.get(_CONFIG_SECTION, "config_conn_id", fallback="")
    return configured_conn_id
def get_openlineage_config(self) -> dict | None:
    """
    Build the OpenLineage client configuration from Airflow settings.

    Sources are tried in this order and the first one that is configured wins:

    1. ``[openlineage] config_conn_id`` - full client config stored in a connection extra.
    2. ``[openlineage] config_path`` - YAML file with the client config.
    3. ``[openlineage] transport`` - transport-only config, wrapped as ``{"transport": ...}``.

    In every case ``airflow_connection_api_key`` auth blocks are resolved to plain
    ``api_key`` auth before the config is returned.

    :return: The client configuration, or ``None`` when none of the sources is configured
        (or the YAML file is empty).
    :raises OpenLineageAirflowConnectionConfigError: If the config connection extra is invalid.
    :raises OpenLineageAirflowConnectionAuthError: If an API key cannot be resolved.
    """
    from copy import deepcopy

    # First, try to read from Airflow connection
    openlineage_config_conn_id = conf.config_conn_id()
    if openlineage_config_conn_id:
        config = AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
        # The config connection doubles as the default source of the API key.
        resolve_airflow_connection_auth(config=config, config_conn_id=openlineage_config_conn_id)
        return config
    self.log.debug("OpenLineage config_conn_id configuration not found.")

    # Second, try to read from YAML file
    openlineage_config_path = conf.config_path(check_legacy_env_var=False)
    if openlineage_config_path:
        yaml_config = self._read_yaml_config(openlineage_config_path)
        if yaml_config is None:
            return None
        resolve_airflow_connection_auth(yaml_config)
        return yaml_config
    self.log.debug("OpenLineage config_path configuration not found.")

    # Third, try to get transport config
    transport_config = conf.transport()
    if not transport_config:
        self.log.debug("OpenLineage transport configuration not found.")
        return None
    # resolve_airflow_connection_auth rewrites ``auth`` blocks in place. The dict returned
    # by conf.transport() is owned by the conf module (presumably memoized like the other
    # conf accessors - TODO confirm), so resolve on a private copy to avoid writing the
    # API key into that shared object.
    config = {"transport": deepcopy(transport_config)}
    resolve_airflow_connection_auth(config)
    return config


@staticmethod
def _read_yaml_config(path: str) -> dict[str, Any] | None:
    """
    Parse the YAML file at ``path`` with ``yaml.safe_load``.

    :param path: Location of the OpenLineage YAML configuration file.
    :return: The parsed document; ``None`` for an empty file.
    """
    with open(path) as config_file:
        return yaml.safe_load(config_file)
from __future__ import annotations

from typing import Any

from airflow.providers.common.compat.sdk import AirflowException, BaseHook

# ``auth.type`` value that asks the provider to fetch the API key from an Airflow connection.
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
# Connection-extra keys probed, in this order, when no explicit ``extra_key`` is configured.
_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")


class OpenLineageAirflowConnectionAuthError(AirflowException):
    """Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""


class OpenLineageAirflowConnectionConfigError(AirflowException):
    """Raised when OpenLineage config cannot be resolved from an Airflow connection."""


class AirflowConnectionConfigProvider:
    """
    Resolve OpenLineage client configuration from an Airflow connection.

    The connection extra contains the full OpenLineage client config, for example
    ``{"transport": {"type": "console"}}``.
    """

    def __init__(self, conn_id: str) -> None:
        """
        Remember which connection holds the configuration.

        :param conn_id: ID of the Airflow connection to read.
        :raises OpenLineageAirflowConnectionConfigError: If ``conn_id`` is empty.
        """
        if not conn_id:
            raise OpenLineageAirflowConnectionConfigError(
                "OpenLineage connection config requires a non-empty connection ID."
            )
        self.conn_id = conn_id

    def get_config(self) -> dict[str, Any]:
        """
        Fetch the connection and return its extra as the OpenLineage client config.

        :raises OpenLineageAirflowConnectionConfigError: If the extra is not a JSON object
            with a ``transport`` object inside.
        """
        connection = BaseHook.get_connection(self.conn_id)
        return self._validate_config(connection.extra_dejson)

    def _validate_config(self, config: Any) -> dict[str, Any]:
        """Return ``config`` unchanged if it is a dict holding a ``transport`` dict, else raise."""
        if not isinstance(config, dict):
            # Report only the type: the extra may carry credentials and must not end up
            # verbatim in exception text or logs.
            raise OpenLineageAirflowConnectionConfigError(
                f"OpenLineage connection config of type `{type(config).__name__}` "
                f"in connection `{self.conn_id}` is not a dict."
            )
        if not isinstance(config.get("transport"), dict):
            raise OpenLineageAirflowConnectionConfigError(
                "OpenLineage connection config must contain a `transport` JSON object."
            )
        return config


class AirflowConnectionTokenProvider:
    """
    Resolve an OpenLineage API key from an Airflow connection.

    The connection password is preferred. If it is empty and ``extra_key`` is configured, that key
    is read from connection ``extra``. Otherwise, common extra keys are checked.
    """

    def __init__(self, config: dict[str, Any], default_conn_id: str | None = None) -> None:
        """
        Read the lookup settings from an ``auth`` config block.

        :param config: The ``auth`` block; ``conn_id`` and ``extra_key`` are the keys consulted.
        :param default_conn_id: Connection used when the block has no ``conn_id``.
        :raises OpenLineageAirflowConnectionAuthError: If no connection ID can be determined.
        """
        self.conn_id = config.get("conn_id") or default_conn_id or ""
        self.extra_key = config.get("extra_key")
        if not self.conn_id:
            raise OpenLineageAirflowConnectionAuthError(
                "OpenLineage `airflow_connection_api_key` auth requires a non-empty `conn_id`."
            )

    def get_api_key(self) -> str:
        """
        Return the API key stored in the configured connection.

        :return: The stripped connection password, or the stripped value found in the extra.
        :raises OpenLineageAirflowConnectionAuthError: If neither yields a non-blank token.
        """
        connection = BaseHook.get_connection(self.conn_id)
        # Strip before the truthiness check so a whitespace-only password counts as
        # missing instead of being returned as an empty API key.
        password = (connection.password or "").strip()
        if password:
            return password
        api_key = self._get_api_key_from_extra(connection.extra_dejson)
        if api_key:
            return api_key

        raise OpenLineageAirflowConnectionAuthError(
            "OpenLineage `airflow_connection_api_key` auth could not find a token in connection "
            f"`{self.conn_id}`. Expected connection password or token in connection extra."
        )

    def _get_api_key_from_extra(self, extra: dict[str, Any]) -> str | None:
        """
        Look the token up in the connection extra.

        With ``extra_key`` configured only that key is consulted; otherwise the first
        non-blank value among ``_DEFAULT_EXTRA_KEYS`` wins. Returns ``None`` if nothing usable
        is found.
        """
        if not isinstance(extra, dict):
            # Extra JSON that is not an object has no keys to look up.
            return None

        if self.extra_key:
            value = extra.get(self.extra_key)
            return (str(value).strip() or None) if value else None

        for key in _DEFAULT_EXTRA_KEYS:
            value = extra.get(key)
            if value:
                token = str(value).strip()
                if token:
                    return token
        return None


def resolve_airflow_connection_auth(config: dict[str, Any] | None, config_conn_id: str | None = None) -> None:
    """
    Read the API key from an Airflow connection and put it into the OpenLineage config.

    OpenLineage config can contain one transport, a composite transport, or composite transports
    nested inside each other. This function walks through that structure and updates every matching
    ``auth`` block in place.

    This only makes sense for HTTP transports: ``airflow_connection_api_key`` is replaced with
    ``{"type": "api_key", "apiKey": ...}``.

    :param config: Config (or sub-tree of it) to update; anything that is not a dict is ignored.
    :param config_conn_id: Connection used for ``auth`` blocks that do not set ``conn_id``.
    :raises OpenLineageAirflowConnectionAuthError: If a matching ``auth`` block has no usable
        connection ID or the connection holds no token.
    """
    if not isinstance(config, dict):
        return

    # Iterate over a snapshot because matching entries are reassigned during the walk.
    for key, value in list(config.items()):
        if (
            key == "auth"
            and isinstance(value, dict)
            and value.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE
        ):
            provider = AirflowConnectionTokenProvider(value, default_conn_id=config_conn_id)
            config[key] = {"type": "api_key", "apiKey": provider.get_api_key()}
        elif key == "transports" and isinstance(value, list):
            # Composite transport given as a list of transport configs.
            for item in value:
                resolve_airflow_connection_auth(item, config_conn_id=config_conn_id)
        else:
            # Nested dicts are walked recursively; scalars return immediately.
            resolve_airflow_connection_auth(value, config_conn_id=config_conn_id)
from airflow.providers.openlineage.token_provider import (
    AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
    OpenLineageAirflowConnectionAuthError,
    OpenLineageAirflowConnectionConfigError,
)


@patch.object(BaseHook, "get_connection")
@conf_vars(
    {
        ("openlineage", "transport"): (
            '{"type": "http", "url": "http://ol-api:5000", '
            '"auth": {"type": "api_key", "apiKey": "api-key"}}'
        )
    }
)
def test_create_client_from_config_without_connection_auth_does_not_read_connection(mock_get_connection):
    """Plain ``api_key`` auth must not trigger any Airflow connection lookup."""
    transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    mock_get_connection.assert_not_called()


def _connection_auth_transport_config(**auth_config):
    """Return JSON for an HTTP transport whose auth is read from an Airflow connection.

    Keyword arguments extend or override entries of the ``auth`` block.
    """
    auth_block = {
        "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
        "conn_id": "openlineage_default",
    }
    auth_block.update(auth_config)
    transport = {"type": "http", "url": "http://ol-api:5000", "auth": auth_block}
    return json.dumps(transport)


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_password(mock_get_connection):
    """The connection password becomes the client's API key."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = stored_connection
    transport_json = _connection_auth_transport_config()

    with conf_vars({("openlineage", "transport"): transport_json}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    assert transport.config.auth.api_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_extra(mock_get_connection):
    """An explicit ``extra_key`` selects the token from the connection extra."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        extra='{"lineage_token": "api-key-from-extra"}',
    )
    mock_get_connection.return_value = stored_connection
    overrides = {("openlineage", "transport"): _connection_auth_transport_config(extra_key="lineage_token")}

    with conf_vars(overrides):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.config.auth.api_key == "api-key-from-extra"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_token_extra(mock_get_connection):
    """Without ``extra_key`` the well-known ``token`` extra key is picked up."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        extra='{"token": "api-key-from-token"}',
    )
    mock_get_connection.return_value = stored_connection
    transport_json = _connection_auth_transport_config()

    with conf_vars({("openlineage", "transport"): transport_json}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.config.auth.api_key == "api-key-from-token"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_missing_secret(mock_get_connection):
    """A connection without password or token in extra is reported as an auth error."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", extra="{}")
    mock_get_connection.return_value = stored_connection
    transport_json = _connection_auth_transport_config()

    with (
        conf_vars({("openlineage", "transport"): transport_json}),
        pytest.raises(OpenLineageAirflowConnectionAuthError, match="could not find a token"),
    ):
        OpenLineageAdapter().get_or_create_openlineage_client()


@patch.object(BaseHook, "get_connection")
def test_create_client_from_connection_config_with_connection_auth_password(mock_get_connection):
    """Config and API key can both come from the ``config_conn_id`` connection."""
    client_config = {
        "transport": {
            "type": "http",
            "url": "http://ol-api:5000",
            "auth": {
                "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
            },
        }
    }
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        password="api-key",
        extra=json.dumps(client_config),
    )

    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    assert transport.config.auth.api_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_connection_transport_config(mock_get_connection):
    """A console transport stored in the connection extra is honoured."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"transport": {"type": "console"}}',
    )
    mock_get_connection.return_value = stored_connection

    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
        adapter_client = OpenLineageAdapter().get_or_create_openlineage_client()

    assert adapter_client.transport.kind == "console"


@patch.object(BaseHook, "get_connection")
def test_connection_config_takes_precedence_over_transport_config(mock_get_connection):
    """``config_conn_id`` wins when ``transport`` is configured as well."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"transport": {"type": "console"}}',
    )
    mock_get_connection.return_value = stored_connection
    overrides = {
        ("openlineage", "config_conn_id"): "openlineage_default",
        ("openlineage", "transport"): '{"type": "http", "url": "http://ol-api:5000"}',
    }

    with conf_vars(overrides):
        adapter_client = OpenLineageAdapter().get_or_create_openlineage_client()

    assert adapter_client.transport.kind == "console"


@patch.object(BaseHook, "get_connection")
def test_connection_config_missing_transport_raises_custom_exception(mock_get_connection):
    """Connection extra without a ``transport`` object is rejected."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"url": "http://ol-api:5000"}',
    )
    mock_get_connection.return_value = stored_connection
    expected_error = pytest.raises(
        OpenLineageAirflowConnectionConfigError,
        match="must contain a `transport` JSON object",
    )

    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}), expected_error:
        OpenLineageAdapter().get_or_create_openlineage_client()
diff --git a/providers/openlineage/tests/unit/openlineage/test_token_provider.py b/providers/openlineage/tests/unit/openlineage/test_token_provider.py new file mode 100644 index 0000000000000..afd49433cb323 --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/test_token_provider.py @@ -0,0 +1,192 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from __future__ import annotations

from unittest.mock import patch

import pytest

from airflow.providers.common.compat.sdk import BaseHook, Connection
from airflow.providers.openlineage.token_provider import (
    AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
    AirflowConnectionConfigProvider,
    AirflowConnectionTokenProvider,
    OpenLineageAirflowConnectionAuthError,
    OpenLineageAirflowConnectionConfigError,
    resolve_airflow_connection_auth,
)


def _http_transport_with_connection_auth(url):
    """Build an HTTP transport config whose auth block points at ``openlineage_default``."""
    return {
        "type": "http",
        "url": url,
        "auth": {
            "type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
            "conn_id": "openlineage_default",
        },
    }


@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_connection_password(mock_get_connection):
    """The connection password is returned as the API key."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = stored_connection

    token_provider = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"})
    resolved_key = token_provider.get_api_key()

    assert resolved_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_default_connection_id(mock_get_connection):
    """``default_conn_id`` is used when the auth block carries no ``conn_id``."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = stored_connection

    token_provider = AirflowConnectionTokenProvider({}, default_conn_id="openlineage_default")
    resolved_key = token_provider.get_api_key()

    assert resolved_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_connection_extra(mock_get_connection):
    """Without a password, a well-known extra key supplies the token."""
    stored_connection = Connection(
        conn_id="openlineage_default", conn_type="http", extra='{"api_key": "api-key-from-extra"}'
    )
    mock_get_connection.return_value = stored_connection

    token_provider = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"})
    resolved_key = token_provider.get_api_key()

    assert resolved_key == "api-key-from-extra"


def test_missing_conn_id_raises_custom_exception():
    """Constructing the provider with no connection ID at all fails."""
    expected_error = pytest.raises(
        OpenLineageAirflowConnectionAuthError, match="requires a non-empty `conn_id`"
    )
    with expected_error:
        AirflowConnectionTokenProvider({})


@patch.object(BaseHook, "get_connection")
def test_missing_token_raises_custom_exception(mock_get_connection):
    """A connection with neither password nor extra token is an auth error."""
    mock_get_connection.return_value = Connection(conn_id="openlineage_default", conn_type="http")
    token_provider = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"})

    expected_error = pytest.raises(OpenLineageAirflowConnectionAuthError, match="could not find a token")
    with expected_error:
        token_provider.get_api_key()


@patch.object(BaseHook, "get_connection")
def test_resolve_connection_auth_in_composite_transport(mock_get_connection):
    """Auth blocks inside a composite transport's list are resolved in place."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = stored_connection
    config = {
        "transport": {
            "type": "composite",
            "transports": [_http_transport_with_connection_auth("http://ol-api:5000")],
        }
    }

    resolve_airflow_connection_auth(config)

    resolved_auth = config["transport"]["transports"][0]["auth"]
    assert resolved_auth == {
        "type": "api_key",
        "apiKey": "api-key",
    }


@patch.object(BaseHook, "get_connection")
def test_resolve_connection_auth_in_nested_composite_transport(mock_get_connection):
    """Resolution reaches composite transports nested inside other composites."""
    stored_connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = stored_connection
    inner_composite = {
        "type": "composite",
        "transports": [
            _http_transport_with_connection_auth("http://ol-api-2:5000"),
            {"type": "console"},
        ],
    }
    config = {
        "transport": {
            "type": "composite",
            "transports": [
                _http_transport_with_connection_auth("http://ol-api-1:5000"),
                inner_composite,
            ],
        }
    }

    resolve_airflow_connection_auth(config)

    expected_auth = {"type": "api_key", "apiKey": "api-key"}
    outer_transports = config["transport"]["transports"]
    assert outer_transports[0]["auth"] == expected_auth
    assert outer_transports[1]["transports"][0]["auth"] == expected_auth
    assert outer_transports[1]["transports"][1] == {"type": "console"}


@patch.object(BaseHook, "get_connection")
def test_get_openlineage_config_from_connection_extra(mock_get_connection):
    """The connection extra is returned as the client config."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"transport": {"type": "console"}}',
    )
    mock_get_connection.return_value = stored_connection

    config_provider = AirflowConnectionConfigProvider("openlineage_default")
    loaded_config = config_provider.get_config()

    assert loaded_config == {"transport": {"type": "console"}}


def test_missing_config_conn_id_raises_custom_exception():
    """An empty connection ID is rejected at construction time."""
    expected_error = pytest.raises(
        OpenLineageAirflowConnectionConfigError, match="requires a non-empty connection ID"
    )
    with expected_error:
        AirflowConnectionConfigProvider("")


@patch.object(BaseHook, "get_connection")
def test_missing_config_raises_custom_exception(mock_get_connection):
    """Connection extra lacking a ``transport`` object is rejected."""
    stored_connection = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"url": "http://ol-api:5000"}',
    )
    mock_get_connection.return_value = stored_connection
    config_provider = AirflowConnectionConfigProvider("openlineage_default")

    expected_error = pytest.raises(
        OpenLineageAirflowConnectionConfigError,
        match="must contain a `transport` JSON object",
    )
    with expected_error:
        config_provider.get_config()