Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW] Remove redundant operator information from facets #2524

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from typing import Dict, List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_unknown_source_attribute_run_facet
from openlineage.client.facet import SourceCodeJobFacet


Expand Down Expand Up @@ -43,17 +40,7 @@ def extract(self) -> Optional[TaskMetadata]:
return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets=job_facet,
run_facets={
# The BashOperator is recorded as an "unknownSource" even though we have an
# extractor, as the <i>data lineage</i> cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="BashOperator",
properties={attr: value for attr, value in self.operator.__dict__.items()},
)
]
)
},
# The BashOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="BashOperator"),
)
17 changes: 2 additions & 15 deletions integration/airflow/openlineage/airflow/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
from typing import List, Optional, Type

from openlineage.airflow.extractors import BaseExtractor, Extractors, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_job_name, get_operator_class
from openlineage.airflow.utils import get_job_name, get_operator_class, get_unknown_source_attribute_run_facet


class ExtractorManager:
Expand Down Expand Up @@ -75,16 +71,7 @@ def extract_metadata(
# Only include the unkonwnSourceAttribute facet if there is no extractor
task_metadata = TaskMetadata(
name=get_job_name(task),
run_facets={
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=get_operator_class(task).__name__,
properties={attr: value for attr, value in task.__dict__.items()},
)
]
)
},
run_facets=get_unknown_source_attribute_run_facet(task=task),
)
inlets = task.get_inlet_defs()
outlets = task.get_outlet_defs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
from typing import Callable, Dict, List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_unknown_source_attribute_run_facet
from openlineage.client.facet import SourceCodeJobFacet


Expand Down Expand Up @@ -44,19 +41,9 @@ def extract(self) -> Optional[TaskMetadata]:
return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets=job_facet,
run_facets={
# The BashOperator is recorded as an "unknownSource" even though we have an
# extractor, as the <i>data lineage</i> cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="PythonOperator",
properties={attr: value for attr, value in self.operator.__dict__.items()},
)
]
)
},
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="PythonOperator"),
)

def get_source_code(self, callable: Callable) -> Optional[str]:
Expand Down
8 changes: 2 additions & 6 deletions integration/airflow/openlineage/airflow/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ class AirflowVersionRunFacet(BaseFacet):

@classmethod
def from_dagrun_and_task(cls, dagrun, task):
# task.__dict__ may contain values uncastable to str
from openlineage.airflow.utils import get_operator_class, to_json_encodable

task_info = to_json_encodable(task)
task_info["dag_run"] = to_json_encodable(dagrun)
from openlineage.airflow.utils import get_operator_class

return cls(
operator=f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",
taskInfo=task_info,
taskInfo={},
airflowVersion=AIRFLOW_VERSION,
openlineageAirflowVersion=OPENLINEAGE_AIRFLOW_VERSION,
)
Expand Down
74 changes: 50 additions & 24 deletions integration/airflow/openlineage/airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
AirflowRunArgsRunFacet,
AirflowRunFacet,
AirflowVersionRunFacet,
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.client.utils import RedactMixin
from pendulum import from_timestamp
Expand Down Expand Up @@ -292,23 +294,35 @@ class TaskInstanceInfo(InfoJsonEncodable):

class TaskInfo(InfoJsonEncodable):
renames = {
"_BaseOperator__init_kwargs": "args",
"_BaseOperator__from_mapped": "mapped",
"_downstream_task_ids": "downstream_task_ids",
"_upstream_task_ids": "upstream_task_ids",
"_is_setup": "is_setup",
"_is_teardown": "is_teardown",
}
excludes = [
"_BaseOperator__instantiated",
"_dag",
"_hook",
"_log",
"_outlets",
"_inlets",
"_lock_for_execution",
"handler",
"params",
"python_callable",
"retry_delay",
includes = [
"task_id",
"depends_on_past",
"downstream_task_ids",
"execution_timeout",
"executor_config",
"ignore_first_depends_on_past",
"max_active_tis_per_dag",
"max_active_tis_per_dagrun",
"max_retry_delay",
"multiple_outputs",
"owner",
"priority_weight",
"queue",
"retries",
"retry_exponential_backoff",
"run_as_user",
"task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
"wait_for_past_depends_before_skipping",
"weight_rule",
]
casts = {
"operator_class": lambda task: f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}", # noqa: E501
Expand Down Expand Up @@ -340,18 +354,30 @@ def get_airflow_run_facet(
task_uuid: str,
):
return {
"airflow": json.loads(
json.dumps(
attr.asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
"airflow": attr.asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
)
)
}


def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[str] = None):
if not name:
name = get_operator_class(task).__name__
return {
"unknownSourceAttribute": attr.asdict(
UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=name,
properties=TaskInfo(task),
)
),
default=str,
]
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
},
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow_runArgs": {
"externalTrigger": false
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
}],
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"parent": {
"job": {
"name": "parentjob",
Expand Down
13 changes: 6 additions & 7 deletions integration/airflow/tests/integration/requests/secrets.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ not_match(result, '1500100900') }}"
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow": {
"task": "{{ not_match(result, '1500100900') }}"
},
"unknownSourceAttribute": {
"unknownItems": [
{
"name": "SecretsOperator",
"properties": {
"password": "***"
}
"properties": "{{ not_match(result, 'password') }}"
}
]
}
Expand All @@ -33,9 +34,7 @@
"unknownItems": [
{
"name": "SecretsOperator",
"properties": {
"password": "***"
}
"properties": "{{ not_match(result, 'password') }}"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
},
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow_runArgs": {
"externalTrigger": false
},
Expand Down