-
-
Notifications
You must be signed in to change notification settings - Fork 4k
/
test_ingest_consumer_kafka.py
212 lines (172 loc) · 6.4 KB
/
test_ingest_consumer_kafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import datetime
import logging
import random
import time
import uuid
import msgpack
import pytest
from django.conf import settings
from sentry import eventstore
from sentry.conf.types.kafka_definition import Topic
from sentry.consumers import get_stream_processor
from sentry.event_manager import EventManager
from sentry.eventstore.processing import event_processing_store
from sentry.testutils.pytest.fixtures import django_db_all
from sentry.testutils.skips import requires_kafka, requires_snuba
from sentry.utils import json
from sentry.utils.batching_kafka_consumer import create_topics
from sentry.utils.kafka_config import get_topic_definition
# These are end-to-end tests: they need live Snuba and Kafka services.
pytestmark = [requires_snuba, requires_kafka]
logger = logging.getLogger(__name__)
# Poll this amount of times (for 0.1 sec each) at most to wait for messages
MAX_POLL_ITERATIONS = 100
# Block size for shared memory of the multiprocessing kafka consumer strategy.
# Any reasonable value will do for tests.
DEFAULT_BLOCK_SIZE = int(32 * 1e6)
@pytest.fixture
def get_test_message(default_project):
    """
    Return a factory that builds msgpack-encoded ingest messages.

    The factory takes ``type`` ("event" or "transaction") and an optional
    ``project`` (defaults to the ``default_project`` fixture) and returns a
    ``(payload_bytes, event_id)`` tuple ready to be produced onto the
    ingest Kafka topic.
    """

    def inner(type, project=default_project):
        timestamp = datetime.datetime.now()
        # event ids are 32 hex digits
        event_id = uuid.uuid4().hex
        # must match the project id set up by the test fixtures
        project_id = project.id

        if type == "transaction":
            event = {
                "type": "transaction",
                "timestamp": timestamp.isoformat(),
                "start_timestamp": timestamp.isoformat(),
                "event_id": event_id,
                "spans": [],
                "contexts": {
                    "trace": {
                        "parent_span_id": "8988cec7cc0779c1",
                        "type": "trace",
                        "op": "foobar",
                        "trace_id": "a7d67cf796774551a95be6543cacd459",
                        "span_id": "babaae0d4b7512d9",
                        "status": "ok",
                    }
                },
            }
        elif type == "event":
            event = {
                "message": f"some message {event_id}",
                "extra": {"the_id": event_id},
                "project": project_id,
                "event_id": event_id,
            }
        else:
            raise ValueError(type)

        # Normalize the payload the same way relay would before it is
        # published to the ingest topic.
        manager = EventManager(event, project=project)
        manager.normalize()
        normalized = dict(manager.get_data())

        wrapped = {
            "type": "event",
            "start_time": time.time(),
            "event_id": event_id,
            "project_id": int(project_id),
            "payload": json.dumps(normalized),
        }
        return msgpack.packb(wrapped), event_id

    return inner
@pytest.fixture
def random_group_id():
    """Return a fresh consumer group id so test runs never share offsets."""
    suffix = random.randint(0, 2 ** 16)
    return f"test-consumer-{suffix}"
@django_db_all(transaction=True)
def test_ingest_consumer_reads_from_topic_and_calls_celery_task(
    task_runner,
    kafka_producer,
    kafka_admin,
    default_project,
    get_test_message,
    random_group_id,
):
    """
    End-to-end: produce an error event and a transaction onto the ingest
    topic, run the stream processor, and verify both become queryable
    through the eventstore.
    """
    topic = Topic.INGEST_EVENTS
    topic_event_name = get_topic_definition(topic)["real_topic_name"]

    # Recreate the topic so messages from previous runs don't leak in.
    admin = kafka_admin(settings)
    admin.delete_topic(topic_event_name)
    producer = kafka_producer(settings)
    create_topics("default", [topic_event_name])

    message, event_id = get_test_message(type="event")
    producer.produce(topic_event_name, message)

    transaction_message, transaction_event_id = get_test_message(type="transaction")
    producer.produce(topic_event_name, transaction_message)

    consumer = get_stream_processor(
        "ingest-events",
        consumer_args=["--max-batch-size=2", "--max-batch-time-ms=5000", "--processes=10"],
        topic=None,
        cluster=None,
        group_id=random_group_id,
        auto_offset_reset="earliest",
        strict_offset_reset=False,
    )

    with task_runner():
        i = 0
        while i < MAX_POLL_ITERATIONS:
            # Note: these rebind `message`/`transaction_message` from the raw
            # produced bytes to the stored events (or None while not ingested).
            transaction_message = eventstore.backend.get_event_by_id(
                default_project.id, transaction_event_id
            )
            message = eventstore.backend.get_event_by_id(default_project.id, event_id)

            if transaction_message and message:
                break

            consumer._run_once()
            i += 1

    # Fail with a clear message on poll timeout instead of an opaque
    # AttributeError (or worse, asserting against the raw produced bytes).
    assert message is not None, "error event was never ingested"
    assert transaction_message is not None, "transaction event was never ingested"

    # check that we got the messages
    assert message.data["event_id"] == event_id
    assert message.data["extra"]["the_id"] == event_id

    assert transaction_message.data["event_id"] == transaction_event_id
    assert transaction_message.data["spans"] == []
    assert transaction_message.data["contexts"]["trace"]
@django_db_all(transaction=True)
def test_ingest_consumer_gets_event_unstuck(
    task_runner,
    kafka_producer,
    kafka_admin,
    default_project,
    get_test_message,
    random_group_id,
):
    """
    With ``--reprocess-only-stuck-events``, the consumer must ingest only
    events already present in the processing store ("stuck" events) and
    skip everything else.
    """
    topic = Topic.INGEST_EVENTS
    topic_event_name = get_topic_definition(topic)["real_topic_name"]

    # Recreate the topic so messages from previous runs don't leak in.
    admin = kafka_admin(settings)
    admin.delete_topic(topic_event_name)
    producer = kafka_producer(settings)
    create_topics("default", [topic_event_name])

    message1, event_id1 = get_test_message(type="event")
    producer.produce(topic_event_name, message1)

    message2, event_id2 = get_test_message(type="event")
    producer.produce(topic_event_name, message2)

    # an event is "stuck" when it is in the processing store, so lets fake that:
    event_processing_store.store({"project": default_project.id, "event_id": event_id2})

    consumer = get_stream_processor(
        "ingest-events",
        consumer_args=[
            "--max-batch-size=2",
            "--max-batch-time-ms=5000",
            "--processes=10",
            "--reprocess-only-stuck-events",
        ],
        topic=None,
        cluster=None,
        group_id=random_group_id,
        auto_offset_reset="earliest",
        strict_offset_reset=False,
    )

    with task_runner():
        i = 0
        while i < MAX_POLL_ITERATIONS:
            message = eventstore.backend.get_event_by_id(default_project.id, event_id2)

            if message:
                break

            consumer._run_once()
            i += 1

    # Fail with a clear message on poll timeout instead of an opaque
    # AttributeError from `None.data` below.
    assert message is not None, "stuck event was never reprocessed"

    # check that we got the messages
    assert message.data["event_id"] == event_id2
    assert message.data["extra"]["the_id"] == event_id2

    # the first event was never "stuck", so we expect it to be skipped
    assert not eventstore.backend.get_event_by_id(default_project.id, event_id1)