Skip to content

Commit

Permalink
Leaving the pool triggers worker payment (payment not yet implemented).
Browse files Browse the repository at this point in the history
  • Loading branch information
thisisdhaas committed Jun 22, 2015
1 parent 7f77fe3 commit 90ad1d2
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ampcrowd/amt/connection.py
Expand Up @@ -84,7 +84,7 @@ def create_hit(hit_options):
reward=Price(amount=options['reward']),
duration=timedelta(minutes=options['duration']),
max_assignments=options['num_responses'],
approval_delay=0)
approval_delay=3600)
except MTurkRequestError:
logger.debug(traceback.format_exc())
raise AMTException(
Expand Down
29 changes: 25 additions & 4 deletions ampcrowd/basecrowd/models.py
Expand Up @@ -98,8 +98,17 @@ class AbstractCrowdTask(models.Model):
# Is this task a retainer task?
is_retainer = models.BooleanField(default=False)

# Has the task been retired?
is_retired = models.BooleanField(default=False)

# The last time someone working on this task pinged the server from a
# retainer pool
last_ping = models.DateTimeField(null=True)

def __unicode__(self):
return self.task_type + " : " + self.data
task_str = "Task %s (type %s): %s" % (self.task_id, self.task_type,
self.data)
return task_str + " [retainer]" if self.is_retainer else task_str

class Meta:
abstract = True
Expand All @@ -121,7 +130,7 @@ class AbstractCrowdWorker(models.Model):
last_ping = models.DateTimeField(null=True)

def __unicode__(self):
return self.worker_id
return "Worker %s" % self.worker_id

class Meta:
abstract = True
Expand Down Expand Up @@ -149,7 +158,7 @@ class AbstractCrowdWorkerResponse(models.Model):
assignment_id = models.CharField(max_length=200)

def __unicode__(self):
return self.task.task_id + " " + self.worker.worker_id
return "Response: %s to %s" % (self.worker, self.task)

class Meta:
abstract = True
Expand Down Expand Up @@ -207,7 +216,19 @@ def __unicode__(self):
def active_workers(self):
time_cutoff = timezone.now() - timedelta(
seconds=settings.PING_TIMEOUT_SECONDS)
return self.workers.filter(last_ping__gte=time_cutoff)
return self.workers.filter(tasks__task_type='retainer',
tasks__group__retainer_pool=self,
tasks__last_ping__gte=time_cutoff)

def expired_tasks(self, task_model):
time_cutoff = timezone.now() - timedelta(
seconds=settings.RETAINER_WORKER_TIMEOUT_SECONDS)
return task_model.objects.filter(task_type='retainer',
group__retainer_pool=self,
last_ping__lt=time_cutoff)
def new_expired_tasks(self, task_model):
# expired workers with a retainer task that hasn't been marked retired.
return self.expired_tasks(task_model).filter(is_retired=False)

class Meta:
abstract = True
Expand Down
34 changes: 33 additions & 1 deletion ampcrowd/basecrowd/tasks.py
Expand Up @@ -143,15 +143,47 @@ def post_retainer_tasks():
interface.delete_tasks([retainer_task.task,])
retainer_task.task.delete()
logger.info("Deleted old task %s" % retainer_task.task)

else:
logger.info("Not deleting %s, it has a worker."
% retainer_task.task)

# delete the retainer task
# Mark the recruitment task inactive
retainer_task.active = False
retainer_task.save()
logger.info('Deleted old retainer task %s' % retainer_task)

except Exception, e:
logger.warning('Could not remove task %s: %s' % (
retainer_task.task, str(e)))

@celery.task
def retire_workers():
logger = logging.getLogger(__name__)

# Process each installed crowd.
registry = CrowdRegistry.get_registry()
for crowd_name, (crowd_interface, crowd_model_spec) in registry.iteritems():

# Skip crowds that don't support retainer pools.
if not crowd_model_spec.retainer_pool_model:
logger.info("Crowd %s doesn't support retainer pools, not expiring "
"workers" % crowd_name)
continue

# Find pools that need more workers.
logger.info("Crowd %s supports retainer pools, looking for workers to "
"retire." % crowd_name)
for pool in crowd_model_spec.retainer_pool_model.objects.all():
for expired_task in pool.new_expired_tasks(crowd_model_spec.task_model):
logger.info("%s has expired. Cleaning up and paying the "
"worker." % expired_task)

# mark the retainer task as expired
expired_task.is_retired = True
expired_task.save()

# TODO: pay the worker
assert expired_task.workers.count() == 1
worker = expired_task.workers.all()[0]
logger.info("Would pay worker %s here." % worker)
1 change: 1 addition & 0 deletions ampcrowd/basecrowd/templates/basecrowd/base.html
Expand Up @@ -191,6 +191,7 @@
{% block form_tag %}
<form method="post" id="submitForm" action="">
{% endblock form_tag %}
<input type="hidden" value="{{ assignment_id }}" name="assignmentId" />
<section class="container task-container" id="DataCollection">
<div class="row col-xs-12 col-md-12">

Expand Down
19 changes: 15 additions & 4 deletions ampcrowd/basecrowd/templates/basecrowd/retainer.html
Expand Up @@ -28,10 +28,17 @@
var is_accepted = {{ is_accepted|yesno:"true,false" }};
if (is_accepted)
{
// Set up retainer trackin
// Set up retainer tracking
PING_ENDPOINT = "/crowds/{{ crowd_name }}/retainer/ping/";
WORK_ENDPOINT = "/crowds/{{ crowd_name }}/assignments/retainer/";
Retainer.init(PING_ENDPOINT, WORK_ENDPOINT);

// Set up exit button
$("#exitButton").click(function(event) {
event.preventDefault();
var data = prepare_submit_data();
submit_to_frontend(data);
});
}
}
{% endblock handle_is_accepted_func %}
Expand Down Expand Up @@ -79,9 +86,13 @@ <h1>Waiting for next task...</h1>
<!-- Button to stop working -->
{% if is_accepted %}
<div class="col-xs-12 col-md-12">
<p class="text-center">
<button id="exitButton" class="btn btn-primary">Finish working</button>
</p>
<form id="submitForm" action="">
<input type="hidden" value="{{ assignment_id }}" name="assignmentId" />
<input type="hidden" value="0" name="dummyData" />
<p class="text-center">
<input type="submit" id="exitButton" class="btn btn-primary" value="Finish working" />
</p>
</form>
</div>
{% endif %}
</div>
Expand Down
5 changes: 4 additions & 1 deletion ampcrowd/basecrowd/views.py
Expand Up @@ -343,7 +343,10 @@ def ping(request, crowd_name):
# ping_type = request.POST['ping_type']

# TODO: make this not broken when a worker is in multiple pools
worker.last_ping = timezone.now()
now = timezone.now()
task.last_ping = now
task.save()
worker.last_ping = now
worker.save()
logger.info('ping from worker %s, task %s' % (worker, task))

Expand Down
11 changes: 11 additions & 0 deletions ampcrowd/crowd_server/settings.py
Expand Up @@ -80,6 +80,12 @@
# How frequently to re-run the retainer task posting script.
RETAINER_POST_TASKS_INTERVAL = 20 # seconds

# How long until we decide that a worker has abandoned the pool.
RETAINER_WORKER_TIMEOUT_SECONDS = 10

# How frequently to re-run the worker retirement script.
RETAINER_RETIRE_WORKERS_INTERVAL = 20 # seconds

# Settings for AMQP /Celery
###########################

Expand All @@ -91,6 +97,11 @@
'schedule': timedelta(seconds=RETAINER_POST_TASKS_INTERVAL),
'args': (),
},
'retire-workers': {
'task': 'basecrowd.tasks.retire_workers',
'schedule': timedelta(seconds=RETAINER_RETIRE_WORKERS_INTERVAL),
'args': (),
}
}

# Set broker using hosts entry for 'rabbitmq'. This is set for Docker but can be set to alias
Expand Down

0 comments on commit 90ad1d2

Please sign in to comment.