Skip to content

Commit

Permalink
Merge branch 'main' into feature/update-steps-in-release-on-projects
Browse files Browse the repository at this point in the history
  • Loading branch information
wslulciuc committed Jun 11, 2021
2 parents 1ad899e + 3fabbbe commit 5cc9d71
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 189 deletions.
26 changes: 26 additions & 0 deletions integrations/airflow/README.md
Expand Up @@ -115,13 +115,39 @@ To enable logging, set the environment variable `MARQUEZ_LOG_LEVEL` to `DEBUG`,
$ export MARQUEZ_LOG_LEVEL=INFO
```

## Triggering Child Jobs
Commonly, Airflow DAGs will trigger processes on remote systems, such as an Apache Spark or Apache
Beam job. Those systems may have their own OpenLineage integration and report their own
job runs and dataset inputs/outputs. To propagate the job hierarchy, tasks must send their own run
id so that the downstream process can report the [ParentRunFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/OpenLineage.json#/definitions/ParentRunFacet)
with the proper run id.

The `lineage_run_id` macro exists to inject the run id of a given task into the arguments sent to a
remote processing job's Airflow operator. The macro requires the DAG run_id and the task to access
the generated run id for that task. For example, a Spark job can be triggered using the
`DataProcPySparkOperator` with the correct parent run id using the following configuration:
```python
t1 = DataProcPySparkOperator(
    task_id=job_name,
    # required pyspark configuration
    job_name=job_name,
    dataproc_pyspark_properties={
        'spark.driver.extraJavaOptions':
            f"-javaagent:{jar}={os.environ.get('MARQUEZ_URL')}/api/v1/namespaces/{os.getenv('MARQUEZ_NAMESPACE', 'default')}/jobs/{job_name}/runs/{{{{lineage_run_id(run_id, task)}}}}?api_key={os.environ.get('MARQUEZ_API_KEY')}"
    },
    dag=dag)
```
## Development

To install all dependencies for _local_ development:

```bash
# Bash
$ pip3 install -e .[dev]
```
```zsh
# escape the brackets in zsh
$ pip3 install -e .\[dev\]
```

To run the entire test suite, you'll first want to initialize the Airflow database:

Expand Down
61 changes: 44 additions & 17 deletions integrations/airflow/marquez_airflow/dag.py
Expand Up @@ -9,27 +9,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import List, Union, Optional
from uuid import uuid4

import airflow.models
import time

from airflow.models import DagRun
from airflow.utils.db import provide_session
from airflow.utils.state import State

# Handling of import of different airflow versions
from airflow.version import version as AIRFLOW_VERSION
from marquez_airflow.extractors import StepMetadata, BaseExtractor
from marquez_airflow.extractors.extractors import Extractors
from marquez_airflow.utils import (
JobIdMapping,
get_location,
DagUtils,
get_custom_facets
get_custom_facets,
new_lineage_run_id
)

# Handling of import of different airflow versions
from airflow.version import version as AIRFLOW_VERSION
from pkg_resources import parse_version

if parse_version(AIRFLOW_VERSION) >= parse_version("1.10.11"):
from airflow import LoggingMixin
else:
Expand All @@ -41,9 +40,44 @@
_MARQUEZ = MarquezAdapter()


@provide_session
def lineage_run_id(run_id, task, session=None):
    """
    Macro function which returns the generated run id for a given task. This
    can be used to forward the run id from a task to a child run so the job
    hierarchy is preserved. Invoke as a jinja template, e.g.

    PythonOperator(
        task_id='render_template',
        python_callable=my_task_function,
        op_args=['{{ lineage_run_id(run_id, task) }}'],  # macro invoked
        provide_context=False,
        dag=dag
    )

    :param run_id: the Airflow DAG run id (``dag_run.run_id``)
    :param task: the task whose generated run id should be looked up
    :param session: SQLAlchemy session, injected by ``@provide_session``
    :return: the generated run id as a string, or "" if none was recorded
    """
    name = DAG._marquez_job_name(task.dag_id, task.task_id)
    ids = JobIdMapping.get(name, run_id, session)
    if ids is None:
        return ""
    if isinstance(ids, list):
        # The mapping may hold several ids; the first entry is the
        # current run. An empty list means no run was recorded.
        return ids[0] if ids else ""
    return str(ids)


class DAG(airflow.models.DAG, LoggingMixin):
def __init__(self, *args, extractor_mapper=None, **kwargs):
    """Marquez-aware DAG.

    Registers the ``lineage_run_id`` macro as a user-defined macro so
    templated operator arguments can forward a task's generated run id
    to child jobs (see README "Triggering Child Jobs").

    :param extractor_mapper: optional custom extractor mapper
        (presumably consumed later in __init__ — truncated from view;
        TODO confirm)
    """
    self.log.debug("marquez-airflow dag starting")
    # Merge into any caller-supplied macros; `or {}` also guards against
    # an explicit user_defined_macros=None, which the previous
    # kwargs.__contains__ check would have passed through and crashed on.
    macros = kwargs.get("user_defined_macros") or {}
    macros["lineage_run_id"] = lineage_run_id
    kwargs["user_defined_macros"] = macros
    super().__init__(*args, **kwargs)
    self.extractors = {}

Expand Down Expand Up @@ -94,7 +128,7 @@ def _register_dagrun(self, dagrun: DagRun, is_external_trigger: bool, execution_
step = self._extract_metadata(dagrun, task)

job_name = self._marquez_job_name(self.dag_id, task.task_id)
run_id = self._marquez_run_id(dagrun.run_id, task.task_id)
run_id = new_lineage_run_id(dagrun.run_id, task_id)

task_run_id = _MARQUEZ.start_task(
run_id,
Expand Down Expand Up @@ -158,7 +192,7 @@ def _report_task_instance(self, task_instance, dagrun, session):
step = self._extract_metadata(dagrun, task, task_instance)

job_name = self._marquez_job_name(self.dag_id, task.task_id)
run_id = self._marquez_run_id(dagrun.run_id, task.task_id)
run_id = new_lineage_run_id(dagrun.run_id, task.task_id)

if not task_run_id:
task_run_id = _MARQUEZ.start_task(
Expand Down Expand Up @@ -258,9 +292,6 @@ def _timed_log_message(self, start_time):
return f'airflow_dag_id={self.dag_id} ' \
f'duration_ms={(self._now_ms() - start_time)}'

def new_run_id(self) -> str:
    """Return a freshly generated random run id (UUID4 string)."""
    return f"{uuid4()}"

@staticmethod
def _get_location(task):
try:
Expand All @@ -279,10 +310,6 @@ def _marquez_job_name_from_task_instance(task_instance):
def _marquez_job_name(dag_id: str, task_id: str) -> str:
return f'{dag_id}.{task_id}'

@staticmethod
def _marquez_run_id(dag_run_id: str, task_id: str) -> str:
return f'{dag_run_id}.{task_id}'

@staticmethod
def _now_ms():
return int(round(time.time() * 1000))
11 changes: 8 additions & 3 deletions integrations/airflow/marquez_airflow/utils.py
Expand Up @@ -10,16 +10,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
import subprocess
import json
from uuid import uuid4

import airflow
from airflow.models import Connection
from airflow.utils.db import provide_session

from marquez_airflow.facets import AirflowVersionRunFacet, AirflowRunArgsRunFacet
from marquez_airflow.facets import AirflowVersionRunFacet, \
AirflowRunArgsRunFacet

try:
# Import from pendulum 1.x version
Expand Down Expand Up @@ -156,6 +157,10 @@ def get_custom_facets(task, is_external_trigger: bool):
}


def new_lineage_run_id(dag_run_id: str, task_id: str) -> str:
    """Generate a new, globally unique run id for a task run.

    The ``dag_run_id`` and ``task_id`` parameters are accepted for caller
    context but are not used in the generated value — the id is a random
    UUID4 string.
    """
    return f"{uuid4()}"


class DagUtils:

def get_execution_date(**kwargs):
Expand Down

0 comments on commit 5cc9d71

Please sign in to comment.