Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1941,7 +1941,7 @@ def test_expected_output_push(
{
"selected-providers-list-as-string": "amazon common.compat common.io common.sql "
"databricks dbt.cloud ftp google microsoft.mssql mysql "
"openlineage oracle postgres sftp snowflake trino",
"openlineage oracle postgres sftp snowflake standard trino",
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"ci-image-build": "true",
Expand All @@ -1952,18 +1952,18 @@ def test_expected_output_push(
"docs-build": "true",
"docs-list-as-string": "apache-airflow task-sdk amazon common.compat common.io common.sql "
"databricks dbt.cloud ftp google microsoft.mssql mysql "
"openlineage oracle postgres sftp snowflake trino",
"openlineage oracle postgres sftp snowflake standard trino",
"skip-prek-hooks": ALL_SKIPPED_COMMITS_ON_NO_CI_IMAGE,
"run-kubernetes-tests": "false",
"upgrade-to-newer-dependencies": "false",
"core-test-types-list-as-strings-in-json": ALL_CI_SELECTIVE_TEST_TYPES_AS_JSON,
"providers-test-types-list-as-strings-in-json": json.dumps(
[
{
"description": "amazon...google",
"description": "amazon...standard",
"test_types": "Providers[amazon] Providers[common.compat,common.io,common.sql,"
"databricks,dbt.cloud,ftp,microsoft.mssql,mysql,openlineage,oracle,"
"postgres,sftp,snowflake,trino] Providers[google]",
"postgres,sftp,snowflake,trino] Providers[google] Providers[standard]",
}
]
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ class TaskInfo(InfoJsonEncodable):
"sla",
"task_id",
"trigger_dag_id",
"trigger_run_id",
"external_dag_id",
"external_task_id",
"trigger_rule",
Expand Down
8 changes: 8 additions & 0 deletions providers/standard/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ dependencies = [
"apache-airflow-providers-common-compat>=1.8.0",
]

# The optional dependencies should be modified in place in the generated file
# Any change in the dependencies is preserved when the file is regenerated
[project.optional-dependencies]
"openlineage" = [
"apache-airflow-providers-openlineage"
]

[dependency-groups]
dev = [
"apache-airflow",
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-openlineage",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-mysql",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom, timezone
from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.providers.standard.utils.openlineage import safe_inject_openlineage_properties_into_dagrun_conf
from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -136,6 +137,12 @@ class TriggerDagRunOperator(BaseOperator):
:param fail_when_dag_is_paused: If the dag to trigger is paused, DagIsPaused will be raised.
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
:param openlineage_inject_parent_info: whether to include OpenLineage metadata about the parent task
in the triggered DAG run's conf, enabling improved lineage tracking. The metadata is only injected
if OpenLineage is enabled and running. This option does not modify any other part of the conf,
and existing OpenLineage-related settings in the conf will not be overwritten. The injection process
is safeguarded against exceptions - if any error occurs during metadata injection, it is gracefully
handled and the conf remains unchanged - so it's safe to use. Default is ``True``
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -165,6 +172,7 @@ def __init__(
skip_when_already_exists: bool = False,
fail_when_dag_is_paused: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
openlineage_inject_parent_info: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -184,6 +192,7 @@ def __init__(
self.failed_states = [DagRunState.FAILED]
self.skip_when_already_exists = skip_when_already_exists
self.fail_when_dag_is_paused = fail_when_dag_is_paused
self.openlineage_inject_parent_info = openlineage_inject_parent_info
self._defer = deferrable
self.logical_date = logical_date
if logical_date is NOTSET:
Expand Down Expand Up @@ -214,6 +223,12 @@ def execute(self, context: Context):
except (TypeError, JSONDecodeError):
raise ValueError("conf parameter should be JSON Serializable %s", self.conf)

if self.openlineage_inject_parent_info:
self.log.debug("Checking if OpenLineage information can be safely injected into dagrun conf.")
self.conf = safe_inject_openlineage_properties_into_dagrun_conf(
dr_conf=self.conf, ti=context.get("ti")
)

if self.trigger_run_id:
run_id = str(self.trigger_run_id)
else:
Expand All @@ -226,6 +241,9 @@ def execute(self, context: Context):
else:
run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date or timezone.utcnow()) # type: ignore[misc,call-arg]

# Save run_id as task attribute - to be used by listeners
self.trigger_run_id = run_id

if self.fail_when_dag_is_paused:
dag_model = DagModel.get_current(self.trigger_dag_id)
if not dag_model:
Expand All @@ -237,9 +255,13 @@ def execute(self, context: Context):
raise AirflowException(f"Dag {self.trigger_dag_id} is paused")

if AIRFLOW_V_3_0_PLUS:
self._trigger_dag_af_3(context=context, run_id=run_id, parsed_logical_date=parsed_logical_date)
self._trigger_dag_af_3(
context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date
)
else:
self._trigger_dag_af_2(context=context, run_id=run_id, parsed_logical_date=parsed_logical_date)
self._trigger_dag_af_2(
context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date
)

def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
from airflow.exceptions import DagRunTriggerException
Expand Down Expand Up @@ -327,6 +349,10 @@ def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
return

def execute_complete(self, context: Context, event: tuple[str, dict[str, Any]]):
run_ids = event[1]["run_ids"]
# Re-set as attribute after coming back from deferral - to be used by listeners.
# Just a safety check on length, we should always have single run_id here.
self.trigger_run_id = run_ids[0] if len(run_ids) == 1 else None
if AIRFLOW_V_3_0_PLUS:
self._trigger_dag_run_af_3_execute_complete(event=event)
else:
Expand Down
185 changes: 185 additions & 0 deletions providers/standard/src/airflow/providers/standard/utils/openlineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.common.compat.openlineage.check import require_openlineage_version

if TYPE_CHECKING:
from airflow.models import TaskInstance
from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI

log = logging.getLogger(__name__)

OPENLINEAGE_PROVIDER_MIN_VERSION = "2.8.0"


def _is_openlineage_provider_accessible() -> bool:
"""
Check if the OpenLineage provider is accessible.

This function attempts to import the necessary OpenLineage modules and checks if the provider
is enabled and the listener is available.

Returns:
bool: True if the OpenLineage provider is accessible, False otherwise.
"""
try:
from airflow.providers.openlineage.conf import is_disabled
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
except (ImportError, AttributeError):
log.debug("OpenLineage provider could not be imported.")
return False

if is_disabled():
log.debug("OpenLineage provider is disabled.")
return False

if not get_openlineage_listener():
log.debug("OpenLineage listener could not be found.")
return False

return True


@require_openlineage_version(provider_min_version=OPENLINEAGE_PROVIDER_MIN_VERSION)
def _get_openlineage_parent_info(ti: TaskInstance | RuntimeTI) -> dict[str, str]:
"""Get OpenLineage metadata about the parent task."""
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_root_job_name,
lineage_root_job_namespace,
lineage_root_run_id,
lineage_run_id,
)

return {
"parentRunId": lineage_run_id(ti),
"parentJobName": lineage_job_name(ti),
"parentJobNamespace": lineage_job_namespace(),
"rootParentRunId": lineage_root_run_id(ti),
"rootParentJobName": lineage_root_job_name(ti),
"rootParentJobNamespace": lineage_root_job_namespace(ti),
}


def _inject_openlineage_parent_info_to_dagrun_conf(
dr_conf: dict | None, ol_parent_info: dict[str, str]
) -> dict:
"""
Safely inject OpenLineage parent and root run metadata into a DAG run configuration.

This function adds parent and root job/run identifiers derived from the given TaskInstance into the
`openlineage` section of the DAG run configuration. If an `openlineage` key already exists, it is
preserved and extended, but no existing parent or root identifiers are overwritten.

The function performs several safety checks:
- If conf is not a dictionary or contains a non-dict `openlineage` section, conf is returned unmodified.
- If `openlineage` section contains any parent/root lineage identifiers, conf is returned unmodified.

Args:
dr_conf: The original DAG run configuration dictionary or None.
ol_parent_info: OpenLineage metadata about the parent task

Returns:
A modified DAG run conf with injected OpenLineage parent and root metadata,
or the original conf if injection is not possible.
"""
current_ol_dr_conf = {}
if isinstance(dr_conf, dict) and dr_conf.get("openlineage"):
current_ol_dr_conf = dr_conf["openlineage"]
if not isinstance(current_ol_dr_conf, dict):
log.warning(
"Existing 'openlineage' section of DagRun conf is not a dictionary; "
"skipping injection of parent metadata."
)
return dr_conf
forbidden_keys = (
"parentRunId",
"parentJobName",
"parentJobNamespace",
"rootParentRunId",
"rootJobName",
"rootJobNamespace",
)

if existing := [k for k in forbidden_keys if k in current_ol_dr_conf]:
log.warning(
"'openlineage' section of DagRun conf already contains parent or root "
"identifiers: `%s`; skipping injection to avoid overwriting existing values.",
", ".join(existing),
)
return dr_conf

return {**(dr_conf or {}), **{"openlineage": {**ol_parent_info, **current_ol_dr_conf}}}


def safe_inject_openlineage_properties_into_dagrun_conf(
dr_conf: dict | None, ti: TaskInstance | RuntimeTI | None
) -> dict | None:
"""
Safely inject OpenLineage parent task metadata into a DAG run conf.

This function checks whether the OpenLineage provider is accessible and supports parent information
injection. If so, it enriches the DAG run conf with OpenLineage metadata about the parent task
to improve lineage tracking. The function does not modify other conf fields, will not overwrite
any existing content, and safely returns the original configuration if OpenLineage is unavailable,
unsupported, or an error occurs during injection.

:param dr_conf: The original DAG run configuration dictionary.
:param ti: The TaskInstance whose metadata may be injected.

:return: A potentially enriched DAG run conf with OpenLineage parent information,
or the original conf if injection was skipped or failed.
"""
try:
if ti is None:
log.debug("Task instance not provided - dagrun conf not modified.")
return dr_conf

if not _is_openlineage_provider_accessible():
log.debug("OpenLineage provider not accessible - dagrun conf not modified.")
return dr_conf

ol_parent_info = _get_openlineage_parent_info(ti=ti)

log.info("Injecting openlineage parent task information into dagrun conf.")
new_conf = _inject_openlineage_parent_info_to_dagrun_conf(
dr_conf=dr_conf.copy() if isinstance(dr_conf, dict) else dr_conf,
ol_parent_info=ol_parent_info,
)
return new_conf
except AirflowOptionalProviderFeatureException:
log.info(
"Current OpenLineage provider version doesn't support parent information in "
"the DagRun conf. Upgrade `apache-airflow-providers-openlineage>=%s` to use this feature. "
"DagRun conf has not been modified by OpenLineage.",
OPENLINEAGE_PROVIDER_MIN_VERSION,
)
return dr_conf
except Exception as e:
log.warning(
"An error occurred while trying to inject OpenLineage information into dagrun conf. "
"DagRun conf has not been modified by OpenLineage. Error: %s",
str(e),
)
log.debug("Error details: ", exc_info=e)
return dr_conf
Loading
Loading