/
conftest.py
110 lines (80 loc) · 2.25 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import pytest
import celery
from celery_pubsub import subscribe, unsubscribe
# Version-dependent test setup.
#
# The major version is compared as an integer. Comparing the raw version
# strings is lexicographic, so for example "10.0.0" < "4.0.0" evaluates to
# True and would select the wrong branch for a two-digit major release.
_CELERY_MAJOR = int(celery.__version__.split(".", 1)[0])

if _CELERY_MAJOR < 4:  # pragma: no cover
    # Celery < 4: set CELERY_ALWAYS_EAGER on the current app and use the
    # ``celery.task`` decorator to build the job tasks.
    celery.current_app.conf.update(
        CELERY_ALWAYS_EAGER=True,
    )
    task = celery.task

    @pytest.fixture
    def celery_worker():
        """Stand-in ``celery_worker`` fixture for this branch; provides ``None``.

        NOTE(review): presumably exists so tests requesting ``celery_worker``
        still resolve when no Celery pytest plugin supplies it -- confirm.
        """
        pass

else:  # pragma: no cover
    # Celery >= 4: build the job tasks with ``celery.shared_task``.
    task = celery.shared_task

    @pytest.fixture
    def celery_config():
        """Return the Celery settings used for the test application.

        Uses the ``memory://`` broker, the ``rpc://`` result backend and a
        0.05 polling interval in the broker transport options.
        """
        return {
            "broker_url": "memory://",
            "result_backend": "rpc://",
            "broker_transport_options": {"polling_interval": 0.05},
        }

    if _CELERY_MAJOR >= 5:
        # From Celery 5 on, the pytest plugin is declared explicitly here.
        pytest_plugins = ["celery.contrib.pytest"]
@pytest.fixture(scope="session")
def job_a():
    """Build, once per test session, the task named ``job_a``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"a"``.
    """

    def job(*args, **kwargs):
        message = "job_a: {} {}".format(args, kwargs)
        print(message)
        return "a"

    # Explicit call form of the ``@task(name="job_a")`` decorator.
    return task(name="job_a")(job)
@pytest.fixture(scope="session")
def job_b():
    """Build, once per test session, the task named ``job_b``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"b"``.
    """

    def job(*args, **kwargs):
        message = "job_b: {} {}".format(args, kwargs)
        print(message)
        return "b"

    # Explicit call form of the ``@task(name="job_b")`` decorator.
    return task(name="job_b")(job)
@pytest.fixture(scope="session")
def job_c():
    """Build, once per test session, the task named ``job_c``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"c"``.
    """

    def job(*args, **kwargs):
        message = "job_c: {} {}".format(args, kwargs)
        print(message)
        return "c"

    # Explicit call form of the ``@task(name="job_c")`` decorator.
    return task(name="job_c")(job)
@pytest.fixture(scope="session")
def job_d():
    """Build, once per test session, the task named ``job_d``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"d"``.
    """

    def job(*args, **kwargs):
        message = "job_d: {} {}".format(args, kwargs)
        print(message)
        return "d"

    # Explicit call form of the ``@task(name="job_d")`` decorator.
    return task(name="job_d")(job)
@pytest.fixture(scope="session")
def job_e():
    """Build, once per test session, the task named ``job_e``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"e"``.
    """

    def job(*args, **kwargs):
        message = "job_e: {} {}".format(args, kwargs)
        print(message)
        return "e"

    # Explicit call form of the ``@task(name="job_e")`` decorator.
    return task(name="job_e")(job)
@pytest.fixture(scope="session")
def job_f():
    """Build, once per test session, the task named ``job_f``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"f"``.
    """

    def job(*args, **kwargs):
        message = "job_f: {} {}".format(args, kwargs)
        print(message)
        return "f"

    # Explicit call form of the ``@task(name="job_f")`` decorator.
    return task(name="job_f")(job)
@pytest.fixture(scope="session")
def job_g():
    """Build, once per test session, the task named ``job_g``.

    When called, the task prints the positional and keyword arguments it
    received and returns ``"g"``.
    """

    def job(*args, **kwargs):
        message = "job_g: {} {}".format(args, kwargs)
        print(message)
        return "g"

    # Explicit call form of the ``@task(name="job_g")`` decorator.
    return task(name="job_g")(job)
@pytest.fixture(scope="session")
def subscriber(job_a, job_b, job_c, job_d, job_e, job_f, job_g):
    """Subscribe every job fixture to its topic string, once per test session.

    The fixture provides no value; it exists for the side effect of calling
    ``subscribe`` once per (topic, job) pair listed below, in that order.
    """
    bindings = (
        ("index.high", job_a),
        ("index.low", job_b),
        ("index", job_c),
        ("index.#", job_d),
        ("#", job_e),
        ("index.*.test", job_f),
        ("index.#.test", job_g),
    )
    for topic, job in bindings:
        subscribe(topic, job)