Skip to content

Commit

Permalink
Base date for fetching dag grid view must include selected run_id
Browse files Browse the repository at this point in the history
Previously, if user set dag_run_id parameter in the url, that
refers to a old run, which doesn't fit in the most recent 25 runs,
then the requested run will not be selected.

This change fixes this by setting the base_date to a time where
the run_id is known to exist if dag_run_id is provided as
an explicit query parameter.

closes: #34723
  • Loading branch information
hterik committed Feb 20, 2024
1 parent 011cd3d commit d7c403d
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
4 changes: 4 additions & 0 deletions airflow/www/static/js/api/useGridData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import useFilters, {
} from "src/dag/useFilters";
import type { Task, DagRun, RunOrdering } from "src/types";
import { camelCase } from "lodash";
import useSelection, { RUN_ID } from "src/dag/useSelection";

const DAG_ID_PARAM = "dag_id";

Expand Down Expand Up @@ -80,6 +81,7 @@ const useGridData = () => {
filterUpstream,
},
} = useFilters();
const { firstRunIdSetByUrl } = useSelection();

const query = useQuery(
[
Expand All @@ -91,6 +93,7 @@ const useGridData = () => {
root,
filterUpstream,
filterDownstream,
firstRunIdSetByUrl,
],
async () => {
const params = {
Expand All @@ -102,6 +105,7 @@ const useGridData = () => {
[NUM_RUNS_PARAM]: numRuns,
[RUN_TYPE_PARAM]: runType,
[RUN_STATE_PARAM]: runState,
[RUN_ID]: firstRunIdSetByUrl || "",
};
const response = await axios.get<AxiosResponse, GridData>(gridDataUrl, {
params,
Expand Down
14 changes: 13 additions & 1 deletion airflow/www/static/js/dag/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import FilterBar from "./nav/FilterBar";
import LegendRow from "./nav/LegendRow";
import useToggleGroups from "./useToggleGroups";
import keyboardShortcutIdentifier from "./keyboardShortcutIdentifier";
import { DagRunSelectionContext, RUN_ID } from "./useSelection";

const detailsPanelKey = "hideDetailsPanel";
const minPanelWidth = 300;
Expand All @@ -61,7 +62,7 @@ const headerHeight =
10
) || 0;

const Main = () => {
const MainInContext = () => {
const {
data: { groups },
isLoading,
Expand Down Expand Up @@ -256,4 +257,15 @@ const Main = () => {
);
};

const Main = () => {
const [searchParams] = useSearchParams();
const [firstRunIdSetByUrl] = useState(searchParams.get(RUN_ID));

return (
<DagRunSelectionContext.Provider value={firstRunIdSetByUrl}>
<MainInContext />
</DagRunSelectionContext.Provider>
);
};

export default Main;
9 changes: 8 additions & 1 deletion airflow/www/static/js/dag/useSelection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
* under the License.
*/

import { createContext, useContext } from "react";
import { useSearchParams } from "react-router-dom";

const RUN_ID = "dag_run_id";
export const RUN_ID = "dag_run_id";
const TASK_ID = "task_id";
const MAP_INDEX = "map_index";

Expand All @@ -29,8 +30,13 @@ export interface SelectionProps {
mapIndex?: number;
}

// The first run_id need to be treated differently from the selection, because it is used in backend to
// calculate the base_date, which we don't want jumping around when user is clicking in the grid.
export const DagRunSelectionContext = createContext<string | null>(null);

const useSelection = () => {
const [searchParams, setSearchParams] = useSearchParams();
const firstRunIdSetByUrl = useContext(DagRunSelectionContext);

// Clear selection, but keep other search params
const clearSelection = () => {
Expand Down Expand Up @@ -70,6 +76,7 @@ const useSelection = () => {
},
clearSelection,
onSelect,
firstRunIdSetByUrl,
};
};

Expand Down
28 changes: 24 additions & 4 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3429,6 +3429,7 @@ def task_instances(self):
def grid_data(self):
"""Return grid data."""
dag_id = request.args.get("dag_id")
run_id = request.args.get("dag_run_id")
dag = get_airflow_app().dag_bag.get_dag(dag_id)

if not dag:
Expand All @@ -3446,25 +3447,44 @@ def grid_data(self):
if num_runs is None:
num_runs = conf.getint("webserver", "default_dag_run_display_number")

try:
base_date = timezone.parse(request.args["base_date"], strict=True)
except (KeyError, ValueError):
base_date = dag.get_latest_execution_date() or timezone.utcnow()
dagrun = None
if run_id:
with create_session() as session:
dagrun = dag.get_dagrun(run_id=run_id, session=session)
if not dagrun:
return {"error": f"can't find dag_run_id={run_id}"}, 404
base_date = dagrun.execution_date
else:
try:
base_date = timezone.parse(request.args["base_date"], strict=True)
except (KeyError, ValueError):
base_date = dag.get_latest_execution_date() or timezone.utcnow()

with create_session() as session:
query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)

run_types = request.args.getlist("run_type")
if run_types:
if run_id:
return {"error": "Can not provide filters when dag_run_id filter is selected."}, 400
query = query.where(DagRun.run_type.in_(run_types))

run_states = request.args.getlist("run_state")
if run_states:
if run_id:
return {"error": "Can not provide filters when dag_run_id filter is selected."}, 400
query = query.where(DagRun.state.in_(run_states))

dag_runs = wwwutils.sorted_dag_runs(
query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session
)
if dagrun:
found_requested_run_id = any(True for d in dag_runs if d.run_id == run_id)
if not found_requested_run_id:
return {
"error": f"Dag with dag_run_id={run_id} found, but not in selected time range or filters."
}, 404

encoded_runs = [wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs]
data = {
"groups": dag_to_grid(dag, dag_runs, session),
Expand Down

0 comments on commit d7c403d

Please sign in to comment.