From b48cdf1c5c5722d7248495f1f97a2b73c851626e Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Wed, 23 Apr 2025 12:30:37 +0200 Subject: [PATCH] tests: Fix OpenLineage VariableTransport's initialization --- .../openlineage/tests/system/openlineage/conftest.py | 2 +- .../tests/system/openlineage/example_openlineage.json | 2 -- .../tests/system/openlineage/example_openlineage.py | 3 --- .../openlineage/example_openlineage_mapped_sensor.json | 8 ++++---- .../openlineage/tests/system/openlineage/operator.py | 2 +- .../tests/system/openlineage/transport/variable.py | 10 +++++++++- 6 files changed, 15 insertions(+), 12 deletions(-) diff --git a/providers/openlineage/tests/system/openlineage/conftest.py b/providers/openlineage/tests/system/openlineage/conftest.py index c4c0ee2571deb..4b45c39ba416b 100644 --- a/providers/openlineage/tests/system/openlineage/conftest.py +++ b/providers/openlineage/tests/system/openlineage/conftest.py @@ -30,7 +30,7 @@ def set_transport_variable(): lm.clear() listener = OpenLineageListener() listener.adapter._client = listener.adapter.get_or_create_openlineage_client() - listener.adapter._client.transport = VariableTransport() + listener.adapter._client.transport = VariableTransport({}) lm.add_listener(listener) yield lm.clear() diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage.json b/providers/openlineage/tests/system/openlineage/example_openlineage.json index 3332d6650d406..a11ee42bd7363 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage.json +++ b/providers/openlineage/tests/system/openlineage/example_openlineage.json @@ -54,7 +54,6 @@ "operator_class_path": "{{ result.endswith('.PythonOperator') }}", "wait_for_downstream": false, "retry_exponential_backoff": false, - "ignore_first_depends_on_past": false, "wait_for_past_depends_before_skipping": false }, "taskUuid": "{{ is_uuid(result) }}" @@ -144,7 +143,6 @@ "operator_class_path": "{{ result.endswith('.PythonOperator') }}", "wait_for_downstream": false, "retry_exponential_backoff": false, - "ignore_first_depends_on_past": false, "wait_for_past_depends_before_skipping": false }, "taskUuid": "{{ is_uuid(result) }}" diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage.py b/providers/openlineage/tests/system/openlineage/example_openlineage.py index 5a1e5ddc22dcc..f823898f2544a 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage.py +++ b/providers/openlineage/tests/system/openlineage/example_openlineage.py @@ -29,12 +29,9 @@ def do_nothing(): pass -default_args = {"start_date": datetime(2021, 1, 1), "retries": 1} - # Instantiate the DAG with DAG( "openlineage_basic_dag", - default_args=default_args, start_date=datetime(2021, 1, 1), schedule=None, catchup=False, diff --git a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.json b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.json index c112fb35448ad..60d9ca85ca850 100644 --- a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.json +++ b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_sensor.json @@ -6,7 +6,7 @@ "runId": "{{ is_uuid(result) }}" }, "job": { - "namespace": "default", + "namespace": "{{ result is string }}", "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", "facets": { "jobType": { @@ -24,7 +24,7 @@ "runId": "{{ is_uuid(result) }}" }, "job": { - "namespace": "default", + "namespace": "{{ result is string }}", "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", "facets": { "jobType": { @@ -42,7 +42,7 @@ "runId": "{{ is_uuid(result) }}" }, "job": { - "namespace": "default", + "namespace": "{{ result is string }}", "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", "facets": { "jobType": { @@ -61,7 +61,7 @@ "runId": "{{ is_uuid(result) }}" }, "job": { - "namespace": "default", + "namespace": "{{ result is string }}", "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds", "facets": { "jobType": { diff --git a/providers/openlineage/tests/system/openlineage/operator.py b/providers/openlineage/tests/system/openlineage/operator.py index 740eaa044e8b5..670512ce42d1f 100644 --- a/providers/openlineage/tests/system/openlineage/operator.py +++ b/providers/openlineage/tests/system/openlineage/operator.py @@ -217,11 +217,11 @@ def execute(self, context: Context) -> None: self.event_templates[key] = event for key, template in self.event_templates.items(): # type: ignore[union-attr] send_event = Variable.get(key=key, deserialize_json=True) + self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event)) if len(send_event) == 0: raise ValueError(f"No event for key {key}") if len(send_event) != 1 and not self.multiple_events: raise ValueError(f"Expected one event for key {key}, got {len(send_event)}") - self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event)) if not match(template, json.loads(send_event[0]), self.env): raise ValueError("Event received does not match one specified in test") if self.delete: diff --git a/providers/openlineage/tests/system/openlineage/transport/variable.py b/providers/openlineage/tests/system/openlineage/transport/variable.py index beeeac5aff6d0..6a8b75d4bfba8 100644 --- a/providers/openlineage/tests/system/openlineage/transport/variable.py +++ b/providers/openlineage/tests/system/openlineage/transport/variable.py @@ -16,16 +16,19 @@ # under the License. from __future__ import annotations +import logging from typing import TYPE_CHECKING from openlineage.client.serde import Serde -from openlineage.client.transport import Transport, get_default_factory +from openlineage.client.transport import Config, Transport, get_default_factory from airflow.models.variable import Variable if TYPE_CHECKING: from openlineage.client.client import Event +log = logging.getLogger(__name__) + class VariableTransport(Transport): """ @@ -37,12 +40,17 @@ class VariableTransport(Transport): kind = "variable" + def __init__(self, config: Config) -> None: + log.debug("Constructing OpenLineage transport that will send events to Airflow Variables.") + def emit(self, event: Event) -> None: key = f"{event.job.name}.event.{event.eventType.value.lower()}" # type: ignore[union-attr] event_str = Serde.to_json(event) if (var := Variable.get(key=key, default_var=None, deserialize_json=True)) is not None: + log.debug("Appending OL event to Airflow Variable `%s`", key) Variable.set(key=key, value=var + [event_str], serialize_json=True) else: + log.debug("Emitting OL event to new Airflow Variable `%s`", key) Variable.set(key=key, value=[event_str], serialize_json=True)