Skip to content

Commit

Permalink
Use default view in TriggerDagRunLink (#11778)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek committed Nov 11, 2020
1 parent 7478e18 commit 289c9b5
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
5 changes: 3 additions & 2 deletions airflow/operators/dagrun_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import datetime
from typing import Dict, Optional, Union
from urllib.parse import quote

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.types import DagRunType


Expand All @@ -37,7 +37,8 @@ class TriggerDagRunLink(BaseOperatorLink):
name = 'Triggered DAG'

def get_link(self, operator, dttm):
return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
return build_airflow_url_with_query(query)


class TriggerDagRunOperator(BaseOperator):
Expand Down
5 changes: 3 additions & 2 deletions airflow/sensors/external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import datetime
import os
from typing import FrozenSet, Optional, Union
from urllib.parse import quote

from sqlalchemy import func

Expand All @@ -28,6 +27,7 @@
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import build_airflow_url_with_query
from airflow.utils.session import provide_session
from airflow.utils.state import State

Expand All @@ -41,7 +41,8 @@ class ExternalTaskSensorLink(BaseOperatorLink):
name = 'External DAG'

def get_link(self, operator, dttm):
return f"/graph?dag_id={operator.external_dag_id}&root=&execution_date={quote(dttm.isoformat())}"
query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
return build_airflow_url_with_query(query)


class ExternalTaskSensor(BaseSensorOperator):
Expand Down
12 changes: 12 additions & 0 deletions airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
from functools import reduce
from itertools import filterfalse, tee
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar
from urllib import parse

from jinja2 import Template

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.module_loading import import_string

Expand Down Expand Up @@ -202,3 +204,13 @@ def cross_downstream(*args, **kwargs):
stacklevel=2,
)
return import_string('airflow.models.baseoperator.cross_downstream')(*args, **kwargs)


def build_airflow_url_with_query(query: Dict[str, Any]) -> str:
"""
Build airflow url using base_url and default_view and provided query
For example:
'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587
"""
view = conf.get('webserver', 'dag_default_view').lower()
return f"/{view}?{parse.urlencode(query)}"
13 changes: 12 additions & 1 deletion tests/utils/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import helpers
from airflow.utils.helpers import merge_dicts
from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts
from tests.test_utils.config import conf_vars


class TestHelpers(unittest.TestCase):
Expand Down Expand Up @@ -136,3 +137,13 @@ def test_merge_dicts_recursive_right_only(self):
dict2 = {'a': 1, 'r': {'c': 3, 'b': 0}}
merged = merge_dicts(dict1, dict2)
self.assertDictEqual(merged, {'a': 1, 'r': {'b': 0, 'c': 3}})

@conf_vars(
{
("webserver", "dag_default_view"): "custom",
}
)
def test_build_airflow_url_with_query(self):
query = {"dag_id": "test_dag", "param": "key/to.encode"}
url = build_airflow_url_with_query(query)
assert url == "/custom?dag_id=test_dag&param=key%2Fto.encode"

0 comments on commit 289c9b5

Please sign in to comment.