Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ def prioritize_queued(self, session, executor, dagbag):
else:
d[ti.pool].append(ti)

overloaded_dags = set()
for pool, tis in list(d.items()):
if not pool:
# Arbitrary:
Expand Down Expand Up @@ -538,11 +539,13 @@ def prioritize_queued(self, session, executor, dagbag):
if self.do_pickle and self.executor.__class__ not in (
executors.LocalExecutor,
executors.SequentialExecutor):
logging.info("Pickling DAG {}".format(dag))
pickle_id = dag.pickle(session).id

if (
not task.dag.concurrency_reached and
ti.are_dependencies_met()):
if dag.dag_id in overloaded_dags or dag.concurrency_reached:
overloaded_dags.append(dag.dag_id)
continue
if ti.are_dependencies_met():
executor.queue_task_instance(
ti, force=True, pickle_id=pickle_id)
open_slots -= 1
Expand Down
14 changes: 12 additions & 2 deletions airflow/www/templates/airflow/dags.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ <h2>DAGs</h2>
{% endif %}
</td>
<td>
<a class="label label-default" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
<a class="label label-default schedule {{ dag.dag_id }}" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
{{ dag.schedule_interval }}</a></td>
<td>{{ dag.owner if dag else orm_dags[dag_id].owners }}</td>
<td style="padding:0px; width:180px; height:10px;">
<td style="padding:0px; width:200px; height:10px;">
<svg height="10" width="10" id='{{ dag.safe_dag_id }}' style="display: block;"></svg>
</td>
<td class="text-center" style="width:160px;"><nobr>
Expand Down Expand Up @@ -129,6 +129,16 @@ <h2>DAGs</h2>
circle_margin = 4;
stroke_width = 2;
stroke_width_hover = 6;
d3.json("{{ url_for('airflow.blocked') }}", function(error, json) {
$.each(json, function() {
if(this.active_dag_run >= this.max_active_runs) {
$('.label.schedule.' + this.dag_id)
.css('background-color', 'red')
.attr("title", "Number of active DAG runs (" + this.active_dag_run + ") has been reached")
.tooltip();
}
});
});
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
Expand Down
17 changes: 13 additions & 4 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
from builtins import object
from cgi import escape
from io import BytesIO as IO
import gzip
import functools
import gzip
import json

from flask import after_this_request, request
from flask import after_this_request, request, Response
from flask.ext.login import current_user
import wtforms
from wtforms.compat import text_type

from airflow import configuration
from airflow import login, models, settings
from airflow import configuration, models, settings, utils
AUTHENTICATE = configuration.getboolean('webserver', 'AUTHENTICATE')


Expand Down Expand Up @@ -94,6 +94,15 @@ def wrapper(*args, **kwargs):
return wrapper


def json_response(obj):
"""
returns a json response from a json serializable python object
"""
return Response(
response=json.dumps(
obj, indent=4, cls=utils.AirflowJsonEncoder),
status=200,
mimetype="application/json")

def gzipped(f):
'''
Expand Down
46 changes: 28 additions & 18 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,7 @@ def chart_data(self):
if chart_type == 'datatable':
payload['data'] = data
payload['state'] = 'SUCCESS'
return Response(
response=json.dumps(
payload, indent=4, cls=utils.AirflowJsonEncoder),
status=200,
mimetype="application/json")
return wwwutils.json_response(payload)

elif chart_type == 'para':
df.rename(columns={
Expand Down Expand Up @@ -516,12 +512,7 @@ def chart_data(self):
payload['hc'] = hc
payload['data'] = data
payload['request_dict'] = request_dict

return Response(
response=json.dumps(
payload, indent=4, cls=utils.AirflowJsonEncoder),
status=200,
mimetype="application/json")
return wwwutils.json_response(payload)

@expose('/chart')
@data_profiling_required
Expand Down Expand Up @@ -600,9 +591,8 @@ def dag_stats(self):
'color': State.color(state)
}
payload[dag.safe_dag_id].append(d)
return Response(
response=json.dumps(payload, indent=4),
status=200, mimetype="application/json")
return wwwutils.json_response(payload)


@expose('/code')
@login_required
Expand Down Expand Up @@ -666,10 +656,7 @@ def headers(self):
d['is_authenticated'] = current_user.is_authenticated()
if hasattr(current_user, 'username'):
d['username'] = current_user.username

return Response(
response=json.dumps(d, indent=4),
status=200, mimetype="application/json")
return wwwutils.json_response(d)

@expose('/login', methods=['GET', 'POST'])
def login(self):
Expand Down Expand Up @@ -924,6 +911,29 @@ def clear(self):

return response

@expose('/blocked')
@login_required
def blocked(self):
session = settings.Session()
DR = models.DagRun
dags = (
session.query(DR.dag_id, sqla.func.count(DR.id))
.filter(DR.state == State.RUNNING)
.group_by(DR.dag_id)
.all()
)
payload = []
for dag_id, active_dag_runs in dags:
max_active_runs = 0
if dag_id in dagbag.dags:
max_active_runs = dagbag.dags[dag_id].max_active_runs
payload.append({
'dag_id': dag_id,
'active_dag_run': active_dag_runs,
'max_active_runs': max_active_runs,
})
return wwwutils.json_response(payload)

@expose('/success')
@login_required
@wwwutils.action_logging
Expand Down
2 changes: 2 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/code?dag_id=example_bash_operator')
assert "example_bash_operator" in response.data.decode('utf-8')
response = self.app.get(
'/admin/airflow/blocked')
response = self.app.get(
'/admin/configurationview/')
assert "Airflow Configuration" in response.data.decode('utf-8')
Expand Down