Skip to content
Permalink
Browse files

Terminate in-progress workers on tasks after they've been completed.

  • Loading branch information...
thisisdhaas committed Jul 2, 2015
1 parent 1c92029 commit dfe6aec5f1aefb837addfd39a1d3b2ecfebca89a
@@ -216,6 +216,9 @@ class AbstractCrowdWorkerAssignment(models.Model):
# The time the assignment was completed
finished_at = models.DateTimeField(null=True)

# Was this assignment terminated before the worker submitted work?
terminated = models.BooleanField(default=False)

# The time the assignment took, in seconds
@property
def length(self):
@@ -15,7 +15,7 @@ def make_em_answer(task_obj, model_spec):

# Build up initial variables for em
responses = model_spec.assignment_model.objects.filter(
task__task_type=task_obj.task_type)
task__task_type=task_obj.task_type, terminated=False)
for response in responses:

answer_list = json.loads(response.content)
@@ -41,7 +41,8 @@ def make_em_answer(task_obj, model_spec):
label_set).ExpectationMaximization(iterations)

# Gather answer
point_ids = json.loads(task_obj.assignments.all()[0].content).keys()
point_ids = json.loads(task_obj.assignments
.filter(terminated=False)[0].content).keys()
answer_label = {}

for point_id in point_ids:
@@ -2,7 +2,7 @@
def make_mv_answer(task_obj):
answers = []

responses = task_obj.assignments.all()
responses = task_obj.assignments.filter(terminated=False)
for response in responses:
current_content = response.content.split(",")
answers.append(current_content)
@@ -13,15 +13,15 @@ var Retainer = {
WORK_ENDPOINT = work_url;
Retainer.requestData = prepare_submit_data();
Retainer.requestData.ping_type = 'starting';
Retainer.ping(Retainer.requestData);
Retainer.checkForWork(Retainer.requestData);
Retainer.ping();
Retainer.checkForWork();
Retainer.finished = false;
Retainer.alertNeeded = true;
},

ping: function(requestData){
ping: function(){
$.post(PING_ENDPOINT,
requestData,
Retainer.requestData,
function(data, status){
console.log('pong', data);
$('#waitTime').text(data.wait_time.toFixed(2));
@@ -33,16 +33,21 @@ var Retainer = {
if (data.pool_status == 'finished') {
Retainer.finished = true;
}
if (data.terminate_work) {
alert("Your work on this task is no longer needed. "
+ "Please press 'ok' to check for more tasks.");
Retainer.switchTasks();
}
})
.always(function(){
if (Retainer.requestData.ping_type == 'starting') {
Retainer.requestData.ping_type = 'waiting';
}
setTimeout(Retainer.ping, PING_INTERVAL, requestData);
setTimeout(Retainer.ping, PING_INTERVAL);
});
},

checkForWork: function(requestData){
checkForWork: function(){
$('#waitingDiv').show();
$('#taskFrame').hide();
if (Retainer.finished) {
@@ -53,30 +58,40 @@ var Retainer = {
return;
}
$.get(WORK_ENDPOINT,
requestData,
Retainer.requestData,
function(data, status){
if(data.start === true){
Retainer.requestData.ping_type = 'working';
Retainer.hasWork(data, Retainer.alertNeeded);
}
else {
Retainer.requestData.active_task = '';
}
console.log(data);
},
'json'
)
.always(function(){
Retainer.alertNeeded = true;
if (Retainer.requestData.ping_type == 'waiting' || Retainer.requestData.ping_type == 'starting') {
setTimeout(Retainer.checkForWork, WORK_INTERVAL, requestData);
setTimeout(Retainer.checkForWork, WORK_INTERVAL);

}
});
},

switchTasks: function() {
Retainer.requestData.ping_type = 'starting';
Retainer.alertNeeded = false;
Retainer.checkForWork();
},

hasWork: function(data, show_alert){
console.log('initialize task here');
if (show_alert)
alert('New work is available! Please start working now.');

Retainer.requestData.active_task = data.task_id
var task_frame = $('#taskFrame');
task_frame.attr('src', data.task_url);
task_frame.load(function() {
@@ -85,11 +100,7 @@ var Retainer = {
$('#waitingDiv').hide();

// sneakily override the submit behavior of the iframe
task_frame[0].contentWindow.submit_to_frontend = function() {
Retainer.requestData.ping_type = 'starting';
Retainer.alertNeeded = false;
Retainer.checkForWork(Retainer.requestData);
}
task_frame[0].contentWindow.submit_to_frontend = Retainer.switchTasks
});

}
@@ -342,6 +342,20 @@ def post_response(request, crowd_name):
current_task.save()
gather_answer.delay(current_task.task_id, model_spec)

# Check if the whole group is done
group = current_task.group
if not (group.tasks
.exclude(task_type='retainer')
.filter(is_complete=False).exists()):

# terminate in progress retainer tasks
(model_spec.assignment_model.objects
.exclude(task__task_type='retainer')
.filter(task__group=group,
finished_at__isnull=True)
.update(finished_at=timezone.now(),
terminated=True))

return HttpResponse('ok') # AJAX call succeded.


@@ -362,6 +376,7 @@ def ping(request, crowd_name):
ValueError("ping context missing required keys."))
task = model_spec.task_model.objects.get(task_id=context['task_id'])
worker = model_spec.worker_model.objects.get(worker_id=context['worker_id'])
terminate_work = False

# update waiting time
ping_type = request.POST['ping_type']
@@ -376,9 +391,16 @@ def ping(request, crowd_name):
time_since_last_ping = (now - last_ping).total_seconds()
task.time_waited_session += time_since_last_ping

# Task is working, do nothing.
# Task is working, verify that the assignment hasn't been terminated.
elif ping_type == 'working':
pass
active_task_id = request.POST.get('active_task', None)
if not active_task_id:
raise ValueError('Retainer must ping with active task id!')

active_assignment = model_spec.assignment_model.objects.get(
worker=worker, task_id=active_task_id)
if active_assignment.finished_at is not None:
terminate_work = True

task.last_ping = now
task.last_ping_type = ping_type
@@ -397,6 +419,7 @@ def ping(request, crowd_name):
'waiting_rate': retainer_config['waiting_rate'],
'per_task_rate': retainer_config['task_rate'],
'min_required_tasks': retainer_config['min_tasks_per_worker'],
'terminate_work': terminate_work,
}
return HttpResponse(json.dumps(data), content_type='application/json')

@@ -491,7 +514,8 @@ def assign_retainer_task(request, crowd_name):
response_data = json.dumps({
'start': True,
'task_url': reverse('basecrowd:get_retainer_assignment',
kwargs=url_args)
kwargs=url_args),
'task_id': assignment_task.task_id,
})
return HttpResponse(response_data, content_type='application/json')
else:

0 comments on commit dfe6aec

Please sign in to comment.
You can’t perform that action at this time.