Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def dag_backfill(args, dag=None):

signal.signal(signal.SIGTERM, sigint_handler)

dag = dag or get_dag(args)
dag = dag or get_dag(args.subdir, args.dag_id)

if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")
Expand Down Expand Up @@ -170,7 +170,7 @@ def set_is_paused(is_paused, args):

def dag_show(args):
"""Displays DAG or saves it's graphic representation to the file"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
dot = render_dag(dag)
if args.save:
filename, _, fileformat = args.save.rpartition('.')
Expand Down Expand Up @@ -203,7 +203,7 @@ def dag_state(args):
>>> airflow dags state tutorial 2015-01-01T00:00:00.000000
running
"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
print(dr[0].state if len(dr) > 0 else None) # pylint: disable=len-as-condition

Expand All @@ -215,7 +215,7 @@ def dag_next_execution(args):
>>> airflow dags next_execution tutorial
2018-08-31 10:38:00
"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)

if dag.is_paused:
print("[INFO] Please be reminded this DAG is PAUSED now.")
Expand Down
14 changes: 7 additions & 7 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def task_run(args, dag=None):
settings.configure_orm(disable_connection_pool=True)

if not args.pickle and not dag:
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
elif not dag:
with db.create_session() as session:
print(f'Loading pickle id {args.pickle}')
Expand Down Expand Up @@ -148,7 +148,7 @@ def task_failed_deps(args):
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks
to have succeeded, but found 1 non-success(es).
"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)

Expand All @@ -170,7 +170,7 @@ def task_state(args):
>>> airflow tasks state tutorial sleep 2015-01-01
success
"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
print(ti.current_state())
Expand All @@ -179,7 +179,7 @@ def task_state(args):
@cli_utils.action_logging
def task_list(args, dag=None):
"""Lists the tasks within a DAG at the command line"""
dag = dag or get_dag(args)
dag = dag or get_dag(args.subdir, args.dag_id)
if args.tree:
dag.tree_view()
else:
Expand All @@ -202,7 +202,7 @@ def task_test(args, dag=None):
if not already_has_stream_handler:
logging.getLogger('airflow.task').propagate = True

dag = dag or get_dag(args)
dag = dag or get_dag(args.subdir, args.dag_id)

task = dag.get_task(task_id=args.task_id)
# Add CLI provided task_params to task.params
Expand Down Expand Up @@ -235,7 +235,7 @@ def task_test(args, dag=None):
@cli_utils.action_logging
def task_render(args):
"""Renders and displays templated fields for a given task"""
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)
ti.render_templates()
Expand All @@ -254,7 +254,7 @@ def task_clear(args):
logging.basicConfig(
level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
dags = get_dags(args)
dags = get_dags(args.subdir, args.dag_id, use_regex=args.dag_regex)

if args.task_regex:
for idx, dag in enumerate(dags):
Expand Down
28 changes: 15 additions & 13 deletions airflow/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import traceback
from argparse import Namespace
from datetime import datetime
from typing import Optional

from airflow import AirflowException, settings
from airflow.models import DagBag, Log
Expand Down Expand Up @@ -126,35 +127,36 @@ def _build_metrics(func_name, namespace):
return metrics


def process_subdir(subdir):
def process_subdir(subdir: Optional[str]):
"""Expands path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc."""
if subdir:
if not settings.DAGS_FOLDER:
raise ValueError("DAGS_FOLDER variable in settings should be fil.")
subdir = subdir.replace('DAGS_FOLDER', settings.DAGS_FOLDER)
subdir = os.path.abspath(os.path.expanduser(subdir))
return subdir


def get_dag(args):
def get_dag(subdir: Optional[str], dag_id: str):
"""Returns DAG of a given dag_id"""
dagbag = DagBag(process_subdir(args.subdir))
if args.dag_id not in dagbag.dags:
dagbag = DagBag(process_subdir(subdir))
if dag_id not in dagbag.dags:
raise AirflowException(
'dag_id could not be found: {}. Either the dag did not exist or it failed to '
'parse.'.format(args.dag_id))
return dagbag.dags[args.dag_id]
'parse.'.format(dag_id))
return dagbag.dags[dag_id]


def get_dags(args):
def get_dags(subdir: Optional[str], dag_id: str, use_regex: bool = False):
"""Returns DAG(s) matching a given regex or dag_id"""
if not args.dag_regex:
return [get_dag(args)]
dagbag = DagBag(process_subdir(args.subdir))
matched_dags = [dag for dag in dagbag.dags.values() if re.search(
args.dag_id, dag.dag_id)]
if not use_regex:
return [get_dag(subdir, dag_id)]
dagbag = DagBag(process_subdir(subdir))
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
if not matched_dags:
raise AirflowException(
'dag_id could not be found with regex: {}. Either the dag did not exist '
'or it failed to parse.'.format(args.dag_id))
'or it failed to parse.'.format(dag_id))
return matched_dags


Expand Down
2 changes: 1 addition & 1 deletion tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def test_local_run(self):
interactive=True,
execution_date=timezone.parse('2018-04-27T08:39:51.298439+00:00')
)
dag = get_dag(args)
dag = get_dag(args.subdir, args.dag_id)
reset(dag.dag_id)

task_command.task_run(args)
Expand Down
8 changes: 3 additions & 5 deletions tests/utils/test_cli_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from datetime import datetime

from airflow import AirflowException, settings
from airflow.bin.cli import CLIFactory
from airflow.utils import cli, cli_action_loggers


Expand Down Expand Up @@ -77,15 +76,14 @@ def test_process_subdir_path_with_placeholder(self):
self.assertEqual(os.path.join(settings.DAGS_FOLDER, 'abc'), cli.process_subdir('DAGS_FOLDER/abc'))

def test_get_dags(self):
parser = CLIFactory.get_parser()
dags = cli.get_dags(parser.parse_args(['tasks', 'clear', 'example_subdag_operator', '--yes']))
dags = cli.get_dags(None, "example_subdag_operator")
self.assertEqual(len(dags), 1)

dags = cli.get_dags(parser.parse_args(['tasks', 'clear', 'subdag', '-dx', '--yes']))
dags = cli.get_dags(None, "subdag", True)
self.assertGreater(len(dags), 1)

with self.assertRaises(AirflowException):
cli.get_dags(parser.parse_args(['tasks', 'clear', 'foobar', '-dx', '--yes']))
cli.get_dags(None, "foobar", True)


@contextmanager
Expand Down