Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,6 @@ def __init__(self, flags=None, help=None, action=None, default=None, nargs=None,
help="Tree view",
action="store_true")

# list_dags
ARG_REPORT = Arg(
("-r", "--report"),
help="Show DagBag loading report",
action="store_true")

# clear
ARG_UPSTREAM = Arg(
("-u", "--upstream"),
Expand Down Expand Up @@ -665,7 +659,13 @@ class GroupCommand(NamedTuple):
name='list',
help="List all the DAGs",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_list_dags'),
args=(ARG_SUBDIR, ARG_REPORT),
args=(ARG_SUBDIR, ARG_OUTPUT),
),
ActionCommand(
name='report',
help='Show DagBag loading report',
func=lazy_load_command('airflow.cli.commands.dag_command.dag_report'),
args=(ARG_SUBDIR, ARG_OUTPUT),
),
ActionCommand(
name='list_runs',
Expand Down
39 changes: 26 additions & 13 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import logging
import signal
import subprocess
import textwrap
from typing import List

from tabulate import tabulate
Expand All @@ -40,7 +39,7 @@
from airflow.utils.session import create_session


def _tabulate_dag_runs(dag_runs: List[DagRun], tablefmt="fancy_grid"):
def _tabulate_dag_runs(dag_runs: List[DagRun], tablefmt: str = "fancy_grid") -> str:
tabulat_data = (
{
'ID': dag_run.id,
Expand All @@ -52,12 +51,27 @@ def _tabulate_dag_runs(dag_runs: List[DagRun], tablefmt="fancy_grid"):
'End date': dag_run.end_date.isoformat() if dag_run.end_date else '',
} for dag_run in dag_runs
)
return "\n%s" % tabulate(
return tabulate(
tabular_data=tabulat_data,
tablefmt=tablefmt
)


def _tabulate_dags(dags: List[DAG], tablefmt: str = "fancy_grid") -> str:
tabulat_data = (
{
'DAG ID': dag.dag_id,
'Filepath': dag.filepath,
'Owner': dag.owner,
} for dag in sorted(dags, key=lambda d: d.dag_id)
)
return tabulate(
tabular_data=tabulat_data,
tablefmt=tablefmt,
headers='keys'
)


@cli_utils.action_logging
def dag_backfill(args, dag=None):
"""Creates backfill job or dry run for a DAG"""
Expand Down Expand Up @@ -260,16 +274,15 @@ def dag_next_execution(args):
def dag_list_dags(args):
"""Displays dags with or without stats at the command line"""
dagbag = DagBag(process_subdir(args.subdir))
list_template = textwrap.dedent("""\n
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
{dag_list}
""")
dag_list = "\n".join(sorted(dagbag.dags))
print(list_template.format(dag_list=dag_list))
if args.report:
print(dagbag.dagbag_report())
dags = dagbag.dags.values()
print(_tabulate_dags(dags, tablefmt=args.output))


@cli_utils.action_logging
def dag_report(args):
"""Displays dagbag stats at the command line"""
dagbag = DagBag(process_subdir(args.subdir))
print(tabulate(dagbag.dagbag_stats, headers="keys", tablefmt=args.output))


@cli_utils.action_logging
Expand Down
25 changes: 23 additions & 2 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State
from tests.test_utils.config import conf_vars

dag_folder_path = '/'.join(os.path.realpath(__file__).split('/')[:-1])

Expand Down Expand Up @@ -302,9 +303,29 @@ def reset_dr_db(dag_id):

reset_dr_db(dag_id)

@conf_vars({
('core', 'load_examples'): 'true'
})
def test_cli_report(self):
args = self.parser.parse_args(['dags', 'report'])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_report(args)
out = temp_stdout.getvalue()

self.assertIn("airflow/example_dags/example_complex.py ", out)
self.assertIn("['example_complex']", out)

@conf_vars({
('core', 'load_examples'): 'true'
})
def test_cli_list_dags(self):
args = self.parser.parse_args(['dags', 'list', '--report'])
dag_command.dag_list_dags(args)
args = self.parser.parse_args(['dags', 'list'])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_list_dags(args)
out = temp_stdout.getvalue()
self.assertIn("Owner", out)
self.assertIn("│ airflow │", out)
self.assertIn("airflow/example_dags/example_complex.py", out)

def test_cli_list_dag_runs(self):
dag_command.dag_trigger(self.parser.parse_args([
Expand Down