/
aiokafka.py
1604 lines (1425 loc) · 58.5 KB
/
aiokafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Message transport using :pypi:`aiokafka`."""
import asyncio
import typing
from asyncio import Lock, QueueEmpty
from collections import deque
from functools import partial
from time import monotonic
from typing import (
Any,
Awaitable,
Callable,
ClassVar,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
Type,
cast,
no_type_check,
)
import aiokafka
import aiokafka.abc
import opentracing
from aiokafka.consumer.group_coordinator import OffsetCommitRequest
from aiokafka.errors import (
CommitFailedError,
ConsumerStoppedError,
IllegalStateError,
KafkaError,
ProducerFenced,
)
from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition
from aiokafka.util import parse_kafka_version
from kafka import TopicPartition
from kafka.errors import (
NotControllerError,
TopicAlreadyExistsError as TopicExistsError,
for_code,
)
from kafka.partitioner import murmur2
from kafka.partitioner.default import DefaultPartitioner
from kafka.protocol.metadata import MetadataRequest_v1
from mode import Service, get_logger
from mode.threads import ServiceThread, WorkerThread
from mode.utils import text
from mode.utils.futures import StampedeWrapper
from mode.utils.objects import cached_property
from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds
from mode.utils.typing import Deque
from opentracing.ext import tags
from yarl import URL
from faust.auth import GSSAPICredentials, SASLCredentials, SSLCredentials
from faust.exceptions import (
ConsumerNotStarted,
ImproperlyConfigured,
NotReady,
ProducerSendError,
)
from faust.transport import base
from faust.transport.consumer import (
ConsumerThread,
RecordMap,
ThreadDelegateConsumer,
ensure_TPset,
)
from faust.types import (
TP,
ConsumerMessage,
FutureMessage,
HeadersArg,
PendingMessage,
RecordMetadata,
)
from faust.types.auth import CredentialsT
from faust.types.transports import ConsumerT, PartitionerT, ProducerT
from faust.utils.kafka.protocol.admin import CreateTopicsRequest
from faust.utils.tracing import noop_span, set_current_span, traced_from_parent_span
__all__ = ["Consumer", "Producer", "Transport"]
# if not hasattr(aiokafka, '__robinhood__'): # pragma: no cover
# raise RuntimeError(
# 'Please install robinhood-aiokafka, not aiokafka')
logger = get_logger(__name__)

# Sentinel generation id used before the first rebalance completes.
DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID

# Maximum allowed length of a Kafka topic name.
TOPIC_LENGTH_MAX = 249

# --- Slow-processing diagnostics ---------------------------------------
# The message templates below are used by the consumer thread's
# verify_event_path checks to explain why a partition appears stalled.

SLOW_PROCESSING_CAUSE_AGENT = """
The agent processing the stream is hanging (waiting for network, I/O or \
infinite loop).
""".strip()

SLOW_PROCESSING_CAUSE_STREAM = """
The stream has stopped processing events for some reason.
""".strip()

SLOW_PROCESSING_CAUSE_COMMIT = """
The commit handler background thread has stopped working (report as bug).
""".strip()

# %-style template: expects a mapping with 'setting' and 'current_value'.
SLOW_PROCESSING_EXPLAINED = """
There are multiple possible explanations for this:
1) The processing of a single event in the stream
is taking too long.
The timeout for this is defined by the %(setting)s setting,
currently set to %(current_value)r. If you expect the time
required to process an event, to be greater than this then please
increase the timeout.
"""

SLOW_PROCESSING_NO_FETCH_SINCE_START = """
Aiokafka has not sent fetch request for %r since start (started %s)
""".strip()

SLOW_PROCESSING_NO_RESPONSE_SINCE_START = """
Aiokafka has not received fetch response for %r since start (started %s)
""".strip()

SLOW_PROCESSING_NO_RECENT_FETCH = """
Aiokafka stopped fetching from %r (last done %s)
""".strip()

SLOW_PROCESSING_NO_RECENT_RESPONSE = """
Broker stopped responding to fetch requests for %r (last responded %s)
""".strip()

SLOW_PROCESSING_NO_HIGHWATER_SINCE_START = """
Highwater not yet available for %r (started %s).
""".strip()

SLOW_PROCESSING_STREAM_IDLE_SINCE_START = """
Stream has not started processing %r (started %s).
""".strip()

SLOW_PROCESSING_STREAM_IDLE = """
Stream stopped processing, or is slow for %r (last inbound %s).
""".strip()

SLOW_PROCESSING_NO_COMMIT_SINCE_START = """
Has not committed %r at all since worker start (started %s).
""".strip()

SLOW_PROCESSING_NO_RECENT_COMMIT = """
Has not committed %r (last commit %s).
""".strip()
def __canon_host(host, default):
    """Ensure host is correctly formatted for aiokafka.

    IPv6 addresses must be enclosed in square brackets
    (:pypi:`yarl` strips them, while :pypi:`aiokafka` requires them).
    Falsy *host* values fall back to *default*.
    """
    if not host:
        return default
    # A colon implies IPv6 (hostnames/IPv4 never contain one);
    # add brackets unless the caller already supplied them.
    if ":" in host and not host.startswith("["):
        return f"[{host}]"
    return host
def server_list(urls: List[URL], default_port: int) -> List[str]:
    """Convert list of urls to list of servers accepted by :pypi:`aiokafka`."""
    servers: List[str] = []
    for url in urls:
        # Yarl strips [] from IPv6 addresses, and aiokafka expects them.
        host = url.host or "127.0.0.1"
        if ":" in host:
            host = f"[{host}]"
        servers.append(f"{host}:{url.port or default_port}")
    return servers
class ConsumerRebalanceListener(aiokafka.abc.ConsumerRebalanceListener):  # type: ignore
    """Adapt aiokafka's class-based rebalance callbacks to the thread API."""

    def __init__(self, thread: ConsumerThread) -> None:
        self._thread: ConsumerThread = thread

    def on_partitions_revoked(self, revoked: Iterable[_TopicPartition]) -> Awaitable:
        """Call when partitions are being revoked."""
        thread = self._thread
        # XXX app.on_rebalance_start must run as early as possible, so it
        # is invoked here in the sync method: that way it runs even if the
        # returned coroutine is never resumed (and we also get a warning
        # if that coroutine is never awaited).
        thread.app.on_rebalance_start()
        return thread.on_partitions_revoked(ensure_TPset(revoked))

    async def on_partitions_assigned(self, assigned: Iterable[_TopicPartition]) -> None:
        """Call when partitions are being assigned."""
        thread = self._thread
        generation = thread._ensure_consumer()._coordinator.generation
        # Record the generation on the app before notifying the thread.
        thread.app.consumer_generation_id = generation
        await thread.on_partitions_assigned(ensure_TPset(assigned), generation)
class Consumer(ThreadDelegateConsumer):
    """Kafka consumer using :pypi:`aiokafka`."""

    logger = logger

    RebalanceListener: ClassVar[Type[ConsumerRebalanceListener]]
    RebalanceListener = ConsumerRebalanceListener

    consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = (
        ConsumerStoppedError,
    )

    def _new_consumer_thread(self) -> ConsumerThread:
        return AIOKafkaConsumerThread(self, loop=self.loop, beacon=self.beacon)

    async def create_topic(
        self,
        topic: str,
        partitions: int,
        replication: int,
        *,
        config: Optional[Mapping[str, Any]] = None,
        timeout: Seconds = 30.0,
        retention: Optional[Seconds] = None,
        compacting: Optional[bool] = None,
        deleting: Optional[bool] = None,
        ensure_created: bool = False,
    ) -> None:
        """Create/declare topic on server."""
        # All topic creation work is delegated to the consumer thread.
        await self._thread.create_topic(
            topic,
            partitions,
            replication,
            config=config,
            timeout=timeout,
            retention=retention,
            compacting=compacting,
            deleting=deleting,
            ensure_created=ensure_created,
        )

    def _new_topicpartition(self, topic: str, partition: int) -> TP:
        return cast(TP, _TopicPartition(topic, partition))

    def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:
        """Convert an aiokafka record into a :class:`ConsumerMessage`."""
        # Kafka reports timestamps in milliseconds; convert to seconds.
        raw_timestamp: Optional[int] = record.timestamp
        if raw_timestamp is None:
            timestamp_s = cast(float, None)
        else:
            timestamp_s = raw_timestamp / 1000.0
        return ConsumerMessage(
            record.topic,
            record.partition,
            record.offset,
            timestamp_s,
            record.timestamp_type,
            record.headers,
            record.key,
            record.value,
            record.checksum,
            record.serialized_key_size,
            record.serialized_value_size,
            tp,
            generation_id=self.app.consumer_generation_id,
        )

    async def on_stop(self) -> None:
        """Call when consumer is stopping."""
        await super().on_stop()
        # Drop any cached topic-creation waiters held by the transport.
        cast(Transport, self.transport)._topic_waiters.clear()

    def verify_event_path(self, now: float, tp: TP) -> None:
        """Delegate event-path verification to the consumer thread."""
        return self._thread.verify_event_path(now, tp)
class ThreadedProducer(ServiceThread):
    """Producer running on a dedicated service thread.

    Messages are enqueued on :attr:`event_queue` and published in the
    background by the :meth:`push_events` task, using an
    :class:`aiokafka.AIOKafkaProducer` configured from a template
    ("default") producer.
    """

    # Producer created on this thread (set in on_start).
    _producer: Optional[aiokafka.AIOKafkaProducer] = None
    # Queue of FutureMessage objects waiting to be published.
    event_queue: Optional[asyncio.Queue] = None
    # Template producer whose settings are copied by _new_producer.
    _default_producer: Optional[aiokafka.AIOKafkaProducer] = None
    # Background task draining event_queue (see push_events).
    _push_events_task: Optional[asyncio.Task] = None
    # NOTE(review): was annotated ``None``; it holds the app instance.
    app: Any
    # Set False in on_start, True in on_thread_stop; read by push_events.
    stopped: bool

    def __init__(
        self,
        default_producer,
        app,
        *,
        executor: Any = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        thread_loop: Optional[asyncio.AbstractEventLoop] = None,
        Worker: Optional[Type[WorkerThread]] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the thread, keeping the template producer and app."""
        super().__init__(
            executor=executor,
            loop=loop,
            thread_loop=thread_loop,
            Worker=Worker,
            **kwargs,
        )
        self._default_producer = default_producer
        self.app = app

    async def flush(self) -> None:
        """Wait for producer to finish transmitting all buffered messages."""
        # Drain anything still waiting in the queue first ...
        while True:
            try:
                msg = self.event_queue.get_nowait()
            except QueueEmpty:
                break
            else:
                await self.publish_message(msg)
        # ... then flush the underlying aiokafka producer buffers.
        if self._producer is not None:
            await self._producer.flush()

    def _new_producer(
        self, transactional_id: Optional[str] = None
    ) -> aiokafka.AIOKafkaProducer:
        """Create an aiokafka producer bound to this thread's loop.

        Settings (defaults, auth, extras) are copied from the template
        producer supplied at construction time.
        """
        return aiokafka.AIOKafkaProducer(
            loop=self.thread_loop,
            **{
                **self._default_producer._settings_default(),
                **self._default_producer._settings_auth(),
                **self._default_producer._settings_extra(),
            },
            transactional_id=transactional_id,
        )

    async def on_start(self) -> None:
        """Create/start the producer and spawn the queue-draining task."""
        self.event_queue = asyncio.Queue()
        producer = self._producer = self._new_producer()
        await producer.start()
        self.stopped = False
        self._push_events_task = self.thread_loop.create_task(self.push_events())

    async def on_thread_stop(self) -> None:
        """Call when producer thread is stopping."""
        logger.info("Stopping producer thread")
        await super().on_thread_stop()
        self.stopped = True
        # when method queue is stopped, we can stop the consumer
        if self._producer is not None:
            await self.flush()
            await self._producer.stop()
        # Poll until push_events has noticed ``stopped`` and exited.
        if self._push_events_task is not None:
            while not self._push_events_task.done():
                await asyncio.sleep(0.1)

    async def push_events(self):
        """Background task: publish queued messages until stopped."""
        while not self.stopped:
            try:
                # Short timeout so ``stopped`` is re-checked frequently.
                event = await asyncio.wait_for(self.event_queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            self.app.sensors.on_threaded_producer_buffer_processed(
                app=self.app, size=self.event_queue.qsize()
            )
            await self.publish_message(event)

    async def publish_message(
        self, fut_other: FutureMessage, wait: bool = False
    ) -> Awaitable[RecordMetadata]:
        """Fulfill promise to publish message to topic.

        With ``wait=True`` the message is sent and acknowledged before
        returning (the returned future is already resolved); otherwise
        the aiokafka send future is returned with a callback attached
        that fires the channel's ``_on_published`` handler.
        """
        fut = FutureMessage(fut_other.message)
        message: PendingMessage = fut.message
        topic = message.channel.get_topic_name()
        key: bytes = cast(bytes, message.key)
        value: bytes = cast(bytes, message.value)
        partition: Optional[int] = message.partition
        timestamp: float = cast(float, message.timestamp)
        headers: Optional[HeadersArg] = message.headers
        logger.debug(
            "send: topic=%r k=%r v=%r timestamp=%r partition=%r",
            topic,
            key,
            value,
            timestamp,
            partition,
        )
        producer = self._producer
        state = self.app.sensors.on_send_initiated(
            producer,
            topic,
            message=message,
            keysize=len(key) if key else 0,
            valsize=len(value) if value else 0,
        )
        # Kafka expects milliseconds; falsy values (0/None) pass through.
        timestamp_ms = int(timestamp * 1000.0) if timestamp else timestamp
        if headers is not None:
            # aiokafka expects headers as a list of (key, value) pairs.
            if isinstance(headers, Mapping):
                headers = list(headers.items())
        if wait:
            ret: RecordMetadata = await producer.send_and_wait(
                topic=topic,
                key=key,
                value=value,
                partition=partition,
                timestamp_ms=timestamp_ms,
                headers=headers,
            )
            fut.message.channel._on_published(
                message=fut, state=state, producer=producer
            )
            fut.set_result(ret)
            return fut
        else:
            fut2 = cast(
                asyncio.Future,
                await producer.send(
                    topic=topic,
                    key=key,
                    value=value,
                    partition=partition,
                    timestamp_ms=timestamp_ms,
                    headers=headers,
                ),
            )
            callback = partial(
                fut.message.channel._on_published,
                message=fut,
                state=state,
                producer=producer,
            )
            fut2.add_done_callback(cast(Callable, callback))
            return fut2
class AIOKafkaConsumerThread(ConsumerThread):
    """Thread managing an underlying :class:`aiokafka.AIOKafkaConsumer`."""

    # The wrapped aiokafka consumer (created in on_start).
    _consumer: Optional[aiokafka.AIOKafkaConsumer] = None

    # Rebalancing spans waiting for the generation id to become known.
    _pending_rebalancing_spans: Deque[opentracing.Span]

    # Monotonic time of the last commit, per topic partition.
    tp_last_committed_at: MutableMapping[TP, float]
    # Monotonic timestamp taken when the thread started (set in on_start).
    time_started: float

    # Timeouts (seconds) used by the verify_event_path diagnostics;
    # all initialized in __post_init__ from app configuration.
    tp_fetch_request_timeout_secs: float
    tp_fetch_response_timeout_secs: float
    tp_stream_timeout_secs: float
    tp_commit_timeout_secs: float
def __post_init__(self) -> None:
    """Initialize partitioner, rebalance listener and timeout settings."""
    consumer = cast(Consumer, self.consumer)
    self._partitioner: PartitionerT = (
        self.app.conf.producer_partitioner or DefaultPartitioner()
    )
    self._rebalance_listener = consumer.RebalanceListener(self)
    self._pending_rebalancing_spans = deque()
    self.tp_last_committed_at = {}
    app = self.consumer.app
    # The fetch/stream verification timeouts all share one setting.
    timeout = app.conf.stream_processing_timeout
    self.tp_fetch_request_timeout_secs = timeout
    self.tp_fetch_response_timeout_secs = timeout
    self.tp_stream_timeout_secs = timeout
    self.tp_commit_timeout_secs = app.conf.broker_commit_livelock_soft_timeout
async def on_start(self) -> None:
    """Call when consumer starts."""
    consumer = self._create_consumer(loop=self.thread_loop)
    self._consumer = consumer
    self.time_started = monotonic()
    await consumer.start()
async def on_thread_stop(self) -> None:
    """Call when consumer thread is stopping."""
    # super() stops the thread method queue first
    # (QueueServiceThread.method_queue); only then is it safe
    # to stop the consumer itself.
    await super().on_thread_stop()
    consumer = self._consumer
    if consumer is not None:
        await consumer.stop()
def _create_consumer(
    self, loop: asyncio.AbstractEventLoop
) -> aiokafka.AIOKafkaConsumer:
    """Create a client- or worker-mode consumer depending on app mode."""
    transport = cast(Transport, self.transport)
    create = (
        self._create_client_consumer
        if self.app.client_only
        else self._create_worker_consumer
    )
    return create(transport, loop=loop)
def _create_worker_consumer(
    self, transport: "Transport", loop: asyncio.AbstractEventLoop
) -> aiokafka.AIOKafkaConsumer:
    """Build the full consumer used by a worker instance."""
    conf = self.app.conf
    self._assignor = self.app.assignor
    # Transactional consumers must only read committed records.
    isolation_level = (
        "read_committed" if self.consumer.in_transaction else "read_uncommitted"
    )
    request_timeout = conf.broker_request_timeout
    session_timeout = conf.broker_session_timeout
    if session_timeout > request_timeout:
        raise ImproperlyConfigured(
            f"Setting broker_session_timeout={session_timeout} "
            f"cannot be greater than "
            f"broker_request_timeout={request_timeout}"
        )
    rebalance_timeout = conf.broker_rebalance_timeout
    max_poll_interval = conf.broker_max_poll_interval or 0
    auth_settings = credentials_to_aiokafka_auth(
        conf.broker_credentials, conf.ssl_context
    )
    return aiokafka.AIOKafkaConsumer(
        loop=loop,
        api_version=conf.consumer_api_version,
        client_id=conf.broker_client_id,
        group_id=conf.id,
        # group_instance_id=conf.consumer_group_instance_id,
        bootstrap_servers=server_list(transport.url, transport.default_port),
        partition_assignment_strategy=[self._assignor],
        enable_auto_commit=False,
        auto_offset_reset=conf.consumer_auto_offset_reset,
        max_poll_records=conf.broker_max_poll_records,
        max_poll_interval_ms=int(max_poll_interval * 1000.0),
        max_partition_fetch_bytes=conf.consumer_max_fetch_size,
        fetch_max_wait_ms=1500,
        request_timeout_ms=int(request_timeout * 1000.0),
        check_crcs=conf.broker_check_crcs,
        session_timeout_ms=int(session_timeout * 1000.0),
        rebalance_timeout_ms=int(rebalance_timeout * 1000.0),
        heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0),
        isolation_level=isolation_level,
        metadata_max_age_ms=conf.consumer_metadata_max_age_ms,
        connections_max_idle_ms=conf.consumer_connections_max_idle_ms,
        # traced_from_parent_span=self.traced_from_parent_span,
        # start_rebalancing_span=self.start_rebalancing_span,
        # start_coordinator_span=self.start_coordinator_span,
        # on_generation_id_known=self.on_generation_id_known,
        # flush_spans=self.flush_spans,
        **auth_settings,
    )
def _create_client_consumer(
    self, transport: "Transport", loop: asyncio.AbstractEventLoop
) -> aiokafka.AIOKafkaConsumer:
    """Build the lighter consumer used when app.client_only is set."""
    conf = self.app.conf
    credentials = credentials_to_aiokafka_auth(
        conf.broker_credentials, conf.ssl_context
    )
    poll_interval_secs = conf.broker_max_poll_interval or 0
    return aiokafka.AIOKafkaConsumer(
        loop=loop,
        client_id=conf.broker_client_id,
        bootstrap_servers=server_list(transport.url, transport.default_port),
        request_timeout_ms=int(conf.broker_request_timeout * 1000.0),
        enable_auto_commit=True,
        max_poll_records=conf.broker_max_poll_records,
        max_poll_interval_ms=int(poll_interval_secs * 1000.0),
        auto_offset_reset=conf.consumer_auto_offset_reset,
        check_crcs=conf.broker_check_crcs,
        **credentials,
    )
@cached_property
def trace_category(self) -> str:
    """Tracer category name used for this app's aiokafka spans."""
    app_name = self.app.conf.name
    return f"{app_name}-_aiokafka"
def start_rebalancing_span(self) -> opentracing.Span:
    """Start a span tracking the current rebalance (finished lazily)."""
    return self._start_span("rebalancing", lazy=True)
def start_coordinator_span(self) -> opentracing.Span:
    """Start a span tracking coordinator activity."""
    return self._start_span("coordinator")
def _start_span(self, name: str, *, lazy: bool = False) -> opentracing.Span:
    """Start and activate a new span, or a no-op span if tracing is off."""
    tracer = self.app.tracer
    if tracer is None:
        return noop_span()
    span = tracer.get_tracer(self.trace_category).start_span(
        operation_name=name,
    )
    span.set_tag(tags.SAMPLING_PRIORITY, 1)
    self.app._span_add_default_tags(span)
    set_current_span(span)
    if lazy:
        # Defer finishing until the generation id is known.
        self._transform_span_lazy(span)
    return span
@no_type_check
def _transform_span_lazy(self, span: opentracing.Span) -> None:
    """Monkey-patch *span* so finish() defers to :meth:`_span_finish`."""
    # XXX slow
    consumer = self
    if typing.TYPE_CHECKING:
        # MyPy completely disallows the statements below
        # claiming it is an illegal dynamic baseclass.
        # We know mypy, but do it anyway :D
        pass
    else:
        cls = span.__class__

        # Dynamic subclass whose finish() routes through _span_finish,
        # which either finishes now or queues the span until the
        # rebalance generation id is known.
        class LazySpan(cls):
            def finish(self) -> None:
                consumer._span_finish(span)

        # Keep the real finish reachable as span._real_finish.
        span._real_finish, span.finish = span.finish, LazySpan.finish
def _span_finish(self, span: opentracing.Span) -> None:
    """Finish *span* now, or defer until the generation id is known."""
    assert self._consumer is not None
    generation = self._consumer._coordinator.generation
    if generation == DEFAULT_GENERATION_ID:
        self._on_span_generation_pending(span)
    else:
        self._on_span_generation_known(span)
def _on_span_generation_pending(self, span: opentracing.Span) -> None:
    """Queue *span* until the rebalance generation id becomes known."""
    self._pending_rebalancing_spans.append(span)
def _on_span_generation_known(self, span: opentracing.Span) -> None:
    """Finalize a rebalancing span once the generation id is known."""
    if self._consumer:
        coordinator = self._consumer._coordinator
        coordinator_id = coordinator.coordinator_id
        app_id = self.app.conf.id
        generation = coordinator.generation
        member_id = coordinator.member_id

        try:
            op_name = span.operation_name
            set_tag = span.set_tag
        except AttributeError:  # pragma: no cover
            pass  # not a real span
        else:
            # Derive a deterministic trace id from app id + generation so
            # all spans of one rebalance share the same trace.
            trace_id_str = f"reb-{app_id}-{generation}"
            trace_id = murmur2(trace_id_str.encode())
            span.context.trace_id = trace_id
            if op_name.endswith(".REPLACE_WITH_MEMBER_ID"):
                span.set_operation_name(f"rebalancing node {member_id}")
            set_tag("kafka_generation", generation)
            set_tag("kafka_member_id", member_id)
            set_tag("kafka_coordinator_id", coordinator_id)
            self.app._span_add_default_tags(span)
            # Call the original finish saved by _transform_span_lazy.
            span._real_finish()
def _on_span_cancelled_early(self, span: opentracing.Span) -> None:
    """Finish a pending span whose rebalance was cancelled."""
    try:
        op_name = span.operation_name
    except AttributeError:
        # not a real span (e.g. a no-op span): nothing to finish.
        return
    span.set_operation_name(f"{op_name} (CANCELLED)")
    span._real_finish()
def traced_from_parent_span(
    self, parent_span: opentracing.Span, lazy: bool = False, **extra_context: Any
) -> Callable:
    """Return a decorator tracing callables from *parent_span*."""
    callback = self._transform_span_lazy if lazy else None
    return traced_from_parent_span(
        parent_span, callback=callback, **extra_context
    )
def flush_spans(self) -> None:
    """Finish all pending rebalancing spans as cancelled-early."""
    pending = self._pending_rebalancing_spans
    while pending:
        self._on_span_cancelled_early(pending.popleft())
def on_generation_id_known(self) -> None:
    """Finalize all spans that were waiting for the generation id."""
    pending = self._pending_rebalancing_spans
    while pending:
        self._on_span_generation_known(pending.popleft())
def close(self) -> None:
    """Close consumer for graceful shutdown."""
    if self._consumer is not None:
        # Mark the consumer closed first so no new operations start.
        self._consumer._closed = True
        # The consumer's internals run on the app loop in another
        # thread, so they must be closed thread-safely.
        asyncio.run_coroutine_threadsafe(
            self._consumer._client.close(), self.app.loop
        )
        asyncio.run_coroutine_threadsafe(
            self._consumer._coordinator.close(), self.app.loop
        )
async def subscribe(self, topics: Iterable[str]) -> None:
    """Reset subscription (requires rebalance)."""
    # XXX subscribing by pattern does not work :/
    consumer = self._ensure_consumer()
    await self.call_thread(
        consumer.subscribe,
        topics=set(topics),
        listener=self._rebalance_listener,
    )
async def seek_to_committed(self) -> Mapping[TP, int]:
    """Seek partitions to the last committed offset."""
    consumer = self._ensure_consumer()
    return await self.call_thread(consumer.seek_to_committed)
async def commit(self, offsets: Mapping[TP, int]) -> bool:
    """Commit topic offsets."""
    committed: bool = await self.call_thread(self._commit, offsets)
    return committed
async def _commit(self, offsets: Mapping[TP, int]) -> bool:
    """Commit *offsets* for currently-assigned partitions; crash on error."""
    consumer = self._ensure_consumer()
    now = monotonic()
    try:
        assigned = self.assignment()
        aiokafka_offsets = {
            tp: OffsetAndMetadata(offset, "")
            for tp, offset in offsets.items()
            if tp in assigned
        }
        self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
        await consumer.commit(aiokafka_offsets)
    except CommitFailedError as exc:
        # A failed commit caused by rebalancing is expected: report
        # failure without crashing the service.
        if "already rebalanced" in str(exc):
            return False
        self.log.exception("Committing raised exception: %r", exc)
        await self.crash(exc)
        return False
    except Exception as exc:
        # IllegalStateError and any other failure get identical handling:
        # log with the current assignment, then crash the service.
        self.log.exception(
            "Got exception: %r\nCurrent assignment: %r", exc, self.assignment()
        )
        await self.crash(exc)
        return False
    return True
def verify_event_path(self, now: float, tp: TP) -> None:
    """Verify events are moving for *tp*, logging an error if stalled.

    Checks, in order: the aiokafka fetch path, the highwater, the
    stream's inbound-event time, and commit recency — returning as soon
    as one condition is met.
    """
    # long function ahead, but not difficult to test
    # as it always returns as soon as some condition is met.
    if self._verify_aiokafka_event_path(now, tp):
        # already logged error.
        return None
    parent = cast(Consumer, self.consumer)
    app = parent.app
    monitor = app.monitor
    acks_enabled_for = app.topics.acks_enabled_for
    secs_since_started = now - self.time_started
    if monitor is not None:  # need for .stream_inbound_time
        aiotp = TopicPartition(tp.topic, tp.partition)
        tp_state = self._ensure_consumer()._fetcher._subscriptions.subscription.assignment.state_value(  # noqa: E501
            aiotp
        )
        highwater = tp_state.highwater
        committed_offset = tp_state.position
        has_acks = acks_enabled_for(tp.topic)
        if highwater is None:
            if secs_since_started >= self.tp_stream_timeout_secs:
                # AIOKAFKA HAS NOT UPDATED HIGHWATER SINCE STARTING
                self.log.error(
                    SLOW_PROCESSING_NO_HIGHWATER_SINCE_START,
                    tp,
                    humanize_seconds_ago(secs_since_started),
                )
            return None
        if has_acks and committed_offset is not None:
            if highwater > committed_offset:
                inbound = monitor.stream_inbound_time.get(tp)
                if inbound is None:
                    if secs_since_started >= self.tp_stream_timeout_secs:
                        # AIOKAFKA IS FETCHING BUT STREAM IS NOT
                        # PROCESSING EVENTS (no events at all since
                        # start).
                        self._log_slow_processing_stream(
                            SLOW_PROCESSING_STREAM_IDLE_SINCE_START,
                            tp,
                            humanize_seconds_ago(secs_since_started),
                        )
                    return None
                secs_since_stream = now - inbound
                if secs_since_stream >= self.tp_stream_timeout_secs:
                    # AIOKAFKA IS FETCHING, AND STREAM WAS WORKING
                    # BEFORE BUT NOW HAS STOPPED PROCESSING
                    # (or processing of an event in the stream takes
                    # longer than tp_stream_timeout_secs).
                    self._log_slow_processing_stream(
                        SLOW_PROCESSING_STREAM_IDLE,
                        tp,
                        humanize_seconds_ago(secs_since_stream),
                    )
                    return None
                last_commit = self.tp_last_committed_at.get(tp)
                if last_commit is None:
                    if secs_since_started >= self.tp_commit_timeout_secs:
                        # AIOKAFKA IS FETCHING AND STREAM IS PROCESSING
                        # BUT WE HAVE NOT COMMITTED ANYTHING SINCE WORKER
                        # START.
                        self._log_slow_processing_commit(
                            SLOW_PROCESSING_NO_COMMIT_SINCE_START,
                            tp,
                            humanize_seconds_ago(secs_since_started),
                        )
                    return None
                else:
                    secs_since_commit = now - last_commit
                    if secs_since_commit >= self.tp_commit_timeout_secs:
                        # AIOKAFKA IS FETCHING AND STREAM IS PROCESSING
                        # BUT WE HAVE NOT COMITTED ANYTHING IN A WHILE
                        # (commit offset is not advancing).
                        self._log_slow_processing_commit(
                            SLOW_PROCESSING_NO_RECENT_COMMIT,
                            tp,
                            humanize_seconds_ago(secs_since_commit),
                        )
                        return None
def verify_recovery_event_path(self, now: float, tp: TP) -> None:
    """Verify only the aiokafka fetch path for *tp* (ignore stream/commit)."""
    self._verify_aiokafka_event_path(now, tp)
def _verify_aiokafka_event_path(self, now: float, tp: TP) -> bool:
    """Verify that :pypi:`aiokafka` event path is working.

    Returns :const:`True` if any error was logged.
    """
    consumer = self._ensure_consumer()
    secs_since_started = now - self.time_started
    aiotp = TopicPartition(tp.topic, tp.partition)
    assignment = consumer._fetcher._subscriptions.subscription.assignment
    # BUGFIX: the original used ``not assignment and not assignment.active``,
    # which raises AttributeError when assignment is None and can never
    # flag a present-but-inactive assignment.
    if assignment is None or not assignment.active:
        self.log.error(f"No active partitions for {tp}")
        return True
    poll_at = None
    aiotp_state = assignment.state_value(aiotp)
    if aiotp_state and aiotp_state.timestamp:
        # aiokafka records fetch timestamps in milliseconds.
        poll_at = aiotp_state.timestamp / 1000
    if poll_at is None:
        if secs_since_started >= self.tp_fetch_request_timeout_secs:
            # NO FETCH REQUEST SENT AT ALL SINCE WORKER START
            self.log.error(
                SLOW_PROCESSING_NO_FETCH_SINCE_START,
                tp,
                humanize_seconds_ago(secs_since_started),
            )
            return True
        return False
    secs_since_request = now - poll_at
    if secs_since_request >= self.tp_fetch_request_timeout_secs:
        # NO REQUEST SENT BY AIOKAFKA IN THE LAST n SECONDS
        self.log.error(
            SLOW_PROCESSING_NO_RECENT_FETCH,
            tp,
            humanize_seconds_ago(secs_since_request),
        )
        return True
    return False
def _log_slow_processing_stream(self, msg: str, *args: Any) -> None:
    """Log that stream processing for a partition is slow or stalled."""
    conf = self.consumer.app.conf
    self._log_slow_processing(
        msg,
        *args,
        causes=[
            SLOW_PROCESSING_CAUSE_STREAM,
            SLOW_PROCESSING_CAUSE_AGENT,
        ],
        setting="stream_processing_timeout",
        current_value=conf.stream_processing_timeout,
    )
def _log_slow_processing_commit(self, msg: str, *args: Any) -> None:
    """Log that offset commits for a partition are slow or stalled."""
    conf = self.consumer.app.conf
    self._log_slow_processing(
        msg,
        *args,
        causes=[SLOW_PROCESSING_CAUSE_COMMIT],
        setting="broker_commit_livelock_soft_timeout",
        current_value=conf.broker_commit_livelock_soft_timeout,
    )
def _make_slow_processing_error(
    self, msg: str, causes: Iterable[str], setting: str, current_value: float
) -> str:
    """Compose the full slow-processing error message text."""
    explanation = SLOW_PROCESSING_EXPLAINED % {
        "setting": setting,
        "current_value": current_value,
    }
    # Additional causes continue the numbering started in the template.
    possible_causes = text.enumeration(causes, start=2, sep="\n\n")
    return " ".join([msg, explanation, possible_causes])
def _log_slow_processing(
    self,
    msg: str,
    *args: Any,
    causes: Iterable[str],
    setting: str,
    current_value: float,
) -> None:
    """Log a composed slow-processing error message."""
    full_message = self._make_slow_processing_error(
        msg, causes, setting, current_value
    )
    return self.log.error(full_message, *args)
async def position(self, tp: TP) -> Optional[int]:
    """Return the current position for topic partition."""
    consumer = self._ensure_consumer()
    return await self.call_thread(consumer.position, tp)
async def seek_to_beginning(self, *partitions: _TopicPartition) -> None:
    """Seek list of offsets to the first available offset."""
    consumer = self._ensure_consumer()
    await self.call_thread(consumer.seek_to_beginning, *partitions)
async def seek_wait(self, partitions: Mapping[TP, int]) -> None:
    """Seek partitions to specific offset and wait for operation."""
    await self.call_thread(
        self._seek_wait, self._ensure_consumer(), partitions
    )
async def _seek_wait(
    self, consumer: Consumer, partitions: Mapping[TP, int]
) -> None:
    """Seek each partition to its offset, then wait for positions to settle.

    Offsets <= 0 clear the locally tracked read offset so it can be
    re-resolved; positive offsets record it directly.
    """
    for tp, offset in partitions.items():
        self.log.dev("SEEK %r -> %r", tp, offset)
        consumer.seek(tp, offset)
        if offset > 0:
            self.consumer._read_offset[tp] = offset
        else:
            # idiomatic: pop with default replaces the redundant
            # ``tp in d.keys()`` membership test followed by ``del``.
            self.consumer._read_offset.pop(tp, None)
    # Resolve all new positions (bounded by the broker request timeout)
    # so callers observe a consistent state after the seek.
    await asyncio.wait_for(
        asyncio.gather(*[consumer.position(tp) for tp in partitions]),
        timeout=self.app.conf.broker_request_timeout,
    )
def seek(self, partition: TP, offset: int) -> None:
    """Seek partition to specific offset."""
    consumer = self._ensure_consumer()
    consumer.seek(partition, offset)
def assignment(self) -> Set[TP]:
    """Return the current assignment."""
    raw_assignment = self._ensure_consumer().assignment()
    return ensure_TPset(raw_assignment)
def highwater(self, tp: TP) -> int:
    """Return the last offset in a specific partition."""
    consumer = self._ensure_consumer()
    # In transactions only records up to the last stable offset count.
    return (
        consumer.last_stable_offset(tp)
        if self.consumer.in_transaction
        else consumer.highwater(tp)
    )
def topic_partitions(self, topic: str) -> Optional[int]:
    """Return the number of partitions configured for topic by name."""
    consumer = self._consumer
    if consumer is None:
        return None
    return consumer._coordinator._metadata_snapshot.get(topic)
async def earliest_offsets(self, *partitions: TP) -> Mapping[TP, int]:
    """Return the earliest offsets for a list of partitions."""
    consumer = self._ensure_consumer()
    return await self.call_thread(consumer.beginning_offsets, partitions)
async def highwaters(self, *partitions: TP) -> Mapping[TP, int]:
    """Return the last offsets for a list of partitions."""
    result = await self.call_thread(self._highwaters, partitions)
    return result
async def _highwaters(self, partitions: List[TP]) -> Mapping[TP, int]:
    """Read highwaters (or last stable offsets in transactional mode)."""
    consumer = self._ensure_consumer()
    if not self.consumer.in_transaction:
        return cast(Mapping[TP, int], await consumer.end_offsets(partitions))
    return {tp: consumer.last_stable_offset(tp) for tp in partitions}
def _ensure_consumer(self) -> aiokafka.AIOKafkaConsumer:
    """Return the running consumer, raising if it was never started."""
    consumer = self._consumer
    if consumer is None:
        raise ConsumerNotStarted("Consumer thread not yet started")
    return consumer
async def getmany(
    self, active_partitions: Optional[Set[TP]], timeout: float
) -> RecordMap:
    """Fetch batch of messages from server."""
    # Implementation for the Fetcher service.
    consumer = self._ensure_consumer()
    # The fetch request is enqueued on the consumer thread; when it is
    # dequeued we must re-check that no rebalancing started meanwhile
    # and return early in that case, otherwise a fetch starting after
    # flow stopped would create a deadlock.
    return await self.call_thread(
        self._fetch_records,
        consumer,
        active_partitions,
        timeout=timeout,
        max_records=consumer._max_poll_records,
    )
async def _fetch_records(
self,
consumer: aiokafka.AIOKafkaConsumer,
active_partitions: Set[TP],
timeout: Optional[float] = None,
max_records: Optional[int] = None,
) -> RecordMap:
if not self.consumer.flow_active: