Skip to content
Permalink
Browse files
Fix broken dagrun links when many runs start at the same time (#23462)
* Load requested dagrun even when there are many dagruns at (almost) the same time

* Fix code formatting issues
  • Loading branch information
repl-chris committed May 9, 2022
1 parent 1657bd2 commit 828016747ac06f6fb2564c07bb8be92246f42567
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 3 deletions.
@@ -201,9 +201,19 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
num_runs = www_request.args.get('num_runs', default=default_dag_run, type=int)

# When base_date has been rounded up because of the DateTimeField widget, we want
# to use the execution_date as the starting point for our query just to ensure a
# link targeting a specific dag run actually loads that dag run. If there are
# more than num_runs dag runs in the "rounded period" then those dagruns would get
# loaded and the actual requested run would be excluded by the limit(). Once
# the user has changed base date to be anything else we want to use that instead.
query_date = base_date
if date_time < base_date and date_time + timedelta(seconds=1) >= base_date:
query_date = date_time

drs = (
session.query(DagRun)
.filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)
.filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= query_date)
.order_by(desc(DagRun.execution_date))
.limit(num_runs)
.all()
@@ -15,11 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import timedelta
from urllib.parse import quote

import pytest

from airflow.configuration import conf
from airflow.models import DAG
from airflow.utils import timezone
from airflow.utils.session import provide_session
from airflow.utils.state import State

DAG_ID = "dag_for_testing_dt_nr_dr_form"
@@ -30,6 +34,7 @@
("dag_run_for_testing_dt_nr_dr_form_2", timezone.datetime(2018, 2, 2)),
("dag_run_for_testing_dt_nr_dr_form_1", timezone.datetime(2018, 1, 1)),
]
VERY_CLOSE_RUNS_DATE = timezone.datetime(2020, 1, 1, 0, 0, 0)

ENDPOINTS = [
"/graph?dag_id=dag_for_testing_dt_nr_dr_form",
@@ -44,9 +49,10 @@ def dag(app):
return dag


@provide_session
@pytest.fixture(scope="module")
def runs(dag):
return [
def runs(dag, session):
dag_runs = [
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
@@ -56,6 +62,9 @@ def runs(dag):
)
for run_id, execution_date, in RUNS_DATA
]
yield dag_runs
for dag_run in dag_runs:
session.delete(dag_run)


def _assert_run_is_in_dropdown_not_selected(run, data):
@@ -68,6 +77,10 @@ def _assert_run_is_selected(run, data):
assert f'<option selected value="{exec_date}">{run.run_id}</option>' in data


def _assert_base_date(base_date, data):
assert f'name="base_date" required type="text" value="{base_date.isoformat()}"' in data


def _assert_base_date_and_num_runs(base_date, num, data):
assert f'name="base_date" value="{base_date}"' not in data
assert f'<option selected="" value="{num}">{num}</option>' not in data
@@ -205,3 +218,82 @@ def test_with_base_date_and_num_runs_and_execution_date_within(admin_client, run
_assert_run_is_not_in_dropdown(runs[1], data)
_assert_run_is_in_dropdown_not_selected(runs[2], data)
_assert_run_is_selected(runs[3], data)


@provide_session
@pytest.fixture
def very_close_dagruns(dag, session):
dag_runs = []
for idx, (run_id, _) in enumerate(RUNS_DATA):
execution_date = VERY_CLOSE_RUNS_DATE.replace(microsecond=idx)
dag_runs.append(
dag.create_dagrun(
run_id=run_id + '_close',
execution_date=execution_date,
data_interval=(execution_date, execution_date),
state=State.SUCCESS,
external_trigger=True,
)
)
yield dag_runs
for dag_run in dag_runs:
session.delete(dag_run)
session.commit()


@pytest.mark.parametrize("endpoint", ENDPOINTS)
def test_rounds_base_date_but_queries_with_execution_date(admin_client, very_close_dagruns, endpoint):
exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
response = admin_client.get(
f'{endpoint}&num_runs=2&execution_date={exec_date}',
data={"username": "test", "password": "test"},
follow_redirects=True,
)
assert response.status_code == 200

data = response.data.decode()
_assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data)
_assert_run_is_selected(very_close_dagruns[1], data)
_assert_run_is_not_in_dropdown(very_close_dagruns[2], data)
_assert_run_is_not_in_dropdown(very_close_dagruns[3], data)


@pytest.mark.parametrize("endpoint", ENDPOINTS)
def test_uses_execution_date_on_filter_application_if_base_date_hasnt_changed(
admin_client, very_close_dagruns, endpoint
):
base_date = quote((VERY_CLOSE_RUNS_DATE + timedelta(seconds=1)).isoformat())
exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
response = admin_client.get(
f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}',
data={"username": "test", "password": "test"},
follow_redirects=True,
)
assert response.status_code == 200

data = response.data.decode()
_assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data)
_assert_run_is_selected(very_close_dagruns[1], data)
_assert_run_is_not_in_dropdown(very_close_dagruns[2], data)
_assert_run_is_not_in_dropdown(very_close_dagruns[3], data)


@pytest.mark.parametrize("endpoint", ENDPOINTS)
def test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_close_dagruns, endpoint):
base_date = quote((VERY_CLOSE_RUNS_DATE + timedelta(seconds=2)).isoformat())
exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
response = admin_client.get(
f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}',
data={"username": "test", "password": "test"},
follow_redirects=True,
)
assert response.status_code == 200

data = response.data.decode()
_assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=2), data)
_assert_run_is_not_in_dropdown(very_close_dagruns[0], data)
_assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
_assert_run_is_selected(very_close_dagruns[3], data)

0 comments on commit 8280167

Please sign in to comment.