Add .../dagRuns/DR-ID/upstreamDatasetEvents endpoint (#25080)
For a given dag run, this endpoint returns the dataset events that are "part of" the dag run, i.e. the collection of dataset events that contributed to triggering it. In practice we just query the events that occurred since the previous dataset-triggered dag run; we may make the relationship tighter, see #24969. (A sample call is sketched after the file summary below.)
dstandish committed Jul 18, 2022
1 parent 2f70daf commit 8aa0b24
Showing 3 changed files with 283 additions and 6 deletions.
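For a quick feel of the endpoint, here is a minimal sketch of calling it with requests. The host, credentials, DAG ID, and run ID are hypothetical, and the basic-auth API backend is assumed to be enabled.

import requests

# Hypothetical host, credentials, and IDs; assumes the basic-auth API backend.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/my_dag/dagRuns/my_run/upstreamDatasetEvents",
    auth=("admin", "admin"),
)
resp.raise_for_status()
payload = resp.json()
print(payload["total_entries"], "upstream dataset events")
for event in payload["dataset_events"]:
    print(event["dataset_id"], event["created_at"])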
68 changes: 68 additions & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -41,12 +41,17 @@
dagruns_batch_form_schema,
set_dagrun_state_form_schema,
)
from airflow.api_connexion.schemas.dataset_schema import (
DatasetEventCollection,
dataset_event_collection_schema,
)
from airflow.api_connexion.schemas.task_instance_schema import (
TaskInstanceReferenceCollection,
task_instance_reference_collection_schema,
)
from airflow.api_connexion.types import APIResponse
from airflow.models import DagModel, DagRun
from airflow.models.dataset import DatasetDagRef, DatasetEvent
from airflow.security import permissions
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.session import NEW_SESSION, provide_session
@@ -86,6 +91,69 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
return dagrun_schema.dump(dag_run)


@security.requires_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
],
)
@provide_session
def get_upstream_dataset_events(
*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION
) -> APIResponse:
"""Get a DAG Run."""
dag_run: Optional[DagRun] = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id,
)
.one_or_none()
)
if dag_run is None:
raise NotFound(
"DAGRun not found",
detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found",
)
events = _get_upstream_dataset_events(dag_run=dag_run, session=session)
return dataset_event_collection_schema.dump(
DatasetEventCollection(dataset_events=events, total_entries=len(events))
)


def _get_upstream_dataset_events(*, dag_run: DagRun, session: Session) -> List["DatasetEvent"]:
"""If dag run is dataset-triggered, return the dataset events that triggered it."""
    if dag_run.run_type != DagRunType.DATASET_TRIGGERED:
return []

previous_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_run.dag_id,
DagRun.execution_date < dag_run.execution_date,
DagRun.run_type == DagRunType.DATASET_TRIGGERED,
)
.order_by(DagRun.execution_date.desc())
.first()
)

dataset_event_filters = [
DatasetDagRef.dag_id == dag_run.dag_id,
DatasetEvent.created_at <= dag_run.execution_date,
]
if previous_dag_run:
dataset_event_filters.append(DatasetEvent.created_at > previous_dag_run.execution_date)
dataset_events = (
session.query(DatasetEvent)
.join(DatasetDagRef, DatasetEvent.dataset_id == DatasetDagRef.dataset_id)
.filter(*dataset_event_filters)
.order_by(DatasetEvent.created_at)
.all()
)
return dataset_events


def _fetch_dag_runs(
query: Query,
*,
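To make the time window concrete: the query above selects events strictly after the previous dataset-triggered run's execution date (when one exists) and up to and including the current run's execution date. A self-contained sketch with plain datetimes (no database; all values hypothetical):

from datetime import datetime, timedelta

# Hypothetical execution dates mirroring _get_upstream_dataset_events.
prev_run = datetime(2022, 1, 1)              # previous DATASET_TRIGGERED run
curr_run = prev_run + timedelta(seconds=40)  # the dag run being inspected

event_times = [prev_run + timedelta(seconds=s) for s in (-10, 5, 20, 40, 55)]

# created_at > previous execution_date (exclusive), and
# created_at <= current execution_date (inclusive, as the tests below check).
in_window = [t for t in event_times if prev_run < t <= curr_run]
assert in_window == [prev_run + timedelta(seconds=s) for s in (5, 20, 40)]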
36 changes: 32 additions & 4 deletions airflow/api_connexion/openapi/v1.yaml
@@ -818,6 +818,34 @@ paths:
'404':
$ref: '#/components/responses/NotFound'

/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents:
parameters:
- $ref: '#/components/parameters/DAGID'
- $ref: '#/components/parameters/DAGRunID'
get:
summary: Get dataset events for a DAG run
description: |
Get dataset events for a DAG run.
*New in version 2.4.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
operationId: get_upstream_dataset_events
tags: [DAGRun, Dataset]
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetEventCollection'
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'


/eventLogs:
get:
summary: List log entries
@@ -3508,19 +3536,19 @@ components:
source_dag_id:
type: string
description: The DAG ID that updated the dataset.
-nullable: false
+nullable: true
source_task_id:
type: string
description: The task ID that updated the dataset.
-nullable: false
+nullable: true
source_run_id:
type: string
description: The DAG run ID that updated the dataset.
-nullable: false
+nullable: true
source_map_index:
type: integer
description: The task map index that updated the dataset.
-nullable: false
+nullable: true
created_at:
type: string
description: The dataset event creation time
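For reference, a successful response carries the DatasetEventCollection shape defined above. A hypothetical 200 body, mirroring the expected_response in the tests below (values are illustrative only):

# Hypothetical /upstreamDatasetEvents response body; values illustrative.
sample_response = {
    "dataset_events": [
        {
            "id": 1,
            "dataset_id": 1,
            "extra": None,
            "source_dag_id": None,      # nullable per the schema change above
            "source_task_id": None,
            "source_run_id": None,
            "source_map_index": None,
            "created_at": "2022-07-18T00:00:00+00:00",
        }
    ],
    "total_entries": 1,
}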
185 changes: 183 additions & 2 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -16,18 +16,22 @@
# under the License.
from datetime import timedelta
from unittest import mock
from uuid import uuid4

import pendulum
import pytest
from freezegun import freeze_time
from parameterized import parameterized

from airflow import settings
from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.models import DAG, DagModel, DagRun
+from airflow.models import DAG, DagModel, DagRun, Dataset
from airflow.models.dataset import DatasetEvent
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
-from airflow.utils.state import State
+from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
@@ -44,6 +48,7 @@ def configured_app(minimal_app_for_api):
role_name="Test",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
@@ -1483,3 +1488,179 @@ def test_should_respond_404(self):
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404


def test__get_upstream_dataset_events_no_prior(configured_app):
"""If no prior dag runs, return all events"""
from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events

# setup dags and datasets
unique_id = str(uuid4())
session = settings.Session()
dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
DAG.bulk_write_to_db(dags=[dag2], session=session)
session.add_all([dataset1a, dataset1b])
session.commit()

# add 5 events
session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
session.add_all([DatasetEvent(dataset_id=dataset1a.id), DatasetEvent(dataset_id=dataset1b.id)])
session.add_all([DatasetEvent(dataset_id=dataset1a.id)])
session.commit()

# create a single dag run, no prior dag runs
dr = DagRun(dag2.dag_id, run_id=unique_id, run_type=DagRunType.DATASET_TRIGGERED)
dr.dag = dag2
session.add(dr)
session.commit()
session.expunge_all()

# check result
events = _get_upstream_dataset_events(dag_run=dr, session=session)
assert len(events) == 5


def test__get_upstream_dataset_events_with_prior(configured_app):
"""
Events returned should be those that occurred after last DATASET_TRIGGERED
dag run and up to the exec date of current dag run.
"""
from airflow.api_connexion.endpoints.dag_run_endpoint import _get_upstream_dataset_events

# setup dags and datasets
unique_id = str(uuid4())
session = settings.Session()
dataset1a = Dataset(uri=f"s3://{unique_id}-1a")
dataset1b = Dataset(uri=f"s3://{unique_id}-1b")
dag2 = DAG(dag_id=f"datasets-{unique_id}-2", schedule_on=[dataset1a, dataset1b])
DAG.bulk_write_to_db(dags=[dag2], session=session)
session.add_all([dataset1a, dataset1b])
session.commit()

    # add 2 events, then a dag run, then 3 more events, then a manual dag run (ignored),
    # then another dataset-triggered dag run, then a final event
first_timestamp = pendulum.datetime(2022, 1, 1, tz='UTC')
session.add_all(
[
DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp),
DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp),
]
)
dr1 = DagRun(
dag2.dag_id,
run_id=unique_id + '-1',
run_type=DagRunType.DATASET_TRIGGERED,
execution_date=first_timestamp.add(microseconds=1000),
)
dr1.dag = dag2
session.add(dr1)
session.add_all(
[
DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=2000)),
DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=3000)),
DatasetEvent(dataset_id=dataset1b.id, created_at=first_timestamp.add(microseconds=4000)),
]
)
dr2 = DagRun( # this dag run should be ignored
dag2.dag_id,
run_id=unique_id + '-3',
run_type=DagRunType.MANUAL,
execution_date=first_timestamp.add(microseconds=3000),
)
dr2.dag = dag2
session.add(dr2)
dr3 = DagRun(
dag2.dag_id,
run_id=unique_id + '-2',
run_type=DagRunType.DATASET_TRIGGERED,
execution_date=first_timestamp.add(microseconds=4000), # exact same time as 3rd event in window
)
dr3.dag = dag2
session.add(dr3)
session.add_all(
[DatasetEvent(dataset_id=dataset1a.id, created_at=first_timestamp.add(microseconds=5000))]
)
session.commit()
session.expunge_all()

events = _get_upstream_dataset_events(dag_run=dr3, session=session)

event_times = [x.created_at for x in events]
assert event_times == [
first_timestamp.add(microseconds=2000),
first_timestamp.add(microseconds=3000),
first_timestamp.add(microseconds=4000),
]


class TestGetDagRunDatasetTriggerEvents(TestDagRunEndpoint):
@mock.patch('airflow.api_connexion.endpoints.dag_run_endpoint._get_upstream_dataset_events')
def test_should_respond_200(self, mock_get_events, session):
dagrun_model = DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.DATASET_TRIGGERED,
execution_date=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
state=DagRunState.RUNNING,
)
session.add(dagrun_model)
session.commit()
result = session.query(DagRun).all()
assert len(result) == 1
created_at = pendulum.now('UTC')
# make sure whatever is returned by this func is what comes out in response.
mock_get_events.return_value = [DatasetEvent(dataset_id=1, created_at=created_at)]
response = self.client.get(
"api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents",
environ_overrides={'REMOTE_USER': "test"},
)
assert response.status_code == 200
expected_response = {
'dataset_events': [
{
'created_at': str(created_at),
'dataset_id': 1,
'extra': None,
'id': None,
'source_dag_id': None,
'source_map_index': None,
'source_run_id': None,
'source_task_id': None,
}
],
'total_entries': 1,
}
assert response.json == expected_response

def test_should_respond_404(self):
response = self.client.get(
"api/v1/dags/invalid-id/dagRuns/invalid-id/upstreamDatasetEvents",
environ_overrides={'REMOTE_USER': "test"},
)
assert response.status_code == 404
expected_resp = {
'detail': "DAGRun with DAG ID: 'invalid-id' and DagRun ID: 'invalid-id' not found",
'status': 404,
'title': 'DAGRun not found',
'type': EXCEPTIONS_LINK_MAP[404],
}
assert expected_resp == response.json

def test_should_raises_401_unauthenticated(self, session):
dagrun_model = DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID",
run_type=DagRunType.MANUAL,
execution_date=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
)
session.add(dagrun_model)
session.commit()

response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamDatasetEvents")

assert_401(response)
