Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
language: python
sudo: false
python:
- '2.7'
- '3.4'
install:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
script: python manage.py test
deploy:
provider: pypi
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ To start a worker:
manage.py worker [queue_name]

`queue_name` is optional, and will default to `default`

## Testing

It may be necessary to supply a DATABASE_PORT environment variable.
32 changes: 20 additions & 12 deletions django_dbq/management/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from simplesignals.process import WorkerProcessBase
from time import sleep
import logging
import multiprocessing


logger = logging.getLogger(__name__)
Expand All @@ -14,18 +15,8 @@
DEFAULT_QUEUE_NAME = 'default'


def process_job(queue_name):
"""This function grabs the next available job for a given queue, and runs its next task."""

with transaction.atomic():
job = Job.objects.get_ready_or_none(queue_name)
if not job:
return

logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task)
job.state = Job.STATES.PROCESSING
job.save()

def run_next_task(job):
"""Updates a job by running its next task"""
try:
task_function = import_by_path(job.next_task)
task_function(job)
Expand Down Expand Up @@ -55,6 +46,23 @@ def process_job(queue_name):
raise


def process_job(queue_name):
"""This function grabs the next available job for a given queue, and runs its next task."""

with transaction.atomic():
job = Job.objects.get_ready_or_none(queue_name)
if not job:
return

logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task)
job.state = Job.STATES.PROCESSING
job.save()

child = multiprocessing.Process(target=run_next_task, args=(job,))
child.start()
child.join()


class Worker(WorkerProcessBase):

process_title = "jobworker"
Expand Down
3 changes: 2 additions & 1 deletion django_dbq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class Job(models.Model):
queue_name = models.CharField(max_length=20, default='default', db_index=True)

class Meta:
ordering = ['-created']
ordering = ['created']

objects = JobManager()

Expand Down Expand Up @@ -100,3 +100,4 @@ def run_creation_hook(self):
logger.info("Running creation hook %s for new job", creation_hook_name)
creation_hook_function = import_by_path(creation_hook_name)
creation_hook_function(self)

22 changes: 15 additions & 7 deletions django_dbq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,29 @@ def test_create_job_with_queue(self):
def test_get_next_ready_job(self):
self.assertTrue(Job.objects.get_ready_or_none('default') is None)

Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now())
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
expected = Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now() - timedelta(minutes=1))
Job.objects.create(name='testjob', state=Job.STATES.READY)
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
expected = Job.objects.create(name='testjob', state=Job.STATES.READY)
expected.created = datetime.now() - timedelta(minutes=1)
expected.save()

self.assertEqual(Job.objects.get_ready_or_none('default'), expected)

def test_get_next_ready_job_created(self):
"""
Created jobs should be picked too
Created jobs should be picked too.

We create three jobs, and expect the oldest in NEW or READY to be
selected by get_ready_or_none (the model is ordered by 'created' and the
query picks the .first())
"""
self.assertTrue(Job.objects.get_ready_or_none('default') is None)

Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now())
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now())
expected = Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now() - timedelta(minutes=1))
Job.objects.create(name='testjob', state=Job.STATES.NEW)
Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
expected = Job.objects.create(name='testjob', state=Job.STATES.NEW)
expected.created = datetime.now() - timedelta(minutes=1)
expected.save()

self.assertEqual(Job.objects.get_ready_or_none('default'), expected)

Expand Down
2 changes: 2 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-r requirements.txt
pymysql==0.6.7
9 changes: 7 additions & 2 deletions testsettings.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import os
import pymysql
pymysql.install_as_MySQLdb()

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
'ENGINE': 'django.db.backends.mysql',
'NAME': 'django_db_queue',
'PORT': os.getenv('DATABASE_PORT', 3306),
},
}

Expand Down