From e941ddb559e32f6732f9034687a9703aca3f62d6 Mon Sep 17 00:00:00 2001 From: Jamie Matthews Date: Tue, 13 Oct 2015 09:15:37 +0100 Subject: [PATCH 1/5] Use multiprocessing to run tasks in child processes --- django_dbq/management/commands/worker.py | 32 +++++++++++++++--------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/django_dbq/management/commands/worker.py b/django_dbq/management/commands/worker.py index 326a030..138da73 100644 --- a/django_dbq/management/commands/worker.py +++ b/django_dbq/management/commands/worker.py @@ -6,6 +6,7 @@ from simplesignals.process import WorkerProcessBase from time import sleep import logging +import multiprocessing logger = logging.getLogger(__name__) @@ -14,18 +15,8 @@ DEFAULT_QUEUE_NAME = 'default' -def process_job(queue_name): - """This function grabs the next available job for a given queue, and runs its next task.""" - - with transaction.atomic(): - job = Job.objects.get_ready_or_none(queue_name) - if not job: - return - - logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) - job.state = Job.STATES.PROCESSING - job.save() - +def run_next_task(job): + """Updates a job by running its next task""" try: task_function = import_by_path(job.next_task) task_function(job) @@ -55,6 +46,23 @@ def process_job(queue_name): raise +def process_job(queue_name): + """This function grabs the next available job for a given queue, and runs its next task.""" + + with transaction.atomic(): + job = Job.objects.get_ready_or_none(queue_name) + if not job: + return + + logger.info('Processing job: name="%s" queue="%s" id=%s state=%s next_task=%s', job.name, queue_name, job.pk, job.state, job.next_task) + job.state = Job.STATES.PROCESSING + job.save() + + child = multiprocessing.Process(target=run_next_task, args=(job,)) + child.start() + child.join() + + class Worker(WorkerProcessBase): process_title = "jobworker" From 625b88a3d2e3306de08a9015067c377b80ca2fda Mon Sep 17 00:00:00 2001 From: Pete Smith Date: Tue, 13 Oct 2015 14:40:00 +0100 Subject: [PATCH 2/5] run tests in mysql --- .travis.yml | 2 +- README.md | 4 ++++ testsettings.py | 9 +++++++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 77fdfe0..0789ada 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ python: - '2.7' - '3.4' install: -- pip install -r requirements.txt +- pip install -r test-requirements.txt script: python manage.py test deploy: provider: pypi diff --git a/README.md b/README.md index 12dff77..3467e04 100644 --- a/README.md +++ b/README.md @@ -127,3 +127,7 @@ To start a worker: manage.py worker [queue_name] `queue_name` is optional, and will default to `default` + +## Testing + +It may be necessary to supply a DATABASE_PORT environment variable. diff --git a/testsettings.py b/testsettings.py index 7e6c822..868c9e9 100644 --- a/testsettings.py +++ b/testsettings.py @@ -1,7 +1,12 @@ +import os +import pymysql +pymysql.install_as_MySQLdb() + DATABASES = { 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', + 'ENGINE': 'django.db.backends.mysql', + 'NAME': 'django_db_queue', + 'PORT': os.getenv('DATABASE_PORT', 3306), }, } From 6afc4452029a5ee404cda5e2fadf01e707f67f85 Mon Sep 17 00:00:00 2001 From: Pete Smith Date: Tue, 13 Oct 2015 14:41:51 +0100 Subject: [PATCH 3/5] extra test requirements --- test-requirements.txt | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 test-requirements.txt diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 0000000..31ea0da --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,2 @@ +-r requirements.txt +pymysql==0.6.7 From 50153cfb18b26502a9889617d6afedf872812dee Mon Sep 17 00:00:00 2001 From: Pete Smith Date: Tue, 13 Oct 2015 14:43:31 +0100 Subject: [PATCH 4/5] this project does not require sudo --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 0789ada..2c473f2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: python +sudo: false python: - '2.7' - '3.4' From 09cb22c1dd49ae99eb882d2b05300b64faf3fb96 Mon Sep 17 00:00:00 2001 From: Pete Smith Date: Tue, 13 Oct 2015 17:08:39 +0100 Subject: [PATCH 5/5] revise the model ordering to fit the tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit the tests were attempting to set the created date upon creation, which will not work per the Django docs for an auto_now_add field: "Note that the current date is always used; it’s not just a default value that you can override." - https://docs.djangoproject.com/en/1.8/ref/models/fields/#django.db.models.DateField.auto_now_add The tests seemed to be looking for the oldest ready job, so adjusted the model's default ordering to compensate. --- django_dbq/models.py | 3 ++- django_dbq/tests.py | 22 +++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/django_dbq/models.py b/django_dbq/models.py index 37141a8..9d3999f 100644 --- a/django_dbq/models.py +++ b/django_dbq/models.py @@ -66,7 +66,7 @@ class Job(models.Model): queue_name = models.CharField(max_length=20, default='default', db_index=True) class Meta: - ordering = ['-created'] + ordering = ['created'] objects = JobManager() @@ -100,3 +100,4 @@ def run_creation_hook(self): logger.info("Running creation hook %s for new job", creation_hook_name) creation_hook_function = import_by_path(creation_hook_name) creation_hook_function(self) + diff --git a/django_dbq/tests.py b/django_dbq/tests.py index 757672b..654e6ea 100644 --- a/django_dbq/tests.py +++ b/django_dbq/tests.py @@ -93,21 +93,29 @@ def test_create_job_with_queue(self): def test_get_next_ready_job(self): self.assertTrue(Job.objects.get_ready_or_none('default') is None) - Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now()) - Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now()) - expected = Job.objects.create(name='testjob', state=Job.STATES.READY, created=datetime.now() - timedelta(minutes=1)) + Job.objects.create(name='testjob', state=Job.STATES.READY) + Job.objects.create(name='testjob', state=Job.STATES.PROCESSING) + expected = Job.objects.create(name='testjob', state=Job.STATES.READY) + expected.created = datetime.now() - timedelta(minutes=1) + expected.save() self.assertEqual(Job.objects.get_ready_or_none('default'), expected) def test_get_next_ready_job_created(self): """ - Created jobs should be picked too + Created jobs should be picked too. + + We create three jobs, and expect the oldest in NEW or READY to be + selected by get_ready_or_none (the model is ordered by 'created' and the + query picks the .first()) """ self.assertTrue(Job.objects.get_ready_or_none('default') is None) - Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now()) - Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, created=datetime.now()) - expected = Job.objects.create(name='testjob', state=Job.STATES.NEW, created=datetime.now() - timedelta(minutes=1)) + Job.objects.create(name='testjob', state=Job.STATES.NEW) + Job.objects.create(name='testjob', state=Job.STATES.PROCESSING) + expected = Job.objects.create(name='testjob', state=Job.STATES.NEW) + expected.created = datetime.now() - timedelta(minutes=1) + expected.save() self.assertEqual(Job.objects.get_ready_or_none('default'), expected)