Skip to content

Commit

Permalink
fix: Remove redundant operator information from facets (#38264)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda committed Apr 4, 2024
1 parent 5337066 commit ecd6995
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 98 deletions.
22 changes: 4 additions & 18 deletions airflow/providers/openlineage/extractors/bash.py
Expand Up @@ -21,11 +21,7 @@

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys
from airflow.providers.openlineage.utils.utils import get_unknown_source_attribute_run_facet

"""
:meta private:
Expand Down Expand Up @@ -60,19 +56,9 @@ def _execute_extraction(self) -> OperatorLineage | None:

return OperatorLineage(
job_facets=job_facets,
run_facets={
# The BashOperator is recorded as an "unknownSource" even though we have an
# extractor, as the <i>data lineage</i> cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="BashOperator",
properties=get_filtered_unknown_operator_keys(self.operator),
)
]
)
},
# The BashOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="BashOperator"),
)

def extract(self) -> OperatorLineage | None:
Expand Down
17 changes: 2 additions & 15 deletions airflow/providers/openlineage/extractors/manager.py
Expand Up @@ -24,11 +24,7 @@
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.bash import BashExtractor
from airflow.providers.openlineage.extractors.python import PythonExtractor
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys
from airflow.providers.openlineage.utils.utils import get_unknown_source_attribute_run_facet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string

Expand Down Expand Up @@ -115,16 +111,7 @@ def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=N

# Only include the unknownSourceAttribute facet if there is no extractor
task_metadata = OperatorLineage(
run_facets={
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=task.task_type,
properties=get_filtered_unknown_operator_keys(task),
)
]
)
},
run_facets=get_unknown_source_attribute_run_facet(task=task),
)
inlets = task.get_inlet_defs()
outlets = task.get_outlet_defs()
Expand Down
22 changes: 4 additions & 18 deletions airflow/providers/openlineage/extractors/python.py
Expand Up @@ -24,11 +24,7 @@

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys
from airflow.providers.openlineage.utils.utils import get_unknown_source_attribute_run_facet

"""
:meta private:
Expand Down Expand Up @@ -63,19 +59,9 @@ def _execute_extraction(self) -> OperatorLineage | None:
}
return OperatorLineage(
job_facets=job_facet,
run_facets={
# The PythonOperator is recorded as an "unknownSource" even though we have an
# extractor, as the data lineage cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="PythonOperator",
properties=get_filtered_unknown_operator_keys(self.operator),
)
]
)
},
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="PythonOperator"),
)

def get_source_code(self, callable: Callable) -> str | None:
Expand Down
11 changes: 11 additions & 0 deletions airflow/providers/openlineage/plugins/facets.py
Expand Up @@ -17,10 +17,17 @@
from __future__ import annotations

from attrs import define
from deprecated import deprecated
from openlineage.client.facet import BaseFacet
from openlineage.client.utils import RedactMixin

from airflow.exceptions import AirflowProviderDeprecationWarning


@deprecated(
reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
class AirflowMappedTaskRunFacet(BaseFacet):
"""Run facet containing information about mapped tasks."""
Expand Down Expand Up @@ -66,6 +73,10 @@ class UnknownOperatorInstance(RedactMixin):
_skip_redact = ["name", "type"]


@deprecated(
reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
class UnknownOperatorAttributeRunFacet(BaseFacet):
"""RunFacet that describes unknown operators in an Airflow DAG."""
Expand Down
78 changes: 50 additions & 28 deletions airflow/providers/openlineage/utils/utils.py
Expand Up @@ -25,16 +25,15 @@
from typing import TYPE_CHECKING, Any, Iterable

import attrs
from attrs import asdict

# TODO: move this maybe to Airflow's logic?
from openlineage.client.utils import RedactMixin
from openlineage.client.utils import RedactMixin # TODO: move this maybe to Airflow's logic?

from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowMappedTaskRunFacet,
AirflowRunFacet,
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.selective_enable import (
is_dag_lineage_enabled,
Expand Down Expand Up @@ -196,23 +195,34 @@ class TaskInfo(InfoJsonEncodable):
"""Defines encoding BaseOperator/AbstractOperator object to JSON."""

renames = {
"_BaseOperator__init_kwargs": "args",
"_BaseOperator__from_mapped": "mapped",
"_downstream_task_ids": "downstream_task_ids",
"_upstream_task_ids": "upstream_task_ids",
"_is_setup": "is_setup",
"_is_teardown": "is_teardown",
}
excludes = [
"_BaseOperator__instantiated",
"_dag",
"_hook",
"_log",
"_outlets",
"_inlets",
"_lock_for_execution",
"handler",
"params",
"python_callable",
"retry_delay",
includes = [
"depends_on_past",
"downstream_task_ids",
"execution_timeout",
"executor_config",
"ignore_first_depends_on_past",
"max_active_tis_per_dag",
"max_active_tis_per_dagrun",
"max_retry_delay",
"multiple_outputs",
"owner",
"priority_weight",
"queue",
"retries",
"retry_exponential_backoff",
"run_as_user",
"task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
"wait_for_past_depends_before_skipping",
"weight_rule",
]
casts = {
"operator_class": lambda task: task.task_type,
Expand Down Expand Up @@ -246,18 +256,30 @@ def get_airflow_run_facet(
task_uuid: str,
):
return {
"airflow": json.loads(
json.dumps(
asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
"airflow": attrs.asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
)
)
}


def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):
if not name:
name = get_operator_class(task).__name__
return {
"unknownSourceAttribute": attrs.asdict(
UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=name,
properties=TaskInfo(task),
)
),
default=str,
]
)
)
}
Expand Down
35 changes: 25 additions & 10 deletions tests/providers/openlineage/extractors/test_bash.py
Expand Up @@ -17,16 +17,17 @@

from __future__ import annotations

import warnings
from datetime import datetime
from unittest.mock import patch

import pytest
from openlineage.client.facet import SourceCodeJobFacet

from airflow import DAG
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.operators.bash import BashOperator
from airflow.providers.openlineage.extractors.bash import BashExtractor
from airflow.providers.openlineage.plugins.facets import UnknownOperatorAttributeRunFacet

pytestmark = pytest.mark.db_test

Expand All @@ -44,20 +45,34 @@
@patch("airflow.providers.openlineage.conf.is_source_enabled")
def test_extract_operator_bash_command_disabled(mocked_source_enabled):
mocked_source_enabled.return_value = False
operator = BashOperator(task_id="taskid", bash_command="exit 0")
result = BashExtractor(operator).extract()
operator = BashOperator(task_id="taskid", bash_command="exit 0;", env={"A": "1"}, append_env=True)
with warnings.catch_warnings():
warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
result = BashExtractor(operator).extract()
assert "sourceCode" not in result.job_facets
assert "unknownSourceAttribute" in result.run_facets
unknown_items = result.run_facets["unknownSourceAttribute"]["unknownItems"]
assert len(unknown_items) == 1
assert unknown_items[0]["name"] == "BashOperator"
assert "bash_command" not in unknown_items[0]["properties"]
assert "env" not in unknown_items[0]["properties"]
assert "append_env" not in unknown_items[0]["properties"]
assert "task_id" in unknown_items[0]["properties"]


@patch("airflow.providers.openlineage.conf.is_source_enabled")
def test_extract_operator_bash_command_enabled(mocked_source_enabled):
mocked_source_enabled.return_value = True
operator = BashOperator(task_id="taskid", bash_command="exit 0")
result = BashExtractor(operator).extract()
assert result.job_facets["sourceCode"] == SourceCodeJobFacet("bash", "exit 0")
operator = BashOperator(task_id="taskid", bash_command="exit 0;", env={"A": "1"}, append_env=True)
with warnings.catch_warnings():
warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
result = BashExtractor(operator).extract()
assert result.job_facets["sourceCode"] == SourceCodeJobFacet("bash", "exit 0;")
assert "unknownSourceAttribute" in result.run_facets
unknown_operator_facet = result.run_facets["unknownSourceAttribute"]
assert isinstance(unknown_operator_facet, UnknownOperatorAttributeRunFacet)
assert len(unknown_operator_facet.unknownItems) == 1
assert unknown_operator_facet.unknownItems[0].name == "BashOperator"
unknown_items = result.run_facets["unknownSourceAttribute"]["unknownItems"]
assert len(unknown_items) == 1
assert unknown_items[0]["name"] == "BashOperator"
assert "bash_command" not in unknown_items[0]["properties"]
assert "env" not in unknown_items[0]["properties"]
assert "append_env" not in unknown_items[0]["properties"]
assert "task_id" in unknown_items[0]["properties"]
33 changes: 24 additions & 9 deletions tests/providers/openlineage/extractors/test_python.py
Expand Up @@ -19,17 +19,18 @@

import inspect
import os
import warnings
from datetime import datetime
from unittest.mock import patch

import pytest
from openlineage.client.facet import SourceCodeJobFacet

from airflow import DAG
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.extractors.python import PythonExtractor
from airflow.providers.openlineage.plugins.facets import UnknownOperatorAttributeRunFacet

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -65,20 +66,34 @@ def test_extract_source_code():
@patch("airflow.providers.openlineage.conf.is_source_enabled")
def test_extract_operator_code_disabled(mocked_source_enabled):
mocked_source_enabled.return_value = False
operator = PythonOperator(task_id="taskid", python_callable=callable)
result = PythonExtractor(operator).extract()
operator = PythonOperator(task_id="taskid", python_callable=callable, op_args=(1, 2), op_kwargs={"a": 1})
with warnings.catch_warnings():
warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
result = PythonExtractor(operator).extract()
assert "sourceCode" not in result.job_facets
assert "unknownSourceAttribute" in result.run_facets
unknown_items = result.run_facets["unknownSourceAttribute"]["unknownItems"]
assert len(unknown_items) == 1
assert unknown_items[0]["name"] == "PythonOperator"
assert "python_callable" not in unknown_items[0]["properties"]
assert "op_args" not in unknown_items[0]["properties"]
assert "op_kwargs" not in unknown_items[0]["properties"]
assert "task_id" in unknown_items[0]["properties"]


@patch("airflow.providers.openlineage.conf.is_source_enabled")
def test_extract_operator_code_enabled(mocked_source_enabled):
mocked_source_enabled.return_value = True
operator = PythonOperator(task_id="taskid", python_callable=callable)
result = PythonExtractor(operator).extract()
operator = PythonOperator(task_id="taskid", python_callable=callable, op_args=(1, 2), op_kwargs={"a": 1})
with warnings.catch_warnings():
warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
result = PythonExtractor(operator).extract()
assert result.job_facets["sourceCode"] == SourceCodeJobFacet("python", CODE)
assert "unknownSourceAttribute" in result.run_facets
unknown_operator_facet = result.run_facets["unknownSourceAttribute"]
assert isinstance(unknown_operator_facet, UnknownOperatorAttributeRunFacet)
assert len(unknown_operator_facet.unknownItems) == 1
assert unknown_operator_facet.unknownItems[0].name == "PythonOperator"
unknown_items = result.run_facets["unknownSourceAttribute"]["unknownItems"]
assert len(unknown_items) == 1
assert unknown_items[0]["name"] == "PythonOperator"
assert "python_callable" not in unknown_items[0]["properties"]
assert "op_args" not in unknown_items[0]["properties"]
assert "op_kwargs" not in unknown_items[0]["properties"]
assert "task_id" in unknown_items[0]["properties"]

0 comments on commit ecd6995

Please sign in to comment.