Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions providers/openlineage/docs/configurations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ If you want to look at OpenLineage events without sending them anywhere, you can
[openlineage]
transport = {"type": "console"}

OpenLineage config stored in an Airflow connection
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can store OpenLineage client configuration in a Generic Airflow connection instead of putting the full JSON
configuration directly in ``airflow.cfg``. Set ``config_conn_id`` to the connection ID and store the OpenLineage
configuration in the connection extra as JSON.

.. code-block:: ini

[openlineage]
config_conn_id = openlineage_default

Connection extra should contain the OpenLineage client configuration:

.. code-block:: json

{
"transport": {
"type": "http",
"url": "http://example.com:5000",
"auth": {"type": "airflow_connection_api_key"}
}
}

For HTTP transports that require API key authentication, you can keep the token in the Airflow connection password.
Set ``auth.type`` to ``airflow_connection_api_key``. When the config is loaded from ``config_conn_id``, the provider
reads the API key from the same connection password by default. You can also set ``auth.conn_id`` to read the token
from another Airflow connection. The provider resolves ``airflow_connection_api_key`` to standard OpenLineage
``api_key`` auth before creating the OpenLineage client.

.. note::
For full list of built-in transport types, specific transport's options or instructions on how to implement your custom transport, refer to
`Python client documentation <https://openlineage.io/docs/client/python/configuration#transports>`_.
Expand Down Expand Up @@ -100,9 +130,10 @@ Primary, and recommended method of configuring OpenLineage Airflow Provider is A
As there are multiple possible ways of configuring OpenLineage, it's important to keep in mind the precedence of different configurations.
OpenLineage Airflow Provider looks for the configuration in the following order:

1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
3. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in `this <https://openlineage.io/docs/client/python/configuration>`_ documentation. Please note that **using Airflow configuration is encouraged** and is the only future proof solution.
1. Check ``config_conn_id`` in ``airflow.cfg`` under ``openlineage`` section.
2. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
3. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
4. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in `this <https://openlineage.io/docs/client/python/configuration>`_ documentation. Please note that **using Airflow configuration is encouraged** and is the only future proof solution.


.. _configuration_selective_enable:openlineage:
Expand Down
9 changes: 9 additions & 0 deletions providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ config:
This section applies settings for OpenLineage integration.

options:
config_conn_id:
description: |
Specify a Generic Airflow connection ID that contains OpenLineage configuration in connection
extra. This can be used to keep the OpenLineage transport configuration, including auth settings,
outside of the Airflow configuration file.
version_added: ~
type: string
example: "openlineage_default"
default: ""
config_path:
description: |
Specify the path to the YAML configuration file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def config_path(check_legacy_env_var: bool = True) -> str:
return option


@cache
def config_conn_id() -> str:
"""[openlineage] config_conn_id."""
return conf.get(_CONFIG_SECTION, "config_conn_id", fallback="")
Comment thread
VladaZakharova marked this conversation as resolved.


@cache
def is_source_enabled() -> bool:
"""[openlineage] disable_source_code."""
Expand Down Expand Up @@ -136,6 +142,8 @@ def is_disabled() -> bool:
if _is_true(os.getenv("OPENLINEAGE_DISABLED", "")): # Check legacy variable
return True

if config_conn_id(): # Check if config connection is present
return False
if transport(): # Check if transport is present
return False
if config_path(True): # Check if config file is present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ def get_provider_info():
"openlineage": {
"description": "This section applies settings for OpenLineage integration.\n",
"options": {
"config_conn_id": {
"description": "Specify a Generic Airflow connection ID that contains OpenLineage configuration in connection\nextra. This can be used to keep the OpenLineage transport configuration, including auth settings,\noutside of the Airflow configuration file.\n",
"version_added": None,
"type": "string",
"example": "openlineage_default",
"default": "",
},
"config_path": {
"description": "Specify the path to the YAML configuration file.\nThis ensures backwards compatibility with passing config through the `openlineage.yml` file.\n",
"version_added": None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import os
import traceback
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Any, Literal

import yaml
from openlineage.client import OpenLineageClient, set_producer
Expand All @@ -36,6 +36,10 @@

from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.token_provider import (
AirflowConnectionConfigProvider,
resolve_airflow_connection_auth,
)
from airflow.providers.openlineage.utils.utils import (
_PRODUCER,
OpenLineageRedactor,
Expand Down Expand Up @@ -103,22 +107,35 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
return self._client

def get_openlineage_config(self) -> dict | None:
# First, try to read from YAML file
# First, try to read from Airflow connection
openlineage_config_conn_id = conf.config_conn_id()
if openlineage_config_conn_id:
config = AirflowConnectionConfigProvider(openlineage_config_conn_id).get_config()
resolve_airflow_connection_auth(config=config, config_conn_id=openlineage_config_conn_id)
return config
self.log.debug("OpenLineage config_conn_id configuration not found.")

# Second, try to read from YAML file
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
config = self._read_yaml_config(openlineage_config_path)
return config
yaml_config = self._read_yaml_config(openlineage_config_path)
if yaml_config is None:
return None
resolve_airflow_connection_auth(yaml_config)
return yaml_config
self.log.debug("OpenLineage config_path configuration not found.")

# Second, try to get transport config
# Third, try to get transport config
transport_config = conf.transport()
if not transport_config:
self.log.debug("OpenLineage transport configuration not found.")
return None
return {"transport": transport_config}
config = {"transport": transport_config}
resolve_airflow_connection_auth(config)
return config

@staticmethod
def _read_yaml_config(path: str) -> dict | None:
def _read_yaml_config(path: str) -> dict[str, Any] | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any

from airflow.providers.common.compat.sdk import AirflowException, BaseHook

AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
_DEFAULT_EXTRA_KEYS = ("apiKey", "api_key", "apikey", "token", "access_token")


class OpenLineageAirflowConnectionAuthError(AirflowException):
"""Raised when OpenLineage API key auth cannot be resolved from an Airflow connection."""


class OpenLineageAirflowConnectionConfigError(AirflowException):
"""Raised when OpenLineage config cannot be resolved from an Airflow connection."""
Comment thread
VladaZakharova marked this conversation as resolved.


class AirflowConnectionConfigProvider:
"""
Resolve OpenLineage client configuration from an Airflow connection.

The connection extra contains the full OpenLineage client config, for example
``{"transport": {"type": "console"}}``.
"""

def __init__(self, conn_id: str) -> None:
if not conn_id:
raise OpenLineageAirflowConnectionConfigError(
"OpenLineage connection config requires a non-empty connection ID."
)
self.conn_id = conn_id

def get_config(self) -> dict[str, Any]:
connection = BaseHook.get_connection(self.conn_id)
return self._validate_config(connection.extra_dejson)

def _validate_config(self, config: Any) -> dict[str, Any]:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, wondering if there is any way we can use OL client initialization as validation here to avoid duplicate check logic. If not it's fine, we'll extend this validation in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about using OpenLineageClient(config=...) for this, but I think it would be a bit too heavy for validation here. It would create the client/transport once just to check the config, and then we would create it again later in the adapter.

So for now I kept this check very small: the Airflow connection extra must be a JSON object with a transport object. The OpenLineage client still does the real transport/auth validation when it is created. If the OpenLineage client gets a dedicated validation method later, we can switch to that.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense !

if not isinstance(config, dict):
raise OpenLineageAirflowConnectionConfigError(
f"OpenLineage connection config `{config}` is not a dict."
)
if not isinstance(config.get("transport"), dict):
raise OpenLineageAirflowConnectionConfigError(
"OpenLineage connection config must contain a `transport` JSON object."
)
return config


class AirflowConnectionTokenProvider:
"""
Resolve an OpenLineage API key from an Airflow connection.

The connection password is preferred. If it is empty and ``extra_key`` is configured, that key
is read from connection ``extra``. Otherwise, common extra keys are checked.
"""

def __init__(self, config: dict[str, Any], default_conn_id: str | None = None) -> None:
self.conn_id = config.get("conn_id") or default_conn_id or ""
self.extra_key = config.get("extra_key")
if not self.conn_id:
raise OpenLineageAirflowConnectionAuthError(
"OpenLineage `airflow_connection_api_key` auth requires a non-empty `conn_id`."
)

def get_api_key(self) -> str:
connection = BaseHook.get_connection(self.conn_id)
if connection.password:
return connection.password.strip()
api_key = self._get_api_key_from_extra(connection.extra_dejson)
if api_key:
return api_key

raise OpenLineageAirflowConnectionAuthError(
"OpenLineage `airflow_connection_api_key` auth could not find a token in connection "
f"`{self.conn_id}`. Expected connection password or token in connection extra."
)

def _get_api_key_from_extra(self, extra: dict[str, Any]) -> str | None:
if self.extra_key:
value = extra.get(self.extra_key)
return str(value).strip() if value else None

for key in _DEFAULT_EXTRA_KEYS:
value = extra.get(key)
if value:
return str(value).strip()
return None


def resolve_airflow_connection_auth(config: dict[str, Any] | None, config_conn_id: str | None = None) -> None:
Comment thread
VladaZakharova marked this conversation as resolved.
"""
Read the API key from an Airflow connection and put it into the OpenLineage config.

OpenLineage config can contain one transport, a composite transport, or composite transports
nested inside each other. This function walks through that structure and updates every matching
``auth`` block in place.

This only makes sense for HTTP transports: ``airflow_connection_api_key`` is replaced with
``{"type": "api_key", "apiKey": ...}``.
"""
if not isinstance(config, dict):
return

for key, value in config.items():
if (
key == "auth"
and isinstance(value, dict)
and value.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE
):
provider = AirflowConnectionTokenProvider(value, default_conn_id=config_conn_id)
config[key] = {"type": "api_key", "apiKey": provider.get_api_key()}
elif key == "transports" and isinstance(value, list):
for item in value:
resolve_airflow_connection_auth(item, config_conn_id=config_conn_id)
else:
resolve_airflow_connection_auth(value, config_conn_id=config_conn_id)
Loading
Loading