Skip to content

Commit

Permalink
airflow: remove redundant operator information from facets
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda committed Mar 15, 2024
1 parent b90ca67 commit a187a40
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
from typing import Dict, List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_unknown_source_attribute_run_facet
from openlineage.client.facet import SourceCodeJobFacet


Expand Down Expand Up @@ -43,17 +40,7 @@ def extract(self) -> Optional[TaskMetadata]:
return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets=job_facet,
run_facets={
# The BashOperator is recorded as an "unknownSource" even though we have an
# extractor, as the <i>data lineage</i> cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="BashOperator",
properties={attr: value for attr, value in self.operator.__dict__.items()},
)
]
)
},
# The BashOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="BashOperator"),
)
17 changes: 2 additions & 15 deletions integration/airflow/openlineage/airflow/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
from typing import List, Optional, Type

from openlineage.airflow.extractors import BaseExtractor, Extractors, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_job_name, get_operator_class
from openlineage.airflow.utils import get_job_name, get_operator_class, get_unknown_source_attribute_run_facet


class ExtractorManager:
Expand Down Expand Up @@ -75,16 +71,7 @@ def extract_metadata(
# Only include the unknownSourceAttribute facet if there is no extractor
task_metadata = TaskMetadata(
name=get_job_name(task),
run_facets={
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=get_operator_class(task).__name__,
properties={attr: value for attr, value in task.__dict__.items()},
)
]
)
},
run_facets=get_unknown_source_attribute_run_facet(task=task),
)
inlets = task.get_inlet_defs()
outlets = task.get_outlet_defs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
from typing import Callable, Dict, List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.airflow.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.airflow.utils import get_unknown_source_attribute_run_facet
from openlineage.client.facet import SourceCodeJobFacet


Expand Down Expand Up @@ -44,19 +41,9 @@ def extract(self) -> Optional[TaskMetadata]:
return TaskMetadata(
name=f"{self.operator.dag_id}.{self.operator.task_id}",
job_facets=job_facet,
run_facets={
# The BashOperator is recorded as an "unknownSource" even though we have an
# extractor, as the <i>data lineage</i> cannot be determined from the operator
# directly.
"unknownSourceAttribute": UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name="PythonOperator",
properties={attr: value for attr, value in self.operator.__dict__.items()},
)
]
)
},
# The PythonOperator is recorded as an "unknownSource" even though we have an extractor,
# as the <i>data lineage</i> cannot be determined from the operator directly.
run_facets=get_unknown_source_attribute_run_facet(task=self.operator, name="PythonOperator"),
)

def get_source_code(self, callable: Callable) -> Optional[str]:
Expand Down
8 changes: 2 additions & 6 deletions integration/airflow/openlineage/airflow/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ class AirflowVersionRunFacet(BaseFacet):

@classmethod
def from_dagrun_and_task(cls, dagrun, task):
# task.__dict__ may contain values uncastable to str
from openlineage.airflow.utils import get_operator_class, to_json_encodable

task_info = to_json_encodable(task)
task_info["dag_run"] = to_json_encodable(dagrun)
from openlineage.airflow.utils import get_operator_class

return cls(
operator=f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}",
taskInfo=task_info,
taskInfo={},
airflowVersion=AIRFLOW_VERSION,
openlineageAirflowVersion=OPENLINEAGE_AIRFLOW_VERSION,
)
Expand Down
74 changes: 50 additions & 24 deletions integration/airflow/openlineage/airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
AirflowRunArgsRunFacet,
AirflowRunFacet,
AirflowVersionRunFacet,
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from openlineage.client.utils import RedactMixin
from pendulum import from_timestamp
Expand Down Expand Up @@ -292,23 +294,35 @@ class TaskInstanceInfo(InfoJsonEncodable):

class TaskInfo(InfoJsonEncodable):
renames = {
"_BaseOperator__init_kwargs": "args",
"_BaseOperator__from_mapped": "mapped",
"_downstream_task_ids": "downstream_task_ids",
"_upstream_task_ids": "upstream_task_ids",
"_is_setup": "is_setup",
"_is_teardown": "is_teardown",
}
excludes = [
"_BaseOperator__instantiated",
"_dag",
"_hook",
"_log",
"_outlets",
"_inlets",
"_lock_for_execution",
"handler",
"params",
"python_callable",
"retry_delay",
includes = [
"task_id",
"depends_on_past",
"downstream_task_ids",
"execution_timeout",
"executor_config",
"ignore_first_depends_on_past",
"max_active_tis_per_dag",
"max_active_tis_per_dagrun",
"max_retry_delay",
"multiple_outputs",
"owner",
"priority_weight",
"queue",
"retries",
"retry_exponential_backoff",
"run_as_user",
"task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
"wait_for_past_depends_before_skipping",
"weight_rule",
]
casts = {
"operator_class": lambda task: f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}", # noqa: E501
Expand Down Expand Up @@ -340,18 +354,30 @@ def get_airflow_run_facet(
task_uuid: str,
):
return {
"airflow": json.loads(
json.dumps(
attr.asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
"airflow": attr.asdict(
AirflowRunFacet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
task=TaskInfo(task),
taskUuid=task_uuid,
)
)
}


def get_unknown_source_attribute_run_facet(task: "BaseOperator", name: Optional[str] = None):
if not name:
name = get_operator_class(task).__name__
return {
"unknownSourceAttribute": attr.asdict(
UnknownOperatorAttributeRunFacet(
unknownItems=[
UnknownOperatorInstance(
name=name,
properties=TaskInfo(task),
)
),
default=str,
]
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
},
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow_runArgs": {
"externalTrigger": false
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
}],
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"parent": {
"job": {
"name": "parentjob",
Expand Down
13 changes: 6 additions & 7 deletions integration/airflow/tests/integration/requests/secrets.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ not_match(result, '1500100900') }}"
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow": {
"task": "{{ not_match(result, '1500100900') }}"
},
"unknownSourceAttribute": {
"unknownItems": [
{
"name": "SecretsOperator",
"properties": {
"password": "***"
}
"properties": "{{ not_match(result, 'password') }}"
}
]
}
Expand All @@ -33,9 +34,7 @@
"unknownItems": [
{
"name": "SecretsOperator",
"properties": {
"password": "***"
}
"properties": "{{ not_match(result, 'password') }}"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
},
"run": {
"facets": {
"airflow_version": {
"taskInfo": "{{ 'true' if result == {} else result }}"
},
"airflow_runArgs": {
"externalTrigger": false
},
Expand Down

0 comments on commit a187a40

Please sign in to comment.