feat: Add OpenLineage metrics for event_size and extraction time (#37797)

Co-authored-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
kacpermuda and mobuchowski committed Mar 1, 2024
1 parent e904f67 commit 9848954
Showing 2 changed files with 56 additions and 19 deletions.
29 changes: 21 additions & 8 deletions airflow/providers/openlineage/plugins/adapter.py
@@ -18,6 +18,7 @@

import os
import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING

import yaml
@@ -118,17 +119,29 @@ def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
)

def emit(self, event: RunEvent):
"""Emit OpenLineage event.
:param event: Event to be emitted.
:return: Redacted Event.
"""
if not self._client:
self._client = self.get_or_create_openlineage_client()
redacted_event: RunEvent = self._redacter.redact(event, max_depth=20) # type: ignore[assignment]
event_type = event.eventType.value.lower()
transport_type = f"{self._client.transport.kind}".lower()

try:
with Stats.timer("ol.emit.attempts"):
return self._client.emit(redacted_event)
with ExitStack() as stack:
stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
stack.enter_context(Stats.timer("ol.emit.attempts"))
self._client.emit(redacted_event)
except Exception as e:
Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)

return redacted_event

def start_task(
self,
run_id: str,
@@ -143,7 +156,7 @@ def start_task(
owners: list[str],
task: OperatorLineage | None,
run_facets: dict[str, BaseFacet] | None = None, # Custom run facets
):
) -> RunEvent:
"""
Emit openlineage event of type START.
@@ -198,7 +211,7 @@ def start_task(
outputs=task.outputs if task else [],
producer=_PRODUCER,
)
self.emit(event)
return self.emit(event)

def complete_task(
self,
@@ -208,7 +221,7 @@ def complete_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
):
) -> RunEvent:
"""
Emit openlineage event of type COMPLETE.
@@ -235,7 +248,7 @@ def complete_task(
outputs=task.outputs,
producer=_PRODUCER,
)
self.emit(event)
return self.emit(event)

def fail_task(
self,
@@ -245,7 +258,7 @@ def fail_task(
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
):
) -> RunEvent:
"""
Emit openlineage event of type FAIL.
@@ -272,7 +285,7 @@ def fail_task(
outputs=task.outputs,
producer=_PRODUCER,
)
self.emit(event)
return self.emit(event)

def dag_started(
self,
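For orientation, a minimal standalone sketch of the emission-timing pattern added in adapter.py above, assuming a StatsD-style object with a timer() context manager (as Airflow's Stats facade provides); the function and parameter names here are illustrative stand-ins, not the adapter's actual API:

from contextlib import ExitStack

def timed_emit(stats, client, redacted_event, event_type, transport_type):
    # Enter both timers on one ExitStack so a single block times the identical
    # emit call twice: once under a name tagged with the event type and the
    # transport kind, and once under the aggregate "ol.emit.attempts" name
    # that existed before this change.
    with ExitStack() as stack:
        stack.enter_context(stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
        stack.enter_context(stats.timer("ol.emit.attempts"))
        client.emit(redacted_event)

Stacking the context managers keeps both metrics measuring exactly the same call without nesting two with statements.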
46 changes: 35 additions & 11 deletions airflow/providers/openlineage/plugins/listener.py
@@ -21,15 +21,18 @@
from datetime import datetime
from typing import TYPE_CHECKING

from openlineage.client.serde import Serde

from airflow.listeners import hookimpl
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
get_airflow_run_facet,
get_custom_facets,
get_job_name,
print_warning,
)
from airflow.stats import Stats
from airflow.utils.timeout import timeout

if TYPE_CHECKING:
@@ -82,16 +85,19 @@ def on_running():
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
)
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()

task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(dagrun, task)

start_date = task_instance.start_date if task_instance.start_date else datetime.now()
data_interval_start = (
dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
)
data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None

self.adapter.start_task(
redacted_event = self.adapter.start_task(
run_id=task_uuid,
job_name=get_job_name(task),
job_description=dag.description,
@@ -108,6 +114,10 @@ def on_running():
**get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
},
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
len(Serde.to_json(redacted_event).encode("utf-8")),
)

on_running()

@@ -129,21 +139,28 @@ def on_success():
execution_date=task_instance.execution_date,
try_number=task_instance.try_number - 1,
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()

task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)
with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)

end_date = task_instance.end_date if task_instance.end_date else datetime.now()

self.adapter.complete_task(
redacted_event = self.adapter.complete_task(
run_id=task_uuid,
job_name=get_job_name(task),
parent_job_name=dag.dag_id,
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
len(Serde.to_json(redacted_event).encode("utf-8")),
)

on_success()

@@ -165,21 +182,28 @@ def on_failure():
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
)
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()

task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)
with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)

end_date = task_instance.end_date if task_instance.end_date else datetime.now()

self.adapter.fail_task(
redacted_event = self.adapter.fail_task(
run_id=task_uuid,
job_name=get_job_name(task),
parent_job_name=dag.dag_id,
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
len(Serde.to_json(redacted_event).encode("utf-8")),
)

on_failure()

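Likewise, a minimal sketch of the two listener-side measurements shown above, assuming Airflow's Stats facade and the OpenLineage client's Serde helper (both imported in this diff); the helper functions and their parameters are hypothetical, introduced only to isolate the pattern:

from openlineage.client.serde import Serde

def timed_extract(stats, extractor_manager, dagrun, task, event_type, operator_name):
    # Time how long extractor-based metadata collection takes, tagged by the
    # event type (running/complete/fail) and the operator class name.
    with stats.timer(f"ol.extract.{event_type}.{operator_name}"):
        return extractor_manager.extract_metadata(dagrun, task)

def report_event_size(stats, redacted_event, event_type, operator_name):
    # Gauge the byte size of the JSON-serialized, already-redacted event so
    # unusually large OpenLineage payloads are visible per event type/operator.
    stats.gauge(
        f"ol.event.size.{event_type}.{operator_name}",
        len(Serde.to_json(redacted_event).encode("utf-8")),
    )

Measuring the redacted event (the value emit() now returns) means the gauge reflects the payload actually handed to the transport.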
