diff --git a/t/integration/tasks.py b/t/integration/tasks.py index 1aaeed32378..2b4937a3725 100644 --- a/t/integration/tasks.py +++ b/t/integration/tasks.py @@ -16,15 +16,18 @@ def identity(x): @shared_task -def add(x, y): - """Add two numbers.""" - return x + y +def add(x, y, z=None): + """Add two or three numbers.""" + if z: + return x + y + z + else: + return x + y -@shared_task -def raise_error(*args): - """Deliberately raise an error.""" - raise ValueError("deliberate error") +@shared_task(typing=False) +def add_not_typed(x, y): + """Add two numbers, but don't check arguments""" + return x + y @shared_task(ignore_result=True) @@ -33,6 +36,12 @@ def add_ignore_result(x, y): return x + y +@shared_task +def raise_error(*args): + """Deliberately raise an error.""" + raise ValueError("deliberate error") + + @shared_task def chain_add(x, y): ( @@ -162,6 +171,24 @@ def collect_ids(self, res, i): return res, (self.request.root_id, self.request.parent_id, i) +@shared_task(bind=True, default_retry_delay=1) +def retry(self, return_value=None): + """Task simulating multiple retries. + + When return_value is provided, the task after retries returns + the result. Otherwise it fails. + """ + if return_value: + attempt = getattr(self, 'attempt', 0) + print('attempt', attempt) + if attempt >= 3: + delattr(self, 'attempt') + return return_value + self.attempt = attempt + 1 + + raise self.retry(exc=ExpectedException(), countdown=5) + + @shared_task(bind=True, expires=60.0, max_retries=1) def retry_once(self, *args, expires=60.0, max_retries=1, countdown=0.1): """Task that fails and is retried. Returns the number of retries.""" diff --git a/t/integration/test_tasks.py b/t/integration/test_tasks.py index edfda576f5b..ca71196a283 100644 --- a/t/integration/test_tasks.py +++ b/t/integration/test_tasks.py @@ -1,10 +1,14 @@ +from datetime import datetime, timedelta +from time import sleep, perf_counter + import pytest +import celery from celery import group from .conftest import get_active_redis_channels -from .tasks import (ClassBasedAutoRetryTask, add, add_ignore_result, - print_unicode, retry_once, retry_once_priority, sleeping) +from .tasks import (ClassBasedAutoRetryTask, add, add_ignore_result, add_not_typed, retry, + print_unicode, retry_once, retry_once_priority, sleeping, fail, ExpectedException) TIMEOUT = 10 @@ -28,8 +32,200 @@ def test_class_based_task_retried(self, celery_session_app, assert res.get(timeout=TIMEOUT) == 1 +def _producer(j): + """Single producer helper function""" + results = [] + for i in range(20): + results.append([i + j, add.delay(i, j)]) + for expected, result in results: + value = result.get(timeout=10) + assert value == expected + assert result.status == 'SUCCESS' + assert result.ready() is True + assert result.successful() is True + return j + + class test_tasks: + def test_simple_call(self): + """Tests direct simple call of task""" + assert add(1, 1) == 2 + assert add(1, 1, z=1) == 3 + + @flaky + def test_basic_task(self, manager): + """Tests basic task call""" + results = [] + # Tests calling task only with args + for i in range(10): + results.append([i + i, add.delay(i, i)]) + for expected, result in results: + value = result.get(timeout=10) + assert value == expected + assert result.status == 'SUCCESS' + assert result.ready() is True + assert result.successful() is True + + results = [] + # Tests calling task with args and kwargs + for i in range(10): + results.append([3*i, add.delay(i, i, z=i)]) + for expected, result in results: + value = result.get(timeout=10) + assert value == expected + assert result.status == 'SUCCESS' + assert result.ready() is True + assert result.successful() is True + + @flaky + def test_multiprocess_producer(self, manager): + """Testing multiple processes calling tasks.""" + from multiprocessing import Pool + pool = Pool(20) + ret = pool.map(_producer, range(120)) + assert list(ret) == list(range(120)) + + @flaky + def test_multithread_producer(self, manager): + """Testing multiple threads calling tasks.""" + from multiprocessing.pool import ThreadPool + pool = ThreadPool(20) + ret = pool.map(_producer, range(120)) + assert list(ret) == list(range(120)) + + @flaky + def test_ignore_result(self, manager): + """Testing calling task with ignoring results.""" + result = add.apply_async((1, 2), ignore_result=True) + assert result.get() is None + + @flaky + def test_timeout(self, manager): + """Testing timeout of getting results from tasks.""" + result = sleeping.delay(10) + with pytest.raises(celery.exceptions.TimeoutError): + result.get(timeout=5) + + @flaky + def test_expired(self, manager): + """Testing expiration of task.""" + # Fill the queue with tasks which took > 1 sec to process + for _ in range(4): + sleeping.delay(2) + # Execute task with expiration = 1 sec + result = add.apply_async((1, 1), expires=1) + with pytest.raises(celery.exceptions.TaskRevokedError): + result.get() + assert result.status == 'REVOKED' + assert result.ready() is True + assert result.failed() is False + assert result.successful() is False + + # Fill the queue with tasks which took > 1 sec to process + for _ in range(4): + sleeping.delay(2) + # Execute task with expiration at now + 1 sec + result = add.apply_async((1, 1), expires=datetime.utcnow() + timedelta(seconds=1)) + with pytest.raises(celery.exceptions.TaskRevokedError): + result.get() + assert result.status == 'REVOKED' + assert result.ready() is True + assert result.failed() is False + assert result.successful() is False + + @flaky + def test_eta(self, manager): + """Tests tasks scheduled at some point in future.""" + start = perf_counter() + # Schedule task to be executed in 3 seconds + result = add.apply_async((1, 1), countdown=3) + sleep(1) + assert result.status == 'PENDING' + assert result.ready() is False + assert result.get() == 2 + end = perf_counter() + assert result.status == 'SUCCESS' + assert result.ready() is True + # Difference between calling the task and result must be bigger than 3 secs + assert (end - start) > 3 + + start = perf_counter() + # Schedule task to be executed at time now + 3 seconds + result = add.apply_async((2, 2), eta=datetime.utcnow() + timedelta(seconds=3)) + sleep(1) + assert result.status == 'PENDING' + assert result.ready() is False + assert result.get() == 4 + end = perf_counter() + assert result.status == 'SUCCESS' + assert result.ready() is True + # Difference between calling the task and result must be bigger than 3 secs + assert (end - start) > 3 + + @flaky + def test_fail(self, manager): + """Tests that the failing task propagates back correct exception.""" + result = fail.delay() + with pytest.raises(ExpectedException): + result.get(timeout=5) + assert result.status == 'FAILURE' + assert result.ready() is True + assert result.failed() is True + assert result.successful() is False + + @flaky + def test_wrong_arguments(self, manager): + """Tests that proper exceptions are raised when task is called with wrong arguments.""" + with pytest.raises(TypeError): + add(5) + + with pytest.raises(TypeError): + add(5, 5, wrong_arg=5) + + with pytest.raises(TypeError): + add.delay(5) + + with pytest.raises(TypeError): + add.delay(5, wrong_arg=5) + + # Tasks with typing=False are not checked but execution should fail + result = add_not_typed.delay(5) + with pytest.raises(TypeError): + result.get(timeout=5) + assert result.status == 'FAILURE' + + result = add_not_typed.delay(5, wrong_arg=5) + with pytest.raises(TypeError): + result.get(timeout=5) + assert result.status == 'FAILURE' + + @flaky + def test_retry(self, manager): + """Tests retrying of task.""" + # Tests when max. retries is reached + result = retry.delay() + for _ in range(5): + status = result.status + if status != 'PENDING': + break + sleep(1) + assert status == 'RETRY' + with pytest.raises(ExpectedException): + result.get() + assert result.status == 'FAILURE' + + # Tests when task is retried but after returns correct result + result = retry.delay(return_value='bar') + for _ in range(5): + status = result.status + if status != 'PENDING': + break + sleep(1) + assert status == 'RETRY' + assert result.get() == 'bar' + assert result.status == 'SUCCESS' + @flaky def test_task_accepted(self, manager, sleep=1): r1 = sleeping.delay(sleep)