Add data_interval_start and data_interval_end in dagrun create API endpoint #36630

Merged: 5 commits, Jan 26, 2024
14 changes: 13 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,6 +61,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.db import get_query_count
from airflow.utils.log.action_logger import action_event_from_permission
@@ -336,11 +337,22 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
if not dagrun_instance:
try:
dag = get_airflow_app().dag_bag.get_dag(dag_id)

data_interval_start = post_body.get("data_interval_start")
data_interval_end = post_body.get("data_interval_end")
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
data_interval=data_interval,
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
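
With this change, a caller can supply an explicit data interval when triggering a DAG run; when both fields are omitted, the endpoint still falls back to dag.timetable.infer_manual_data_interval. A minimal usage sketch with the requests library, assuming a local webserver at http://localhost:8080 with a basic-auth backend, credentials admin/admin, and a DAG named example_dag (all hypothetical):

    import requests

    # Trigger a manual run with an explicit data interval. The endpoint path
    # matches the tests in this PR; URL, credentials, and DAG id are placeholders.
    response = requests.post(
        "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
        auth=("admin", "admin"),
        json={
            "logical_date": "2024-01-04T05:00:00+00:00",
            # Both bounds must be timezone-aware ISO timestamps and must be
            # supplied together; sending only one fails validation with HTTP 400.
            "data_interval_start": "2024-01-03T00:00:00+00:00",
            "data_interval_end": "2024-01-04T05:00:00+00:00",
        },
    )
    response.raise_for_status()
    run = response.json()
    print(run["data_interval_start"], run["data_interval_end"])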
6 changes: 4 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
@@ -2900,12 +2900,14 @@ components:
data_interval_start:
type: string
format: date-time
readOnly: true
description: |
The beginning of the interval the DAG run covers.
nullable: true
data_interval_end:
type: string
format: date-time
readOnly: true
description: |
The end of the interval the DAG run covers.
nullable: true
last_scheduling_decision:
type: string
16 changes: 13 additions & 3 deletions airflow/api_connexion/schemas/dag_run_schema.py
@@ -20,7 +20,7 @@
import json
from typing import NamedTuple

from marshmallow import fields, post_dump, pre_load, validate
from marshmallow import ValidationError, fields, post_dump, pre_load, validate, validates_schema
from marshmallow.schema import Schema
from marshmallow.validate import Range
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
@@ -69,8 +69,8 @@ class Meta:
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(dump_only=True)
data_interval_end = auto_field(dump_only=True)
data_interval_start = auto_field(validate=validate_istimezone)
data_interval_end = auto_field(validate=validate_istimezone)
last_scheduling_decision = auto_field(dump_only=True)
run_type = auto_field(dump_only=True)
note = auto_field(dump_only=False)
@@ -121,6 +121,16 @@ def autofill(self, data, **kwargs):

return ret_data

@validates_schema
def validate_data_interval_dates(self, data, **kwargs):
data_interval_start_exists = data.get("data_interval_start") is not None
data_interval_end_exists = data.get("data_interval_end") is not None

if data_interval_start_exists != data_interval_end_exists:
raise ValidationError(
"Both 'data_interval_start' and 'data_interval_end' must be specified together"
)


class SetDagRunStateFormSchema(Schema):
"""Schema for handling the request of setting state of DAG run."""
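
The @validates_schema hook above enforces that the two bounds arrive as a pair. A standalone sketch of the same rule in plain marshmallow (the schema class and field definitions here are illustrative, not the exact Airflow schema, which uses SQLAlchemy auto_field):

    from marshmallow import Schema, ValidationError, fields, validates_schema

    class DataIntervalPairSchema(Schema):
        data_interval_start = fields.DateTime(allow_none=True)
        data_interval_end = fields.DateTime(allow_none=True)

        @validates_schema
        def validate_data_interval_dates(self, data, **kwargs):
            # Accept both bounds or neither; reject exactly one.
            if (data.get("data_interval_start") is None) != (data.get("data_interval_end") is None):
                raise ValidationError(
                    "Both 'data_interval_start' and 'data_interval_end' must be specified together"
                )

    # Raises ValidationError, reported under the "_schema" key.
    DataIntervalPairSchema().load({"data_interval_start": "2024-01-03T00:00:00+00:00"})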
10 changes: 8 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
@@ -1077,9 +1077,15 @@ export interface components {
start_date?: string | null;
/** Format: date-time */
end_date?: string | null;
/** Format: date-time */
/**
* Format: date-time
* @description The beginning of the interval the DAG run covers.
*/
data_interval_start?: string | null;
/** Format: date-time */
/**
* Format: date-time
* @description The end of the interval the DAG run covers.
*/
data_interval_end?: string | null;
/** Format: date-time */
last_scheduling_decision?: string | null;
89 changes: 82 additions & 7 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1159,32 +1159,59 @@ def test_end_date_gte_lte(self, payload, expected_dag_run_ids):
class TestPostDagRun(TestDagRunEndpoint):
@pytest.mark.parametrize("logical_date_field_name", ["execution_date", "logical_date"])
@pytest.mark.parametrize(
"dag_run_id, logical_date, note",
"dag_run_id, logical_date, note, data_interval_start, data_interval_end",
[
pytest.param("TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", id="all-present"),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, id="only-date"),
pytest.param(None, None, None, id="all-missing"),
pytest.param(
"TEST_DAG_RUN", "2020-06-11T18:00:00+00:00", "test-note", None, None, id="all-present"
),
pytest.param(
"TEST_DAG_RUN",
"2024-06-11T18:00:00+00:00",
"test-note",
"2024-01-03T00:00:00+00:00",
"2024-01-04T05:00:00+00:00",
id="all-present-with-dates",
),
pytest.param(None, "2020-06-11T18:00:00+00:00", None, None, None, id="only-date"),
pytest.param(None, None, None, None, None, id="all-missing"),
],
)
def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
def test_should_respond_200(
self,
session,
logical_date_field_name,
dag_run_id,
logical_date,
note,
data_interval_start,
data_interval_end,
):
self._create_dag("TEST_DAG_ID")

# We'll patch airflow.utils.timezone.utcnow to always return this so we
# can check the returned dates.
fixed_now = timezone.utcnow()

request_json = {}
if logical_date is not None:
request_json[logical_date_field_name] = logical_date
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
if data_interval_start is not None:
request_json["data_interval_start"] = data_interval_start
if data_interval_end is not None:
request_json["data_interval_end"] = data_interval_end

request_json["note"] = note
with mock.patch("airflow.utils.timezone.utcnow", lambda: fixed_now):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 200

if logical_date is None:
@@ -1195,6 +1222,13 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
expected_dag_run_id = f"manual__{expected_logical_date}"
else:
expected_dag_run_id = dag_run_id

expected_data_interval_start = expected_logical_date
expected_data_interval_end = expected_logical_date
if data_interval_start is not None and data_interval_end is not None:
expected_data_interval_start = data_interval_start
expected_data_interval_end = data_interval_end

assert response.json == {
"conf": {},
"dag_id": "TEST_DAG_ID",
@@ -1205,8 +1239,8 @@ def test_should_respond_200(self, session, logical_date_field_name, dag_run_id,
"external_trigger": True,
"start_date": None,
"state": "queued",
"data_interval_end": expected_logical_date,
"data_interval_start": expected_logical_date,
"data_interval_end": expected_data_interval_end,
"data_interval_start": expected_data_interval_start,
"last_scheduling_decision": None,
"run_type": "manual",
"note": note,
@@ -1323,6 +1357,47 @@ def test_should_response_400_for_conflicting_execution_date_logical_date(self):
assert response.json["title"] == "logical_date conflicts with execution_date"
assert response.json["detail"] == (f"'{logical_date}' != '{execution_date}'")

@pytest.mark.parametrize(
"data_interval_start, data_interval_end, expected",
[
(
"2020-11-10T08:25:56.939143",
None,
"'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_start'",
),
(
None,
"2020-11-10T08:25:56.939143",
"'2020-11-10T08:25:56.939143' is not a 'date-time' - 'data_interval_end'",
),
(
"2020-11-10T08:25:56.939143+00:00",
None,
"{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
),
(
None,
"2020-11-10T08:25:56.939143+00:00",
"{'_schema': [\"Both 'data_interval_start' and 'data_interval_end' must be specified together\"]}",
),
],
)
def test_should_response_400_for_missing_start_date_or_end_date(
self, data_interval_start, data_interval_end, expected
):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"data_interval_start": data_interval_start,
"data_interval_end": data_interval_end,
},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["detail"] == expected

@pytest.mark.parametrize(
"data, expected",
[
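
For completeness, a sketch of the failure mode these cases exercise: posting exactly one bound now returns HTTP 400 with the schema-level error (server URL, credentials, and DAG id are again placeholders):

    import requests

    response = requests.post(
        "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
        auth=("admin", "admin"),
        json={
            "execution_date": "2020-11-10T08:25:56.939143+00:00",
            # "data_interval_end" is deliberately omitted.
            "data_interval_start": "2020-11-10T08:25:56.939143+00:00",
        },
    )
    assert response.status_code == 400
    # Expected detail, per the parametrized cases above:
    # {'_schema': ["Both 'data_interval_start' and 'data_interval_end' must be specified together"]}
    print(response.json()["detail"])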