Skip to content
Permalink
Browse files

Update django-celery integration to new approved layout.

  • Loading branch information...
thisisdhaas committed Jul 10, 2015
1 parent 00443d9 commit 40c82f7c271e8cea1a9e5322d42120c947c98f02
Showing with 37 additions and 6 deletions.
  1. +7 −6 ampcrowd/basecrowd/tasks.py
  2. +5 −0 ampcrowd/crowd_server/__init__.py
  3. +25 −0 ampcrowd/crowd_server/celery.py
@@ -5,18 +5,21 @@
import urllib
import urllib2

from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils import timezone
from djcelery import celery
from celery import shared_task

from basecrowd.interface import CrowdRegistry
from basecrowd.models import TaskGroupRetainerStatus
from basecrowd.models import RetainerPoolStatus
from basecrowd.models import RetainerTask
from quality_control.em import make_em_answer

logger = get_task_logger(__name__)

# Function for gathering results after a task gets enough votes from the crowd
@celery.task
@shared_task
def gather_answer(current_task_id, model_spec):
current_task = model_spec.task_model.objects.get(task_id=current_task_id)
current_task.em_answer = make_em_answer(current_task, model_spec)
@@ -44,9 +47,8 @@ def submit_callback_answer(current_task):

# Recruit for retainer pools by auto-posting tasks as necessary.
# TODO: worry about concurrency if multiple of these run at once.
@celery.task
@shared_task
def post_retainer_tasks():
logger = logging.getLogger(__name__)

# Process each installed crowd.
registry = CrowdRegistry.get_registry()
@@ -187,9 +189,8 @@ def post_retainer_tasks():
except Exception, e:
logger.warning('Could not remove task %s: %s' % (session_task, str(e)))

@celery.task
@shared_task
def retire_workers():
logger = logging.getLogger(__name__)

# Process each installed crowd.
registry = CrowdRegistry.get_registry()
@@ -0,0 +1,5 @@
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
@@ -0,0 +1,25 @@
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'crowd_server.settings')

from django.conf import settings

app = Celery('crowd_server')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.update(
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
)


@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

0 comments on commit 40c82f7

Please sign in to comment.
You can’t perform that action at this time.