From 9848954e789b46c483f5c912fd2cdd5c3bc3cbd6 Mon Sep 17 00:00:00 2001
From: Kacper Muda
Date: Fri, 1 Mar 2024 14:54:20 +0100
Subject: [PATCH] feat: Add OpenLineage metrics for event_size and extraction time (#37797)

Co-authored-by: Maciej Obuchowski
---
 .../providers/openlineage/plugins/adapter.py  | 29 ++++++++----
 .../providers/openlineage/plugins/listener.py | 46 ++++++++++++++-----
 2 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index b6b8eeb4dc70b..0ea5d7213b1d9 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -18,6 +18,7 @@
 
 import os
 import uuid
+from contextlib import ExitStack
 from typing import TYPE_CHECKING
 
 import yaml
@@ -118,17 +119,29 @@ def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
         )
 
     def emit(self, event: RunEvent):
+        """Emit OpenLineage event.
+
+        :param event: Event to be emitted.
+        :return: Redacted Event.
+        """
         if not self._client:
             self._client = self.get_or_create_openlineage_client()
         redacted_event: RunEvent = self._redacter.redact(event, max_depth=20)  # type: ignore[assignment]
+        event_type = event.eventType.value.lower()
+        transport_type = f"{self._client.transport.kind}".lower()
+
         try:
-            with Stats.timer("ol.emit.attempts"):
-                return self._client.emit(redacted_event)
+            with ExitStack() as stack:
+                stack.enter_context(Stats.timer(f"ol.emit.attempts.{event_type}.{transport_type}"))
+                stack.enter_context(Stats.timer("ol.emit.attempts"))
+                self._client.emit(redacted_event)
         except Exception as e:
             Stats.incr("ol.emit.failed")
             self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
             self.log.debug("OpenLineage emission failure: %s", e)
 
+        return redacted_event
+
     def start_task(
         self,
         run_id: str,
@@ -143,7 +156,7 @@ def start_task(
         owners: list[str],
         task: OperatorLineage | None,
         run_facets: dict[str, BaseFacet] | None = None,  # Custom run facets
-    ):
+    ) -> RunEvent:
         """
         Emit openlineage event of type START.
 
@@ -198,7 +211,7 @@ def start_task(
             outputs=task.outputs if task else [],
             producer=_PRODUCER,
         )
-        self.emit(event)
+        return self.emit(event)
 
     def complete_task(
         self,
@@ -208,7 +221,7 @@ def complete_task(
         parent_run_id: str | None,
         end_time: str,
         task: OperatorLineage,
-    ):
+    ) -> RunEvent:
         """
         Emit openlineage event of type COMPLETE.
 
@@ -235,7 +248,7 @@ def complete_task(
             outputs=task.outputs,
             producer=_PRODUCER,
         )
-        self.emit(event)
+        return self.emit(event)
 
     def fail_task(
         self,
@@ -245,7 +258,7 @@ def fail_task(
         parent_run_id: str | None,
         end_time: str,
         task: OperatorLineage,
-    ):
+    ) -> RunEvent:
         """
         Emit openlineage event of type FAIL.
 
@@ -272,7 +285,7 @@ def fail_task(
             outputs=task.outputs,
             producer=_PRODUCER,
         )
-        self.emit(event)
+        return self.emit(event)
 
     def dag_started(
         self,
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 8c731dd6ffe1d..af5405f92a6d8 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -21,15 +21,18 @@
 from datetime import datetime
 from typing import TYPE_CHECKING
 
+from openlineage.client.serde import Serde
+
 from airflow.listeners import hookimpl
 from airflow.providers.openlineage.extractors import ExtractorManager
-from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
+from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
 from airflow.providers.openlineage.utils.utils import (
     get_airflow_run_facet,
     get_custom_facets,
     get_job_name,
     print_warning,
 )
+from airflow.stats import Stats
 from airflow.utils.timeout import timeout
 
 if TYPE_CHECKING:
@@ -82,8 +85,11 @@ def on_running():
                 execution_date=task_instance.execution_date,
                 try_number=task_instance.try_number,
             )
+            event_type = RunState.RUNNING.value.lower()
+            operator_name = task.task_type.lower()
 
-            task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
+            with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
+                task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
 
             start_date = task_instance.start_date if task_instance.start_date else datetime.now()
             data_interval_start = (
@@ -91,7 +97,7 @@ def on_running():
             )
             data_interval_end = dagrun.data_interval_end.isoformat() if dagrun.data_interval_end else None
 
-            self.adapter.start_task(
+            redacted_event = self.adapter.start_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
                 job_description=dag.description,
@@ -108,6 +114,10 @@ def on_running():
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid),
                 },
             )
+            Stats.gauge(
+                f"ol.event.size.{event_type}.{operator_name}",
+                len(Serde.to_json(redacted_event).encode("utf-8")),
+            )
 
         on_running()
 
@@ -129,14 +139,17 @@ def on_success():
                 execution_date=task_instance.execution_date,
                 try_number=task_instance.try_number - 1,
             )
+            event_type = RunState.COMPLETE.value.lower()
+            operator_name = task.task_type.lower()
 
-            task_metadata = self.extractor_manager.extract_metadata(
-                dagrun, task, complete=True, task_instance=task_instance
-            )
+            with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
+                task_metadata = self.extractor_manager.extract_metadata(
+                    dagrun, task, complete=True, task_instance=task_instance
+                )
 
             end_date = task_instance.end_date if task_instance.end_date else datetime.now()
 
-            self.adapter.complete_task(
+            redacted_event = self.adapter.complete_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
                 parent_job_name=dag.dag_id,
@@ -144,6 +157,10 @@ def on_success():
                 end_time=end_date.isoformat(),
                 task=task_metadata,
             )
+            Stats.gauge(
+                f"ol.event.size.{event_type}.{operator_name}",
+                len(Serde.to_json(redacted_event).encode("utf-8")),
+            )
 
         on_success()
 
@@ -165,14 +182,17 @@ def on_failure():
                 execution_date=task_instance.execution_date,
                 try_number=task_instance.try_number,
             )
+            event_type = RunState.FAIL.value.lower()
+            operator_name = task.task_type.lower()
 
-            task_metadata = self.extractor_manager.extract_metadata(
-                dagrun, task, complete=True, task_instance=task_instance
-            )
+            with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
+                task_metadata = self.extractor_manager.extract_metadata(
+                    dagrun, task, complete=True, task_instance=task_instance
+                )
 
             end_date = task_instance.end_date if task_instance.end_date else datetime.now()
 
-            self.adapter.fail_task(
+            redacted_event = self.adapter.fail_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
                 parent_job_name=dag.dag_id,
@@ -180,6 +200,10 @@ def on_failure():
                 end_time=end_date.isoformat(),
                 task=task_metadata,
             )
+            Stats.gauge(
+                f"ol.event.size.{event_type}.{operator_name}",
+                len(Serde.to_json(redacted_event).encode("utf-8")),
+            )
 
         on_failure()
 