Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,18 @@ def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]:
else:
columns = ["logical_date", "data_interval.start", "data_interval.end", "run_after"]
getters = [(c, operator.attrgetter(c)) for c in columns]
AirflowConsole().print_as_table([{n: f(o) for n, f in getters} for o in iter_next_dagrun_info()])
rows = []
for info in iter_next_dagrun_info():
if info is None:
print(
"[WARN] No following schedule can be found. "
"This DAG may have schedule interval '@once' or `None`.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"This DAG may have schedule interval '@once' or `None`.",
"This Dag may have schedule interval '@once' or `None`.",

file=sys.stderr,
)
break
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iter_next_dagrun_info() generator naturally exhausts itself on None, making this break technically unnecessary.

For consistency with the existing non---table block below, we can just remove it here. No pressure though, either way is fine.

rows.append({n: f(info) for n, f in getters})
if rows:
AirflowConsole().print_as_table(rows)
return

if args.field:
Expand Down
58 changes: 58 additions & 0 deletions airflow-core/tests/unit/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,64 @@ def test_next_execution(self, dag_id, delta, schedule, catchup, first, second, t
clear_db_dags()
parse_and_sync_to_db(os.devnull, include_examples=True)

def test_next_execution_table_with_no_schedule(self, tmp_path, capsys):
dag_id = "future_schedule_none_table"
file_content = os.linesep.join(
[
"from airflow import DAG",
"from airflow.providers.standard.operators.empty import EmptyOperator",
"from datetime import timedelta; from pendulum import today",
f"dag = DAG('{dag_id}', start_date=today(tz='UTC') + timedelta(days=5), schedule=None, catchup=True)",
"task = EmptyOperator(task_id='empty_task',dag=dag)",
]
)
dag_file = tmp_path / f"{dag_id}.py"
dag_file.write_text(file_content)

with time_machine.travel(DEFAULT_DATE):
clear_db_dags()
parse_and_sync_to_db(tmp_path, include_examples=False)

args = self.parser.parse_args(["dags", "next-execution", dag_id, "--table"])
dag_command.dag_next_execution(args)
captured = capsys.readouterr()

assert captured.out == ""
assert "[WARN] No following schedule can be found." in captured.err

clear_db_dags()
parse_and_sync_to_db(os.devnull, include_examples=True)

def test_next_execution_table_with_once_and_multiple_executions(self, tmp_path, capsys):
dag_id = "past_schedule_once_table"
file_content = os.linesep.join(
[
"from airflow import DAG",
"from airflow.providers.standard.operators.empty import EmptyOperator",
"from datetime import timedelta; from pendulum import today",
f"dag = DAG('{dag_id}', start_date=today(tz='UTC') + timedelta(days=-5), schedule='@once', catchup=True)",
"task = EmptyOperator(task_id='empty_task',dag=dag)",
]
)
dag_file = tmp_path / f"{dag_id}.py"
dag_file.write_text(file_content)

with time_machine.travel(DEFAULT_DATE):
clear_db_dags()
parse_and_sync_to_db(tmp_path, include_examples=False)

args = self.parser.parse_args(
["dags", "next-execution", dag_id, "--table", "--num-executions", "2"]
)
dag_command.dag_next_execution(args)
captured = capsys.readouterr()

assert dec_27.isoformat() in captured.out
assert "[WARN] No following schedule can be found." in captured.err

clear_db_dags()
parse_and_sync_to_db(os.devnull, include_examples=True)

@conf_vars({("core", "load_examples"): "true"})
def test_cli_report(self, stdout_capture):
args = self.parser.parse_args(["dags", "report", "--output", "json"])
Expand Down
Loading