Skip to content

Commit

Permalink
Relax mandatory requirement for start_date when schedule=None (#35356)
Browse files Browse the repository at this point in the history
* Relax mandatory requirement for start_date when schedule=None

* Updated run_type in unit tests

* Added check for empty start_date and non-empty schedule

* Fix the build failures

* Fix the build failures

* Update based on review comments

(cherry picked from commit 930f165)
  • Loading branch information
vishnucoder1 authored and ephraimbuddy committed Dec 5, 2023
1 parent 713dfdf commit bc95360
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
11 changes: 8 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,13 @@ def __init__(

# sort out DAG's scheduling behavior
scheduling_args = [schedule_interval, timetable, schedule]

has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
has_empty_start_date = not ("start_date" in self.default_args or self.start_date)

if has_scheduling_args and has_empty_start_date:
raise ValueError("DAG is missing the start_date parameter")

if not at_most_one(*scheduling_args):
raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
if schedule_interval is not NOTSET:
Expand Down Expand Up @@ -2618,10 +2625,8 @@ def add_task(self, task: Operator) -> None:

from airflow.utils.task_group import TaskGroupContext

if not self.start_date and not task.start_date:
raise AirflowException("DAG is missing the start_date parameter")
# if the task has no start date, assign it the same as the DAG
elif not task.start_date:
if not task.start_date:
task.start_date = self.start_date
# otherwise, the task will start on the later of its own start date and
# the DAG's start date
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.is_backfill
or task.start_date <= self.execution_date
or (task.start_date is None or task.start_date <= self.execution_date)
and (task.end_date is None or self.execution_date <= task.end_date)
)

Expand Down
52 changes: 33 additions & 19 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ def test_following_schedule_relativedelta(self):
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
dag = DAG(dag_id=dag_id, schedule=delta)
dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

_next = dag.following_schedule(TEST_DATE)
Expand All @@ -780,7 +780,7 @@ def test_following_schedule_relativedelta_with_deprecated_schedule_interval(self
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
dag = DAG(dag_id=dag_id, schedule_interval=delta)
dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

_next = dag.following_schedule(TEST_DATE)
Expand All @@ -799,7 +799,7 @@ def test_following_schedule_relativedelta_with_depr_schedule_interval_decorated_
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)

@dag(dag_id=dag_id, schedule_interval=delta)
@dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
def mydag():
BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)

Expand Down Expand Up @@ -827,6 +827,20 @@ def test_following_schedule_datetime_timezone(self):
when = dag.following_schedule(start)
assert when.isoformat() == "2018-03-25T03:00:00+00:00"

def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
    """Create a manual run on a DAG that has no schedule and no start_date.

    Regression guard: building the run must not fail with an AttributeError
    on ``start_date`` when the DAG is created without any scheduling argument.
    """
    unscheduled_dag = DAG("dag_with_none_schedule_and_empty_start_date")
    # The task deliberately carries no start_date of its own either.
    dateless_task = BaseOperator(task_id="task_without_start_date")
    unscheduled_dag.add_task(dateless_task)

    created_run = unscheduled_dag.create_dagrun(
        state=State.RUNNING,
        run_type=DagRunType.MANUAL,
        execution_date=DEFAULT_DATE,
    )

    assert created_run is not None

def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self):
    """Reject a DAG that is given a schedule but no start_date.

    Constructing the DAG is expected to raise ``ValueError`` with the
    "missing the start_date parameter" message.
    """
    expected_message = "DAG is missing the start_date parameter"
    with pytest.raises(ValueError, match=expected_message):
        DAG(
            dag_id="dag_with_non_none_schedule_and_empty_start_date",
            schedule="@hourly",
        )

def test_following_schedule_datetime_timezone_utc0530(self):
# Check that we don't get an AttributeError 'name' for self.timezone
class UTC0530(datetime.tzinfo):
Expand Down Expand Up @@ -942,8 +956,8 @@ def test_bulk_write_to_db_interval_save_runtime(self, interval):
mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags)
with mock.patch.object(DagRun, "active_runs_of_dags", mock_active_runs_of_dags):
dags_null_timetable = [
DAG("dag-interval-None", schedule_interval=None),
DAG("dag-interval-test", schedule_interval=interval),
DAG("dag-interval-None", schedule_interval=None, start_date=TEST_DATE),
DAG("dag-interval-test", schedule_interval=interval, start_date=TEST_DATE),
]
DAG.bulk_write_to_db(dags_null_timetable, session=settings.Session())
if interval:
Expand Down Expand Up @@ -1530,7 +1544,7 @@ def test_schedule_dag_once(self):
it is called, and not scheduled the second.
"""
dag_id = "test_schedule_dag_once"
dag = DAG(dag_id=dag_id, schedule="@once")
dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
assert isinstance(dag.timetable, OnceTimetable)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

Expand All @@ -1553,7 +1567,7 @@ def test_fractional_seconds(self):
Tests if fractional seconds are stored in the database
"""
dag_id = "test_fractional_seconds"
dag = DAG(dag_id=dag_id, schedule="@once")
dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

start_date = timezone.utcnow()
Expand Down Expand Up @@ -1658,25 +1672,25 @@ def test_get_paused_dag_ids(self):
def test_timetable_and_description_from_schedule_interval_arg(
self, schedule_interval_arg, expected_timetable, interval_description
):
dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg)
dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg, start_date=TEST_DATE)
assert dag.timetable == expected_timetable
assert dag.schedule_interval == schedule_interval_arg
assert dag.timetable.description == interval_description

def test_timetable_and_description_from_dataset(self):
dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")])
dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")], start_date=TEST_DATE)
assert dag.timetable == DatasetTriggeredTimetable()
assert dag.schedule_interval == "Dataset"
assert dag.timetable.description == "Triggered by datasets"

def test_schedule_interval_still_works(self):
dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *")
dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *", start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/5 * * * *")
assert dag.schedule_interval == "*/5 * * * *"
assert dag.timetable.description == "Every 5 minutes"

def test_timetable_still_works(self):
dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"))
dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"), start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/6 * * * *")
assert dag.schedule_interval == "*/6 * * * *"
assert dag.timetable.description == "Every 6 minutes"
Expand All @@ -1702,7 +1716,7 @@ def test_timetable_still_works(self):
],
)
def test_description_from_timetable(self, timetable, expected_description):
dag = DAG("test_schedule_interval_description", timetable=timetable)
dag = DAG("test_schedule_interval_description", timetable=timetable, start_date=TEST_DATE)
assert dag.timetable == timetable
assert dag.timetable.description == expected_description

Expand Down Expand Up @@ -2449,7 +2463,7 @@ def test_return_date_range_with_num_method(self):
start_date = TEST_DATE
delta = timedelta(days=1)

dag = DAG("dummy-dag", schedule=delta)
dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
dag_dates = dag.date_range(start_date=start_date, num=3)

assert dag_dates == [
Expand Down Expand Up @@ -2502,10 +2516,10 @@ def test_dag_owner_links(self):
)
def test_schedule_dag_param(self, kwargs):
with pytest.raises(ValueError, match="At most one"):
with DAG(dag_id="hello", **kwargs):
with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs):
pass

def test_continuous_schedule_interval_limits_max_active_runs(self):
def test_continuous_schedule_interval_linmits_max_active_runs(self):
dag = DAG("continuous", start_date=DEFAULT_DATE, schedule_interval="@continuous", max_active_runs=1)
assert isinstance(dag.timetable, ContinuousTimetable)
assert dag.max_active_runs == 1
Expand Down Expand Up @@ -3010,19 +3024,19 @@ def mydag():

@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_match_schedule_interval(timetable):
dag = DAG("my-dag", timetable=timetable)
dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
def test_dag_schedule_interval_match_timetable(schedule_interval):
dag = DAG("my-dag", schedule=schedule_interval)
dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
def test_dag_schedule_interval_change_after_init(schedule_interval):
dag = DAG("my-dag", timetable=OnceTimetable())
dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE)
dag.schedule_interval = schedule_interval
assert not dag._check_schedule_interval_matches_timetable()

Expand Down Expand Up @@ -3391,7 +3405,7 @@ def test_get_next_data_interval(
data_interval_end,
expected_data_interval,
):
dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily")
dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily", start_date=DEFAULT_DATE)
dag_model = DagModel(
dag_id="test_get_next_data_interval",
next_dagrun=logical_date,
Expand Down
4 changes: 3 additions & 1 deletion tests/providers/google/cloud/sensors/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ def test_gcs_object_existence_async_sensor_execute_complete(self):
class TestTsFunction:
def test_should_support_datetime(self):
context = {
"dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)),
"dag": DAG(
dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0)
),
"execution_date": datetime(2019, 2, 14, 0, 0),
}
result = ts_function(context)
Expand Down

0 comments on commit bc95360

Please sign in to comment.