Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relax mandatory requirement for start_date when schedule=None #35356

Merged
merged 6 commits into from Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions airflow/models/dag.py
Expand Up @@ -548,6 +548,13 @@ def __init__(

# sort out DAG's scheduling behavior
scheduling_args = [schedule_interval, timetable, schedule]

has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
has_empty_start_date = not ("start_date" in self.default_args or self.start_date)

if has_scheduling_args and has_empty_start_date:
raise ValueError("DAG is missing the start_date parameter")

if not at_most_one(*scheduling_args):
raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
if schedule_interval is not NOTSET:
Expand Down Expand Up @@ -2618,10 +2625,8 @@ def add_task(self, task: Operator) -> None:

from airflow.utils.task_group import TaskGroupContext

if not self.start_date and not task.start_date:
raise AirflowException("DAG is missing the start_date parameter")
# if the task has no start date, assign it the same as the DAG
elif not task.start_date:
if not task.start_date:
task.start_date = self.start_date
# otherwise, the task will start on the later of its own start date and
# the DAG's start date
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Expand Up @@ -1116,7 +1116,7 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
def task_filter(task: Operator) -> bool:
return task.task_id not in task_ids and (
self.is_backfill
or task.start_date <= self.execution_date
or (task.start_date is None or task.start_date <= self.execution_date)
and (task.end_date is None or self.execution_date <= task.end_date)
)

Expand Down
52 changes: 33 additions & 19 deletions tests/models/test_dag.py
Expand Up @@ -765,7 +765,7 @@ def test_following_schedule_relativedelta(self):
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
dag = DAG(dag_id=dag_id, schedule=delta)
dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

_next = dag.following_schedule(TEST_DATE)
Expand All @@ -780,7 +780,7 @@ def test_following_schedule_relativedelta_with_deprecated_schedule_interval(self
"""
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)
dag = DAG(dag_id=dag_id, schedule_interval=delta)
dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

_next = dag.following_schedule(TEST_DATE)
Expand All @@ -799,7 +799,7 @@ def test_following_schedule_relativedelta_with_depr_schedule_interval_decorated_
dag_id = "test_schedule_dag_relativedelta"
delta = relativedelta(hours=+1)

@dag(dag_id=dag_id, schedule_interval=delta)
@dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE)
def mydag():
BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)

Expand Down Expand Up @@ -827,6 +827,20 @@ def test_following_schedule_datetime_timezone(self):
when = dag.following_schedule(start)
assert when.isoformat() == "2018-03-25T03:00:00+00:00"

def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self):
# Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none
dag = DAG("dag_with_none_schedule_and_empty_start_date")
dag.add_task(BaseOperator(task_id="task_without_start_date"))
dagrun = dag.create_dagrun(
state=State.RUNNING, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE
)
assert dagrun is not None

def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self):
# Check that we get a ValueError 'start_date' for self.start_date when schedule is non-none
with pytest.raises(ValueError, match="DAG is missing the start_date parameter"):
DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", schedule="@hourly")

def test_following_schedule_datetime_timezone_utc0530(self):
# Check that we don't get an AttributeError 'name' for self.timezone
class UTC0530(datetime.tzinfo):
Expand Down Expand Up @@ -942,8 +956,8 @@ def test_bulk_write_to_db_interval_save_runtime(self, interval):
mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags)
with mock.patch.object(DagRun, "active_runs_of_dags", mock_active_runs_of_dags):
dags_null_timetable = [
DAG("dag-interval-None", schedule_interval=None),
DAG("dag-interval-test", schedule_interval=interval),
DAG("dag-interval-None", schedule_interval=None, start_date=TEST_DATE),
DAG("dag-interval-test", schedule_interval=interval, start_date=TEST_DATE),
]
DAG.bulk_write_to_db(dags_null_timetable, session=settings.Session())
if interval:
Expand Down Expand Up @@ -1530,7 +1544,7 @@ def test_schedule_dag_once(self):
it is called, and not scheduled the second.
"""
dag_id = "test_schedule_dag_once"
dag = DAG(dag_id=dag_id, schedule="@once")
dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
assert isinstance(dag.timetable, OnceTimetable)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

Expand All @@ -1553,7 +1567,7 @@ def test_fractional_seconds(self):
Tests if fractional seconds are stored in the database
"""
dag_id = "test_fractional_seconds"
dag = DAG(dag_id=dag_id, schedule="@once")
dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE)
dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE))

start_date = timezone.utcnow()
Expand Down Expand Up @@ -1658,25 +1672,25 @@ def test_get_paused_dag_ids(self):
def test_timetable_and_description_from_schedule_interval_arg(
self, schedule_interval_arg, expected_timetable, interval_description
):
dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg)
dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg, start_date=TEST_DATE)
assert dag.timetable == expected_timetable
assert dag.schedule_interval == schedule_interval_arg
assert dag.timetable.description == interval_description

def test_timetable_and_description_from_dataset(self):
dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")])
dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")], start_date=TEST_DATE)
assert dag.timetable == DatasetTriggeredTimetable()
assert dag.schedule_interval == "Dataset"
assert dag.timetable.description == "Triggered by datasets"

def test_schedule_interval_still_works(self):
dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *")
dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *", start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/5 * * * *")
assert dag.schedule_interval == "*/5 * * * *"
assert dag.timetable.description == "Every 5 minutes"

def test_timetable_still_works(self):
dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"))
dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"), start_date=TEST_DATE)
assert dag.timetable == cron_timetable("*/6 * * * *")
assert dag.schedule_interval == "*/6 * * * *"
assert dag.timetable.description == "Every 6 minutes"
Expand All @@ -1702,7 +1716,7 @@ def test_timetable_still_works(self):
],
)
def test_description_from_timetable(self, timetable, expected_description):
dag = DAG("test_schedule_interval_description", timetable=timetable)
dag = DAG("test_schedule_interval_description", timetable=timetable, start_date=TEST_DATE)
assert dag.timetable == timetable
assert dag.timetable.description == expected_description

Expand Down Expand Up @@ -2449,7 +2463,7 @@ def test_return_date_range_with_num_method(self):
start_date = TEST_DATE
delta = timedelta(days=1)

dag = DAG("dummy-dag", schedule=delta)
dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
dag_dates = dag.date_range(start_date=start_date, num=3)

assert dag_dates == [
Expand Down Expand Up @@ -2502,10 +2516,10 @@ def test_dag_owner_links(self):
)
def test_schedule_dag_param(self, kwargs):
with pytest.raises(ValueError, match="At most one"):
with DAG(dag_id="hello", **kwargs):
with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs):
pass

def test_continuous_schedule_interval_limits_max_active_runs(self):
def test_continuous_schedule_interval_linmits_max_active_runs(self):
dag = DAG("continuous", start_date=DEFAULT_DATE, schedule_interval="@continuous", max_active_runs=1)
assert isinstance(dag.timetable, ContinuousTimetable)
assert dag.max_active_runs == 1
Expand Down Expand Up @@ -3010,19 +3024,19 @@ def mydag():

@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_match_schedule_interval(timetable):
dag = DAG("my-dag", timetable=timetable)
dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
def test_dag_schedule_interval_match_timetable(schedule_interval):
dag = DAG("my-dag", schedule=schedule_interval)
dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
def test_dag_schedule_interval_change_after_init(schedule_interval):
dag = DAG("my-dag", timetable=OnceTimetable())
dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE)
dag.schedule_interval = schedule_interval
assert not dag._check_schedule_interval_matches_timetable()

Expand Down Expand Up @@ -3391,7 +3405,7 @@ def test_get_next_data_interval(
data_interval_end,
expected_data_interval,
):
dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily")
dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily", start_date=DEFAULT_DATE)
dag_model = DagModel(
dag_id="test_get_next_data_interval",
next_dagrun=logical_date,
Expand Down
4 changes: 3 additions & 1 deletion tests/providers/google/cloud/sensors/test_gcs.py
Expand Up @@ -227,7 +227,9 @@ def test_gcs_object_existence_async_sensor_execute_complete(self):
class TestTsFunction:
def test_should_support_datetime(self):
context = {
"dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)),
"dag": DAG(
dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0)
),
"execution_date": datetime(2019, 2, 14, 0, 0),
}
result = ts_function(context)
Expand Down