Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

from sqlalchemy import or_


def _create_dagruns(dag, execution_dates, state, run_id_template):
"""
Infers from the dates which dag runs need to be created and does so.
Expand Down Expand Up @@ -181,7 +180,39 @@ def set_state(task, execution_date, upstream=False, downstream=False,
if len(sub_dag_ids) > 0:
tis_altered += qry_sub_dag.all()

session.expunge_all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unrelated. Why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bolkedebruin This is for appending results from this function. Without expunge sqlalchemy will through UnboundedExceptionError since the session is closed before function returns.

session.close()

return tis_altered

def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, put this in models.py. It's okay to have the stub here but the logic of setting the dagrun to the state should be in models

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saguziel There is a function called set_dag_run_state in models.py, but the logics are quite different. set_dag_run_state here basically called set_state above which is designed for mark success functionality and considers a lot more edge cases. But definitely the API functions need refactoring in the future to be consistent with other parts of the codebase.

"""
Set the state of a dag run and all task instances associated with the dag
run for a specific execution date.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param state: the state to which the DAG need to be set
:param commit: commit DAG and tasks to be altered to the database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this method seems to imply that commit should always be true. Maybe there should be another method that gets a list of task instances that would be modified without altering them?

:return: list of tasks that have been created and updated
:raises: AssertionError if dag or execution_date is invalid
"""
res = []

if not dag or not execution_date:
return res

# Mark all task instances in the dag run
for task in dag.tasks:
task.dag = dag
new_state = set_state(task=task, execution_date=execution_date,
state=state, commit=commit)
res.extend(new_state)

# Mark the dag run
if commit:
drs = DagRun.find(dag.dag_id, execution_date=execution_date)
for dr in drs:
dr.dag = dag
dr.update_state()

return res
60 changes: 60 additions & 0 deletions airflow/www/templates/airflow/dag.html
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,36 @@ <h4 class="modal-title" id="myModalLabel">
</div>
</div>
</div>
<!-- Modal for dag -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Edit"
"Clear"
and
"Mark Success"

are fine for the button names.

<div class="modal fade" id="dagModal"
tabindex="-1" role="dialog"
aria-labelledby="dagModalLabel" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title" id="dagModalLabel">
<span id='dag_id'></span>
</h4>
</div>
<div class="modal-body">
<button id="btn_edit_dagrun" type="button" class="btn btn-primary">
Edit
</button>
<button id="btn_dagrun_clear" type="button" class="btn btn-primary">
Clear
</button>
<button id="btn_dagrun_success" type="button" class="btn btn-primary">
Mark Success
</button>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-default" data-dismiss="modal">
Close
</button>
</div>
</div>
</div>
</div>
{% endblock %}
{% block tail %}
{{ lib.form_js() }}
Expand All @@ -239,6 +269,7 @@ <h4 class="modal-title" id="myModalLabel">
$('.never_active').removeClass('active');
});

var id = '';
var dag_id = '{{ dag.dag_id }}';
var task_id = '';
var exection_date = '';
Expand All @@ -263,6 +294,14 @@ <h4 class="modal-title" id="myModalLabel">
}
}

function call_modal_dag(dag) {
id = dag && dag.id;
execution_date = dag && dag.execution_date;
$('#dag_id').html(dag_id);
$('#dagModal').modal({});
$("#dagModal").css("margin-top","0px");
}

$("#btn_rendered").click(function(){
url = "{{ url_for('airflow.rendered') }}" +
"?task_id=" + task_id +
Expand Down Expand Up @@ -328,6 +367,15 @@ <h4 class="modal-title" id="myModalLabel">
window.location = url;
});

$("#btn_dagrun_clear").click(function(){
url = "{{ url_for('airflow.dagrun_clear') }}" +
"?task_id=" + encodeURIComponent(task_id) +
"&dag_id=" + encodeURIComponent(dag_id) +
"&execution_date=" + execution_date +
"&origin=" + encodeURIComponent(window.location);
window.location = url;
});

$("#btn_success").click(function(){
url = "{{ url_for('airflow.success') }}" +
"?task_id=" + encodeURIComponent(task_id) +
Expand All @@ -342,6 +390,14 @@ <h4 class="modal-title" id="myModalLabel">
window.location = url;
});

$('#btn_dagrun_success').click(function(){
url = "{{ url_for('airflow.dagrun_success') }}" +
"?dag_id=" + encodeURIComponent(dag_id) +
"&execution_date=" + execution_date +
"&origin=" + encodeURIComponent(window.location);
window.location = url;
});

$("#btn_gantt").click(function(){
url = "{{ url_for('airflow.gantt') }}" +
"?dag_id=" + dag_id +
Expand All @@ -367,5 +423,9 @@ <h4 class="modal-title" id="myModalLabel">
$.post(url);
});

$('#btn_edit_dagrun').click(function(){
window.location = '/admin/dagrun/edit/?id=' + id;
});

</script>
{% endblock %}
11 changes: 5 additions & 6 deletions airflow/www/templates/airflow/tree.html
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{#
{#
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like these were accidental removals

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aoen Hmm maybe my editor (Atom) removed the white space automatically but the Apache license content should be exactly the same.

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -232,9 +232,8 @@
.enter()
.append('rect')
.on("click", function(d){
if(d.task_id === undefined){
window.location = '/admin/dagrun/edit/?id=' + d.id;
}
if(d.task_id === undefined)
call_modal_dag(d);
else if(nodeobj[d.task_id].operator=='SubDagOperator')
call_modal(d.task_id, d.execution_date, true);
else
Expand Down
113 changes: 88 additions & 25 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from airflow import configuration as conf
from airflow import models
from airflow import settings
from airflow.api.common.experimental.mark_tasks import set_dag_run_state
from airflow.exceptions import AirflowException
from airflow.settings import Session
from airflow.models import XCom, DagRun
Expand Down Expand Up @@ -1026,6 +1027,36 @@ def trigger(self):
"it should start any moment now.".format(dag_id))
return redirect(origin)

def _clear_dag_tis(self, dag, start_date, end_date, origin,
recursive=False, confirmed=False):
if confirmed:
count = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive)

flash("{0} task instances have been cleared".format(count))
return redirect(origin)

tis = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive,
dry_run=True)
if not tis:
flash("No task instances to clear", 'error')
response = redirect(origin)
else:
details = "\n".join([str(t) for t in tis])

response = self.render(
'airflow/confirm.html',
message=("Here's the list of task instances you are about "
"to clear:"),
details=details)

return response

@expose('/clear')
@login_required
@wwwutils.action_logging
Expand All @@ -1052,34 +1083,28 @@ def clear(self):

end_date = execution_date if not future else None
start_date = execution_date if not past else None
if confirmed:
count = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive)

flash("{0} task instances have been cleared".format(count))
return redirect(origin)
else:
tis = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive,
dry_run=True)
if not tis:
flash("No task instances to clear", 'error')
response = redirect(origin)
else:
details = "\n".join([str(t) for t in tis])
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=recursive, confirmed=confirmed)

response = self.render(
'airflow/confirm.html',
message=(
"Here's the list of task instances you are about "
"to clear:"),
details=details,)
@expose('/dagrun_clear')
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def dagrun_clear(self):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
origin = request.args.get('origin')
execution_date = request.args.get('execution_date')
confirmed = request.args.get('confirmed') == "true"

return response
dag = dagbag.get_dag(dag_id)
execution_date = dateutil.parser.parse(execution_date)
start_date = execution_date
end_date = execution_date

return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=True, confirmed=confirmed)

@expose('/blocked')
@login_required
Expand All @@ -1104,6 +1129,44 @@ def blocked(self):
})
return wwwutils.json_response(payload)

@expose('/dagrun_success')
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def dagrun_success(self):
dag_id = request.args.get('dag_id')
execution_date = request.args.get('execution_date')
confirmed = request.args.get('confirmed') == 'true'
origin = request.args.get('origin')

if not execution_date:
flash('Invalid execution date', 'error')
return redirect(origin)

execution_date = dateutil.parser.parse(execution_date)
dag = dagbag.get_dag(dag_id)

if not dag:
flash('Cannot find DAG: {}'.format(dag_id), 'error')
return redirect(origin)

new_dag_state = set_dag_run_state(dag, execution_date, state=State.SUCCESS,
commit=confirmed)

if confirmed:
flash('Marked success on {} task instances'.format(len(new_dag_state)))
return redirect(origin)

else:
details = '\n'.join([str(t) for t in new_dag_state])

response = self.render('airflow/confirm.html',
message=("Here's the list of task instances you are "
"about to mark as successful:"),
details=details)

return response

@expose('/success')
@login_required
@wwwutils.action_logging
Expand Down
Loading