Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d0e22ad
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
2b11aed
restore unique constraint on logical date and make it nullable
vatsrahul1001 Jan 29, 2025
237360d
fix migration file
vatsrahul1001 Jan 29, 2025
6f423c1
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
18efafb
fix migration file
vatsrahul1001 Jan 29, 2025
8f9e076
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 29, 2025
1c47c75
Merge branch 'main' into restore-unique-constraint-logical-date
vatsrahul1001 Jan 29, 2025
d3bfe20
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Jan 30, 2025
0fa2477
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Jan 30, 2025
75a684a
fixing tests
vatsrahul1001 Jan 30, 2025
cefc9a3
refactor backfill reprocess logic
vatsrahul1001 Jan 30, 2025
8f84606
fixing tests
vatsrahul1001 Jan 30, 2025
f239846
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Jan 30, 2025
322b25c
fix tests
vatsrahul1001 Feb 1, 2025
0c24eff
fix tests
vatsrahul1001 Feb 1, 2025
f74c84b
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 1, 2025
53d329b
resolve conflict
vatsrahul1001 Feb 3, 2025
c3e29c7
remove default date from logical date in dag run model
vatsrahul1001 Feb 3, 2025
0830d58
Merge branch 'main' of github.com:astronomer/airflow into restore-uni…
vatsrahul1001 Feb 3, 2025
2b4251d
fix task_filter
vatsrahul1001 Feb 3, 2025
3ec1ecd
merge main
vatsrahul1001 Feb 3, 2025
c05c19e
Merge branch 'restore-unique-constraint-logical-date' of github.com:a…
vatsrahul1001 Feb 3, 2025
4b223d8
Merge branch 'refactor-backfill-reprocess-logic' of github.com:astron…
vatsrahul1001 Feb 3, 2025
16cdaa1
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
a4bc83d
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
90595d6
fix failing tests
vatsrahul1001 Feb 3, 2025
8b654b5
Merge branch 'main' into refactor-backfill-reprocess-logic
vatsrahul1001 Feb 3, 2025
fd4c069
make logical date as required field
vatsrahul1001 Feb 3, 2025
78f8f43
refactor
vatsrahul1001 Feb 3, 2025
7dd4320
Merge branch 'main' into make-logical-date-as-required-field-to-trigg…
vatsrahul1001 Feb 3, 2025
54bbbe5
Merge branch 'main' of github.com:astronomer/airflow into make-logica…
vatsrahul1001 Feb 6, 2025
fab917d
Merge branch 'make-logical-date-as-required-field-to-trigger-dag-api'…
vatsrahul1001 Feb 7, 2025
15dfbf9
Merge branch 'main' of github.com:astronomer/airflow into make-logica…
vatsrahul1001 Feb 7, 2025
c8b8577
remove backfill related changes
vatsrahul1001 Feb 7, 2025
5467412
implement review comments
vatsrahul1001 Feb 7, 2025
d521d4d
add time now to var
vatsrahul1001 Feb 7, 2025
3da8eae
Merge branch 'main' into make-logical-date-as-required-field-to-trigg…
vatsrahul1001 Feb 7, 2025
afc7383
fix tests
vatsrahul1001 Feb 7, 2025
a8bd93a
Merge branch 'make-logical-date-as-required-field-to-trigger-dag-api'…
vatsrahul1001 Feb 7, 2025
92d4c00
Merge branch 'main' into make-logical-date-as-required-field-to-trigg…
vatsrahul1001 Feb 7, 2025
590b55d
Merge branch 'main' into make-logical-date-as-required-field-to-trigg…
vatsrahul1001 Feb 10, 2025
b91f3fa
fix review comments
vatsrahul1001 Feb 10, 2025
8ce5073
Merge branch 'main' into make-logical-date-as-required-field-to-trigg…
vatsrahul1001 Feb 10, 2025
ccb7ed1
Update tests/api_fastapi/core_api/routes/public/test_dag_run.py
vatsrahul1001 Feb 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
from datetime import datetime
from enum import Enum

from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field, model_validator
import pendulum
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -82,9 +82,9 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"""Trigger DAG Run Serializer for POST body."""

dag_run_id: str | None = None
logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None

conf: dict = Field(default_factory=dict)
note: str | None = None

Expand All @@ -96,18 +96,16 @@ def check_data_intervals(cls, values):
)
return values

## when logical date is null, the run id should be generated from run_after + random string.
# TODO: AIP83: we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged
@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date)
self.dag_run_id = DagRun.generate_run_id(
DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
)
return self

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
@property
def logical_date(self) -> datetime:
return timezone.utcnow()


class DAGRunsBatchBody(StrictBaseModel):
"""List DAG Runs body for batch endpoint."""
Expand Down
8 changes: 8 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10501,6 +10501,12 @@ components:
- type: string
- type: 'null'
title: Dag Run Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
data_interval_start:
anyOf:
- type: string
Expand All @@ -10523,6 +10529,8 @@ components:
title: Note
additionalProperties: false
type: object
required:
- logical_date
title: TriggerDAGRunPostBody
description: Trigger DAG Run Serializer for POST body.
TriggerResponse:
Expand Down
20 changes: 7 additions & 13 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from airflow.models import DAG, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -345,6 +346,7 @@ def trigger_dag_run(
) -> DAGRunResponse:
"""Trigger a DAG."""
dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
now = pendulum.now("UTC")
if not dm:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")

Expand All @@ -354,7 +356,8 @@ def trigger_dag_run(
f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
)

logical_date = pendulum.instance(body.logical_date)
logical_date = timezone.coerce_datetime(body.logical_date)
Comment thread
Lee-W marked this conversation as resolved.
coerced_logical_date = timezone.coerce_datetime(logical_date)

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
Expand All @@ -365,20 +368,11 @@ def trigger_dag_run(
end=pendulum.instance(body.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

if body.dag_run_id:
run_id = body.dag_run_id
else:
run_id = dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=logical_date,
data_interval=data_interval,
)
Comment thread
uranusjr marked this conversation as resolved.
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)

dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
run_id=cast(str, body.dag_run_id),
logical_date=coerced_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
conf=body.conf,
Expand Down
13 changes: 13 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5816,6 +5816,18 @@ export const $TriggerDAGRunPostBody = {
],
title: "Dag Run Id",
},
logical_date: {
anyOf: [
{
type: "string",
format: "date-time",
},
{
type: "null",
},
],
title: "Logical Date",
},
data_interval_start: {
anyOf: [
{
Expand Down Expand Up @@ -5858,6 +5870,7 @@ export const $TriggerDAGRunPostBody = {
},
additionalProperties: false,
type: "object",
required: ["logical_date"],
title: "TriggerDAGRunPostBody",
description: "Trigger DAG Run Serializer for POST body.",
} as const;
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,7 @@ export type TimeDelta = {
*/
export type TriggerDAGRunPostBody = {
dag_run_id?: string | null;
logical_date: string | null;
data_interval_start?: string | null;
data_interval_end?: string | null;
conf?: {
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/src/queries/useTrigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce
dag_run_id: checkDagRunId,
data_interval_end: formattedDataIntervalEnd,
data_interval_start: formattedDataIntervalStart,
logical_date: null,
note: checkNote,
},
});
Expand Down
66 changes: 50 additions & 16 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,12 +1139,7 @@ def _dags_for_trigger_tests(self, session=None):
"dag_run_id, note, data_interval_start, data_interval_end",
[
("dag_run_5", "test-note", None, None),
(
"dag_run_6",
"test-note",
"2024-01-03T00:00:00+00:00",
"2024-01-04T05:00:00+00:00",
),
("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00"),
(None, None, None, None),
],
)
Expand All @@ -1153,7 +1148,7 @@ def test_should_respond_200(
):
fixed_now = timezone.utcnow().isoformat()

request_json = {"note": note}
request_json = {"note": note, "logical_date": fixed_now}
if dag_run_id is not None:
request_json["dag_run_id"] = dag_run_id
if data_interval_start is not None:
Expand Down Expand Up @@ -1297,29 +1292,34 @@ def test_should_respond_200(
],
)
def test_invalid_data(self, test_client, post_body, expected_detail):
now = timezone.utcnow().isoformat()
post_body["logical_date"] = now
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json=post_body)
assert response.status_code == 422
assert response.json() == expected_detail

@mock.patch("airflow.models.DAG.create_dagrun")
def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_client):
now = timezone.utcnow().isoformat()
error_message = "Encountered Error"

mock_create_dagrun.side_effect = ValueError(error_message)

response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={})
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"logical_date": now})
assert response.status_code == 400
assert response.json() == {"detail": error_message}

def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session):
now = timezone.utcnow().isoformat()
self._dags_for_trigger_tests(session)
response = test_client.post("/public/dags/inactive/dagRuns", json={})
response = test_client.post("/public/dags/inactive/dagRuns", json={"logical_date": now})
assert response.status_code == 404
assert response.json()["detail"] == "DAG with dag_id: 'inactive' not found"

def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, session):
now = timezone.utcnow().isoformat()
self._dags_for_trigger_tests(session)
response = test_client.post("/public/dags/import_errors/dagRuns", json={})
response = test_client.post("/public/dags/import_errors/dagRuns", json={"logical_date": now})
assert response.status_code == 400
assert (
response.json()["detail"]
Expand All @@ -1334,11 +1334,11 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client):
note = "duplicate logical date test"
response_1 = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
json={"dag_run_id": RUN_ID_1, "note": note},
json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now},
)
response_2 = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
json={"dag_run_id": RUN_ID_2, "note": note},
json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now},
)

assert response_1.status_code == 200
Expand Down Expand Up @@ -1378,9 +1378,14 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client):
def test_should_response_422_for_missing_start_date_or_end_date(
self, test_client, data_interval_start, data_interval_end
):
now = timezone.utcnow().isoformat()
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
json={"data_interval_start": data_interval_start, "data_interval_end": data_interval_end},
json={
"data_interval_start": data_interval_start,
"data_interval_end": data_interval_end,
"logical_date": now,
},
)
assert response.status_code == 422
assert (
Expand All @@ -1389,21 +1394,50 @@ def test_should_response_422_for_missing_start_date_or_end_date(
)

def test_raises_validation_error_for_invalid_params(self, test_client):
now = timezone.utcnow().isoformat()
response = test_client.post(
f"/public/dags/{DAG2_ID}/dagRuns",
json={"conf": {"validated_number": 5000}},
json={"conf": {"validated_number": 5000}, "logical_date": now},
)
assert response.status_code == 400
assert "Invalid input for param validated_number" in response.json()["detail"]

def test_response_404(self, test_client):
response = test_client.post("/public/dags/randoms/dagRuns", json={})
now = timezone.utcnow().isoformat()
response = test_client.post("/public/dags/randoms/dagRuns", json={"logical_date": now})
assert response.status_code == 404
assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found"

def test_response_409(self, test_client):
response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID})
now = timezone.utcnow().isoformat()
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now}
)
assert response.status_code == 409
response_json = response.json()
assert "detail" in response_json
assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"]

def test_should_respond_200_with_null_logical_date(self, test_client):
response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
json={"logical_date": None},
)
assert response.status_code == 200
assert response.json() == {
"dag_run_id": mock.ANY,
"dag_id": DAG1_ID,
"logical_date": None,
"queued_at": mock.ANY,
"start_date": None,
"end_date": None,
"data_interval_start": mock.ANY,
"data_interval_end": mock.ANY,
"last_scheduling_decision": None,
"run_type": "manual",
"state": "queued",
"external_trigger": True,
"triggered_by": "rest_api",
"conf": {},
"note": None,
}
5 changes: 3 additions & 2 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,11 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session):
run_type=DagRunType.SCHEDULED,
start_date=DEFAULT_DATE,
)
dr2 = dag_maker.create_dagrun_after(
dr,
next_date = DEFAULT_DATE + datetime.timedelta(days=1)
dr2 = dag_maker.create_dagrun(
run_id="test_dagrun_no_deadlock_2",
start_date=DEFAULT_DATE + datetime.timedelta(days=1),
logical_date=next_date,
)
ti1_op1 = dr.get_task_instance(task_id="dop")
dr2.get_task_instance(task_id="dop")
Expand Down