Skip to content


Added integration tests for calling a task (#6523)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusvalo committed Dec 6, 2020
1 parent a4d942b commit a192f9c
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 9 deletions.
41 changes: 34 additions & 7 deletions t/integration/
Expand Up @@ -16,15 +16,18 @@ def identity(x):

def add(x, y):
"""Add two numbers."""
return x + y
def add(x, y, z=None):
"""Add two or three numbers."""
if z:
return x + y + z
return x + y

def raise_error(*args):
"""Deliberately raise an error."""
raise ValueError("deliberate error")
def add_not_typed(x, y):
"""Add two numbers, but don't check arguments"""
return x + y

Expand All @@ -33,6 +36,12 @@ def add_ignore_result(x, y):
return x + y

def raise_error(*args):
"""Deliberately raise an error."""
raise ValueError("deliberate error")

def chain_add(x, y):
Expand Down Expand Up @@ -162,6 +171,24 @@ def collect_ids(self, res, i):
return res, (self.request.root_id, self.request.parent_id, i)

@shared_task(bind=True, default_retry_delay=1)
def retry(self, return_value=None):
"""Task simulating multiple retries.
When return_value is provided, the task after retries returns
the result. Otherwise it fails.
if return_value:
attempt = getattr(self, 'attempt', 0)
print('attempt', attempt)
if attempt >= 3:
delattr(self, 'attempt')
return return_value
self.attempt = attempt + 1

raise self.retry(exc=ExpectedException(), countdown=5)

@shared_task(bind=True, expires=60.0, max_retries=1)
def retry_once(self, *args, expires=60.0, max_retries=1, countdown=0.1):
"""Task that fails and is retried. Returns the number of retries."""
Expand Down
200 changes: 198 additions & 2 deletions t/integration/
@@ -1,10 +1,14 @@
from datetime import datetime, timedelta
from time import sleep, perf_counter

import pytest

import celery
from celery import group

from .conftest import get_active_redis_channels
from .tasks import (ClassBasedAutoRetryTask, add, add_ignore_result,
print_unicode, retry_once, retry_once_priority, sleeping)
from .tasks import (ClassBasedAutoRetryTask, add, add_ignore_result, add_not_typed, retry,
print_unicode, retry_once, retry_once_priority, sleeping, fail, ExpectedException)


Expand All @@ -28,8 +32,200 @@ def test_class_based_task_retried(self, celery_session_app,
assert res.get(timeout=TIMEOUT) == 1

def _producer(j):
"""Single producer helper function"""
results = []
for i in range(20):
results.append([i + j, add.delay(i, j)])
for expected, result in results:
value = result.get(timeout=10)
assert value == expected
assert result.status == 'SUCCESS'
assert result.ready() is True
assert result.successful() is True
return j

class test_tasks:

def test_simple_call(self):
"""Tests direct simple call of task"""
assert add(1, 1) == 2
assert add(1, 1, z=1) == 3

def test_basic_task(self, manager):
"""Tests basic task call"""
results = []
# Tests calling task only with args
for i in range(10):
results.append([i + i, add.delay(i, i)])
for expected, result in results:
value = result.get(timeout=10)
assert value == expected
assert result.status == 'SUCCESS'
assert result.ready() is True
assert result.successful() is True

results = []
# Tests calling task with args and kwargs
for i in range(10):
results.append([3*i, add.delay(i, i, z=i)])
for expected, result in results:
value = result.get(timeout=10)
assert value == expected
assert result.status == 'SUCCESS'
assert result.ready() is True
assert result.successful() is True

def test_multiprocess_producer(self, manager):
"""Testing multiple processes calling tasks."""
from multiprocessing import Pool
pool = Pool(20)
ret =, range(120))
assert list(ret) == list(range(120))

def test_multithread_producer(self, manager):
"""Testing multiple threads calling tasks."""
from multiprocessing.pool import ThreadPool
pool = ThreadPool(20)
ret =, range(120))
assert list(ret) == list(range(120))

def test_ignore_result(self, manager):
"""Testing calling task with ignoring results."""
result = add.apply_async((1, 2), ignore_result=True)
assert result.get() is None

def test_timeout(self, manager):
"""Testing timeout of getting results from tasks."""
result = sleeping.delay(10)
with pytest.raises(celery.exceptions.TimeoutError):

def test_expired(self, manager):
"""Testing expiration of task."""
# Fill the queue with tasks which took > 1 sec to process
for _ in range(4):
# Execute task with expiration = 1 sec
result = add.apply_async((1, 1), expires=1)
with pytest.raises(celery.exceptions.TaskRevokedError):
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False

# Fill the queue with tasks which took > 1 sec to process
for _ in range(4):
# Execute task with expiration at now + 1 sec
result = add.apply_async((1, 1), expires=datetime.utcnow() + timedelta(seconds=1))
with pytest.raises(celery.exceptions.TaskRevokedError):
assert result.status == 'REVOKED'
assert result.ready() is True
assert result.failed() is False
assert result.successful() is False

def test_eta(self, manager):
"""Tests tasks scheduled at some point in future."""
start = perf_counter()
# Schedule task to be executed in 3 seconds
result = add.apply_async((1, 1), countdown=3)
assert result.status == 'PENDING'
assert result.ready() is False
assert result.get() == 2
end = perf_counter()
assert result.status == 'SUCCESS'
assert result.ready() is True
# Difference between calling the task and result must be bigger than 3 secs
assert (end - start) > 3

start = perf_counter()
# Schedule task to be executed at time now + 3 seconds
result = add.apply_async((2, 2), eta=datetime.utcnow() + timedelta(seconds=3))
assert result.status == 'PENDING'
assert result.ready() is False
assert result.get() == 4
end = perf_counter()
assert result.status == 'SUCCESS'
assert result.ready() is True
# Difference between calling the task and result must be bigger than 3 secs
assert (end - start) > 3

def test_fail(self, manager):
"""Tests that the failing task propagates back correct exception."""
result = fail.delay()
with pytest.raises(ExpectedException):
assert result.status == 'FAILURE'
assert result.ready() is True
assert result.failed() is True
assert result.successful() is False

def test_wrong_arguments(self, manager):
"""Tests that proper exceptions are raised when task is called with wrong arguments."""
with pytest.raises(TypeError):

with pytest.raises(TypeError):
add(5, 5, wrong_arg=5)

with pytest.raises(TypeError):

with pytest.raises(TypeError):
add.delay(5, wrong_arg=5)

# Tasks with typing=False are not checked but execution should fail
result = add_not_typed.delay(5)
with pytest.raises(TypeError):
assert result.status == 'FAILURE'

result = add_not_typed.delay(5, wrong_arg=5)
with pytest.raises(TypeError):
assert result.status == 'FAILURE'

def test_retry(self, manager):
"""Tests retrying of task."""
# Tests when max. retries is reached
result = retry.delay()
for _ in range(5):
status = result.status
if status != 'PENDING':
assert status == 'RETRY'
with pytest.raises(ExpectedException):
assert result.status == 'FAILURE'

# Tests when task is retried but after returns correct result
result = retry.delay(return_value='bar')
for _ in range(5):
status = result.status
if status != 'PENDING':
assert status == 'RETRY'
assert result.get() == 'bar'
assert result.status == 'SUCCESS'

def test_task_accepted(self, manager, sleep=1):
r1 = sleeping.delay(sleep)
Expand Down

0 comments on commit a192f9c

Please sign in to comment.