Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def set_transport_variable():
lm.clear()
listener = OpenLineageListener()
listener.adapter._client = listener.adapter.get_or_create_openlineage_client()
listener.adapter._client.transport = VariableTransport()
listener.adapter._client.transport = VariableTransport({})
lm.add_listener(listener)
yield
lm.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"operator_class_path": "{{ result.endswith('.PythonOperator') }}",
"wait_for_downstream": false,
"retry_exponential_backoff": false,
"ignore_first_depends_on_past": false,
"wait_for_past_depends_before_skipping": false
},
"taskUuid": "{{ is_uuid(result) }}"
Expand Down Expand Up @@ -144,7 +143,6 @@
"operator_class_path": "{{ result.endswith('.PythonOperator') }}",
"wait_for_downstream": false,
"retry_exponential_backoff": false,
"ignore_first_depends_on_past": false,
"wait_for_past_depends_before_skipping": false
},
"taskUuid": "{{ is_uuid(result) }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ def do_nothing():
pass


default_args = {"start_date": datetime(2021, 1, 1), "retries": 1}

# Instantiate the DAG
with DAG(
"openlineage_basic_dag",
default_args=default_args,
start_date=datetime(2021, 1, 1),
schedule=None,
catchup=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"namespace": "{{ result is string }}",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
Expand All @@ -24,7 +24,7 @@
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"namespace": "{{ result is string }}",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
Expand All @@ -42,7 +42,7 @@
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"namespace": "{{ result is string }}",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
Expand All @@ -61,7 +61,7 @@
"runId": "{{ is_uuid(result) }}"
},
"job": {
"namespace": "default",
"namespace": "{{ result is string }}",
"name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
"facets": {
"jobType": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ def execute(self, context: Context) -> None:
self.event_templates[key] = event
for key, template in self.event_templates.items(): # type: ignore[union-attr]
send_event = Variable.get(key=key, deserialize_json=True)
self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event))
if len(send_event) == 0:
raise ValueError(f"No event for key {key}")
if len(send_event) != 1 and not self.multiple_events:
raise ValueError(f"Expected one event for key {key}, got {len(send_event)}")
self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event))
if not match(template, json.loads(send_event[0]), self.env):
raise ValueError("Event received does not match one specified in test")
if self.delete:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
# under the License.
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from openlineage.client.serde import Serde
from openlineage.client.transport import Transport, get_default_factory
from openlineage.client.transport import Config, Transport, get_default_factory

from airflow.models.variable import Variable

if TYPE_CHECKING:
from openlineage.client.client import Event

log = logging.getLogger(__name__)


class VariableTransport(Transport):
"""
Expand All @@ -37,12 +40,17 @@ class VariableTransport(Transport):

kind = "variable"

def __init__(self, config: Config) -> None:
log.debug("Constructing OpenLineage transport that will send events to Airflow Variables.")

def emit(self, event: Event) -> None:
key = f"{event.job.name}.event.{event.eventType.value.lower()}" # type: ignore[union-attr]
event_str = Serde.to_json(event)
if (var := Variable.get(key=key, default_var=None, deserialize_json=True)) is not None:
log.debug("Appending OL event to Airflow Variable `%s`", key)
Variable.set(key=key, value=var + [event_str], serialize_json=True)
else:
log.debug("Emitting OL event to new Airflow Variable `%s`", key)
Variable.set(key=key, value=[event_str], serialize_json=True)


Expand Down
Loading