Skip to content

Commit

Permalink
Add lineage_job_namespace and lineage_job_name OpenLineage macros (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Apr 8, 2024
1 parent d409b8b commit 093ab7e
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 40 deletions.
43 changes: 33 additions & 10 deletions airflow/providers/openlineage/plugins/macros.py
Expand Up @@ -26,22 +26,41 @@
from airflow.models import TaskInstance


def lineage_job_namespace():
    """
    Macro function which returns the Airflow OpenLineage namespace.

    Takes no arguments; the result is whatever ``conf.namespace()`` returns.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    namespace = conf.namespace()
    return namespace


def lineage_job_name(task_instance: TaskInstance):
    """
    Macro function which returns the Airflow task name in OpenLineage format.

    The name is built by ``get_job_name`` and has the form `<dag_id>.<task_id>`.

    .. seealso::
        For more information take a look at the guide:
        :ref:`howto/macros:openlineage`
    """
    job_name = get_job_name(task_instance)
    return job_name


def lineage_run_id(task_instance: TaskInstance):
"""
Macro function which returns the generated run id for a given task.
Macro function which returns the generated run id (UUID) for a given task.
This can be used to forward the run id from a task to a child run so the job hierarchy is preserved.
.. seealso::
For more information on how to use this operator, take a look at the guide:
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
if TYPE_CHECKING:
assert task_instance.task

return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
task_id=task_instance.task.task_id,
task_id=task_instance.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
)
Expand All @@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance):
run so the job hierarchy is preserved. The child run can easily create a ParentRunFacet from this information.
.. seealso::
For more information on how to use this macro, take a look at the guide:
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
job_name = get_job_name(task_instance.task)
run_id = lineage_run_id(task_instance)
return f"{conf.namespace()}/{job_name}/{run_id}"
return "/".join(
(
lineage_job_namespace(),
lineage_job_name(task_instance),
lineage_run_id(task_instance),
)
)
9 changes: 7 additions & 2 deletions airflow/providers/openlineage/plugins/openlineage.py
Expand Up @@ -19,7 +19,12 @@
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
lineage_run_id,
)


class OpenLineageProviderPlugin(AirflowPlugin):
Expand All @@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin):

name = "OpenLineageProviderPlugin"
if not conf.is_disabled():
macros = [lineage_run_id, lineage_parent_id]
macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id]
listeners = [get_openlineage_listener()]
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/utils/utils.py
Expand Up @@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type:
return task.__class__


def get_job_name(task):
def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"


Expand Down
59 changes: 39 additions & 20 deletions docs/apache-airflow-providers-openlineage/macros.rst
Expand Up @@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to Airflow's main collectio

They can be invoked as a Jinja template, e.g.

Lineage run id
--------------
Lineage job & run macros
------------------------

These macros:
* ``lineage_job_namespace()``
* ``lineage_job_name(task_instance)``
* ``lineage_run_id(task_instance)``

allow injecting pieces of run information of a given Airflow task into the arguments sent to a remote processing job.
For example, ``SparkSubmitOperator`` can be set up like this:

.. code-block:: python
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
op_args=[
"{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"
], # lineage_run_id macro invoked
provide_context=False,
dag=dag,
)
SparkSubmitOperator(
task_id="my_task",
application="/script.py",
conf={
# separated components
"spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
},
)
Lineage parent id
-----------------

The same information, compacted into one string, can be passed using the ``lineage_parent_id(task_instance)`` macro:

.. code-block:: python
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
op_args=[
"{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}"
], # lineage_parent_id macro invoked
provide_context=False,
dag=dag,
)
def my_task_function(templates_dict, **kwargs):
parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/")
...
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
templates_dict={
# joined components as one string `<namespace>/<name>/<run_id>`
"parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
},
provide_context=False,
dag=dag,
)
36 changes: 29 additions & 7 deletions tests/providers/openlineage/plugins/test_macros.py
Expand Up @@ -20,16 +20,38 @@
from unittest import mock

from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
lineage_run_id,
)

_DAG_NAMESPACE = namespace()


def test_lineage_job_namespace():
    """The namespace macro must return the same value as ``namespace()`` from the OpenLineage conf."""
    actual = lineage_job_namespace()
    assert actual == _DAG_NAMESPACE


def test_lineage_job_name():
    """The job name macro must render a task instance as ``<dag_id>.<task_id>``."""
    ti_attrs = {
        "dag_id": "dag_id",
        "task_id": "task_id",
        "execution_date": "execution_date",
        "try_number": 1,
    }
    task_instance = mock.MagicMock(**ti_attrs)
    actual = lineage_job_name(task_instance)
    assert actual == "dag_id.task_id"


def test_lineage_run_id():
task = mock.MagicMock(
dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
try_number=1,
)
actual = lineage_run_id(task)
actual = lineage_run_id(task_instance)
expected = str(
uuid.uuid3(
uuid.NAMESPACE_URL,
Expand All @@ -42,12 +64,12 @@ def test_lineage_run_id():
@mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
def test_lineage_parent_id(mock_run_id):
mock_run_id.return_value = "run_id"
task = mock.MagicMock(
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
try_number=1,
task=mock.MagicMock(task_id="task_id", dag_id="dag_id"),
)
actual = lineage_parent_id(task_instance=task)
actual = lineage_parent_id(task_instance)
expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"
assert actual == expected

0 comments on commit 093ab7e

Please sign in to comment.