Skip to content

Commit

Permalink
[AIRFLOW-6438] Filter DAGs returned by blocked (#7019)
Browse files Browse the repository at this point in the history
(cherry-picked from 5491f4d)
  • Loading branch information
kaxil committed Feb 3, 2020
1 parent caa3567 commit b461e83
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
2 changes: 1 addition & 1 deletion airflow/www_rbac/templates/airflow/dags.html
Expand Up @@ -493,7 +493,7 @@ <h2>DAGs</h2>

if (encoded_dag_ids.length > 0) {
// dags on page fetch stats
d3.json("{{ url_for('Airflow.blocked') }}", blockedHandler);
d3.json("{{ url_for('Airflow.blocked') }}?dag_ids=" + (encoded_dag_ids.join(',')), blockedHandler);
d3.json("{{ url_for('Airflow.last_dagruns') }}?dag_ids=" + (encoded_dag_ids.join(',')), lastDagRunsHandler);
d3.json("{{ url_for('Airflow.dag_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), dagStatsHandler);
d3.json("{{ url_for('Airflow.task_stats') }}?dag_ids=" + (encoded_dag_ids.join(',')), taskStatsHandler);
Expand Down
58 changes: 35 additions & 23 deletions airflow/www_rbac/views.py
Expand Up @@ -1093,32 +1093,44 @@ def dagrun_clear(self):
@has_access
@provide_session
def blocked(self, session=None):
DR = models.DagRun
filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
allowed_dag_ids = appbuilder.sm.get_accessible_dag_ids()

payload = []
if filter_dag_ids:
dags = (
session.query(DR.dag_id, sqla.func.count(DR.id))
.filter(DR.state == State.RUNNING)
.group_by(DR.dag_id)
if 'all_dags' in allowed_dag_ids:
allowed_dag_ids = [dag_id for dag_id, in session.query(models.DagModel.dag_id)]

)
if 'all_dags' not in filter_dag_ids:
dags = dags.filter(DR.dag_id.in_(filter_dag_ids))
dags = dags.all()
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
}

for dag_id, active_dag_runs in dags:
dag = dagbag.get_dag(dag_id)
max_active_runs = dagbag.dags[dag_id].max_active_runs
if dag:
# TODO: Make max_active_runs a column so we can query for it directly
max_active_runs = dag.max_active_runs
payload.append({
'dag_id': dag_id,
'active_dag_run': active_dag_runs,
'max_active_runs': max_active_runs,
})
if selected_dag_ids:
filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
else:
filter_dag_ids = allowed_dag_ids

if not filter_dag_ids:
return wwwutils.json_response([])

DR = models.DagRun

dags = (
session.query(DR.dag_id, sqla.func.count(DR.id))
.filter(DR.state == State.RUNNING)
.filter(DR.dag_id.in_(filter_dag_ids))
.group_by(DR.dag_id)
)

payload = []
for dag_id, active_dag_runs in dags:
max_active_runs = 0
dag = dagbag.get_dag(dag_id)
if dag:
# TODO: Make max_active_runs a column so we can query for it directly
max_active_runs = dag.max_active_runs
payload.append({
'dag_id': dag_id,
'active_dag_run': active_dag_runs,
'max_active_runs': max_active_runs,
})
return wwwutils.json_response(payload)

def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin):
Expand Down

0 comments on commit b461e83

Please sign in to comment.