Add data_interval_start and data_interval_end in dagrun create API endpoint (#36630)
karakanb authored Jan 26, 2024
1 parent 0f2670e commit 714a9a7
Showing 5 changed files with 120 additions and 15 deletions.
14 changes: 13 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,6 +61,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.db import get_query_count
from airflow.utils.log.action_logger import action_event_from_permission
@@ -336,11 +337,22 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
if not dagrun_instance:
try:
dag = get_airflow_app().dag_bag.get_dag(dag_id)

data_interval_start = post_body.get("data_interval_start")
data_interval_end = post_body.get("data_interval_end")
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
data_interval=data_interval,
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
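
For context, a minimal sketch of how a client might exercise the new fields against this endpoint. The base URL, credentials, and DAG id are illustrative assumptions, not part of the change:

```python
import requests

# Hypothetical deployment details; adjust to your environment.
BASE_URL = "http://localhost:8080/api/v1"

response = requests.post(
    f"{BASE_URL}/dags/example_dag/dagRuns",
    auth=("admin", "admin"),  # assumes basic-auth is enabled
    json={
        "logical_date": "2024-01-03T00:00:00+00:00",
        # The two bounds must be supplied together; sending only one
        # is rejected by the schema validation added in this commit.
        "data_interval_start": "2024-01-03T00:00:00+00:00",
        "data_interval_end": "2024-01-04T05:00:00+00:00",
    },
)
response.raise_for_status()
run = response.json()
print(run["data_interval_start"], run["data_interval_end"])
```

If both bounds are omitted, the endpoint falls back to `dag.timetable.infer_manual_data_interval(run_after=logical_date)`, as before.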
6 changes: 4 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2900,12 +2900,14 @@ components:
data_interval_start:
type: string
format: date-time
readOnly: true
description: |
The beginning of the interval the DAG run covers.
nullable: true
data_interval_end:
type: string
format: date-time
readOnly: true
description: |
The end of the interval the DAG run covers.
nullable: true
last_scheduling_decision:
type: string
16 changes: 13 additions & 3 deletions airflow/api_connexion/schemas/dag_run_schema.py
@@ -20,7 +20,7 @@
import json
from typing import NamedTuple

from marshmallow import fields, post_dump, pre_load, validate
from marshmallow import ValidationError, fields, post_dump, pre_load, validate, validates_schema
from marshmallow.schema import Schema
from marshmallow.validate import Range
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
@@ -69,8 +69,8 @@ class Meta:
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(dump_only=True)
data_interval_end = auto_field(dump_only=True)
data_interval_start = auto_field(validate=validate_istimezone)
data_interval_end = auto_field(validate=validate_istimezone)
last_scheduling_decision = auto_field(dump_only=True)
run_type = auto_field(dump_only=True)
note = auto_field(dump_only=False)
@@ -121,6 +121,16 @@ def autofill(self, data, **kwargs):

return ret_data

@validates_schema
def validate_data_interval_dates(self, data, **kwargs):
data_interval_start_exists = data.get("data_interval_start") is not None
data_interval_end_exists = data.get("data_interval_end") is not None

if data_interval_start_exists != data_interval_end_exists:
raise ValidationError(
"Both 'data_interval_start' and 'data_interval_end' must be specified together"
)


class SetDagRunStateFormSchema(Schema):
"""Schema for handling the request of setting state of DAG run."""
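
A standalone sketch of the same either-both-or-neither rule in plain marshmallow; the field names mirror the schema above, while the rest is illustrative:

```python
from marshmallow import Schema, ValidationError, fields, validates_schema


class IntervalSchema(Schema):
    data_interval_start = fields.DateTime(allow_none=True)
    data_interval_end = fields.DateTime(allow_none=True)

    @validates_schema
    def validate_data_interval_dates(self, data, **kwargs):
        # The two bounds are all-or-nothing, matching the schema-level check above.
        has_start = data.get("data_interval_start") is not None
        has_end = data.get("data_interval_end") is not None
        if has_start != has_end:
            raise ValidationError(
                "Both 'data_interval_start' and 'data_interval_end' must be specified together"
            )


try:
    IntervalSchema().load({"data_interval_start": "2024-01-03T00:00:00+00:00"})
except ValidationError as err:
    print(err.messages)  # {'_schema': ["Both 'data_interval_start' and ..."]}
```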
10 changes: 8 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
@@ -1077,9 +1077,15 @@ export interface components {
start_date?: string | null;
/** Format: date-time */
end_date?: string | null;
/** Format: date-time */
/**
* Format: date-time
* @description The beginning of the interval the DAG run covers.
*/
data_interval_start?: string | null;
/** Format: date-time */
/**
* Format: date-time
* @description The end of the interval the DAG run covers.
*/
data_interval_end?: string | null;
/** Format: date-time */
last_scheduling_decision?: string | null;
89 changes: 82 additions & 7 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1159,32 +1159,59 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids):
class TestPostDagRun(TestDagRunEndpoint):
@pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
@pytest.mark.parametrize(
"dag_run_id, logical_date, note",
"dag_run_id, logical_date, note, data_interval_start, data_interval_end",
[
pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
pytest.param(None, None, None, id="all-missing"),
pytest.param(
"TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", None, None, id="all-present"
),
pytest.param(
"TEST_DAG_RUN",
"2024-06-11T18:00:00+00:00",
"test-note",
"2024-01-03T00:00:00+00:00",
"2024-01-04T05:00:00+00:00",
id="all-present-with-dates",
),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None, id="only-date"),
pytest.param(None, None, None, None, None, id="all-missing"),
],
)
def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
def test_should_respond_200(
self,
session,
logical_date_field_name,
dag_run_id,
logical_date,
note,
data_interval_start,
data_interval_end,
):
self._create_dag("TEST_DAG_ID")

# We'll patch airflow.utils.timezone.utcnow to always return this so we
# can check the returned dates.
fixed_now = timezone.utcnow()

request_json = {}
if logical_date is not None:
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
if data_interval_start is not None:
request_json["data_interval_start"] = data_interval_start
if data_interval_end is not None:
request_json["data_interval_end"] = data_interval_end

request_json["note"] = note
with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 200

if logical_date is None:
@@ -1195,6 +1222,13 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
expected_dag_run_id = f"manual__{expected_logical_date}"
else:
expected_dag_run_id = dag_run_id

expected_data_interval_start = expected_logical_date
expected_data_interval_end = expected_logical_date
if data_interval_start is not None and data_interval_end is not None:
expected_data_interval_start = data_interval_start
expected_data_interval_end = data_interval_end

assert response.json == {
"conf": {},
"dag_id": "TEST_DAG_ID",
@@ -1205,8 +1239,8 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
"external_trigger": True,
"start_date": None,
"state": "queued",
"data_interval_end": expected_logical_date,
"data_interval_start": expected_logical_date,
"data_interval_end": expected_data_interval_end,
"data_interval_start": expected_data_interval_start,
"last_scheduling_decision": None,
"run_type": "manual",
"note": note,
@@ -1323,6 +1357,47 @@ def test_should_response_400_for_conflicting_execution_date_logical_date(self):
assert response.json["title"] == "logical_date conflicts with execution_date"
assert response.json["detail"] == (f"'{logical_date}' != '{execution_date}'")

@pytest.mark.parametrize(
"data_interval_start, data_interval_end, expected",
[
(
"2020-11-10T08:25:56.939143",
None,
"'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_start'",
),
(
None,
"2020-11-10T08:25:56.939143",
"'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_end'",
),
(
"2020-11-10T08:25:56.939143+00:00",
None,
"{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
),
(
None,
"2020-11-10T08:25:56.939143+00:00",
"{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
),
],
)
def test_should_response_400_for_missing_start_date_or_end_date(
self, data_interval_start, data_interval_end, expected
):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"data_interval_start": data_interval_start,
"data_interval_end": data_interval_end,
},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["detail"] == expected

@pytest.mark.parametrize(
"data, expected",
[
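
In short, the precedence these tests pin down: an explicit interval wins, and otherwise both bounds default to the logical date for the manual-run DAG used in the tests. A tiny illustration of that rule, in pure Python rather than Airflow code:

```python
def expected_interval(logical_date, start=None, end=None):
    # Explicit bounds take precedence; otherwise fall back to the logical date.
    if start is not None and end is not None:
        return start, end
    return logical_date, logical_date


assert expected_interval("2020-06-11T18:00:00+00:00") == (
    "2020-06-11T18:00:00+00:00",
    "2020-06-11T18:00:00+00:00",
)
assert expected_interval(
    "2024-06-11T18:00:00+00:00",
    "2024-01-03T00:00:00+00:00",
    "2024-01-04T05:00:00+00:00",
) == ("2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00")
```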
