Skip to content

Commit

Permalink
A manual run can't look like a scheduled one (#28397)
Browse files Browse the repository at this point in the history
Fix #27818

(cherry picked from commit 7ccbe4e)
  • Loading branch information
uranusjr authored and ephraimbuddy committed Jan 12, 2023
1 parent ae49fcf commit cf966c8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
23 changes: 18 additions & 5 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2549,7 +2549,7 @@ def create_dagrun(
external_trigger: bool | None = False,
conf: dict | None = None,
run_type: DagRunType | None = None,
session=NEW_SESSION,
session: Session = NEW_SESSION,
dag_hash: str | None = None,
creating_job_id: int | None = None,
data_interval: tuple[datetime, datetime] | None = None,
Expand Down Expand Up @@ -2586,14 +2586,27 @@ def create_dagrun(
else:
data_interval = self.infer_automated_data_interval(logical_date)

if run_type is None or isinstance(run_type, DagRunType):
pass
elif isinstance(run_type, str): # Compatibility: run_type used to be a str.
run_type = DagRunType(run_type)
else:
raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")

if run_id: # Infer run_type from run_id if needed.
if not isinstance(run_id, str):
raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
if not run_type:
run_type = DagRunType.from_run_id(run_id)
inferred_run_type = DagRunType.from_run_id(run_id)
if run_type is None:
# No explicit type given, use the inferred type.
run_type = inferred_run_type
elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:
# Prevent a manual run from using an ID that looks like a scheduled run.
raise ValueError(
f"A {run_type.value} DAG run cannot use ID {run_id!r} since it "
f"is reserved for {inferred_run_type.value} runs"
)
elif run_type and logical_date is not None: # Generate run_id from run_type and execution_date.
if not isinstance(run_type, DagRunType):
raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
run_id = self.timetable.generate_run_id(
run_type=run_type, logical_date=logical_date, data_interval=data_interval
)
Expand Down
20 changes: 20 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3147,3 +3147,23 @@ def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) ->
)

assert dag_run.run_id == "abc"


@pytest.mark.parametrize(
"run_id_type",
[DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, DagRunType.DATASET_TRIGGERED],
)
def test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagRunType) -> None:
dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule="@daily")
run_id = run_id_type.generate_run_id(DEFAULT_DATE)
with pytest.raises(ValueError) as ctx:
dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
state=DagRunState.QUEUED,
)
assert str(ctx.value) == (
f"A manual DAG run cannot use ID {run_id!r} since it is reserved for {run_id_type.value} runs"
)

0 comments on commit cf966c8

Please sign in to comment.