Skip to content

Commit d166668

Browse files
committed
KAFKA-4937: Batch offset fetches in the Consumer
1 parent 2f262b9 commit d166668

File tree

4 files changed

+39
-34
lines changed

4 files changed

+39
-34
lines changed

kafka/consumer/fetcher.py

Lines changed: 27 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -161,11 +161,13 @@ def reset_offsets_if_needed(self, partitions, timeout_ms=None):
161161
Raises:
162162
KafkaTimeoutError if timeout_ms provided
163163
"""
164-
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets')
164+
needs_offset_reset = set()
165165
for tp in partitions:
166-
# TODO: If there are several offsets to reset, we could submit offset requests in parallel
167166
if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
168-
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
167+
needs_offset_reset.add(tp)
168+
169+
if needs_offset_reset:
170+
self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)
169171

170172
def _clean_done_fetch_futures(self):
171173
while True:
@@ -191,25 +193,28 @@ def update_fetch_positions(self, partitions, timeout_ms=None):
191193
partition and no reset policy is available
192194
KafkaTimeoutError if timeout_ms provided.
193195
"""
194-
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
196+
needs_offset_reset = set()
195197
# reset the fetch position to the committed position
196198
for tp in partitions:
197199
if not self._subscriptions.is_assigned(tp) or self._subscriptions.has_valid_position(tp):
198200
continue
199201

200202
if self._subscriptions.is_offset_reset_needed(tp):
201-
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
203+
needs_offset_reset.add(tp)
202204
elif self._subscriptions.assignment[tp].committed is None:
203205
# there's no committed position, so we need to reset with the
204206
# default strategy
205207
self._subscriptions.need_offset_reset(tp)
206-
self._reset_offset(tp, timeout_ms=inner_timeout_ms())
208+
needs_offset_reset.add(tp)
207209
else:
208210
committed = self._subscriptions.assignment[tp].committed.offset
209211
log.debug("Resetting offset for partition %s to the committed"
210212
" offset %s", tp, committed)
211213
self._subscriptions.seek(tp, committed)
212214

215+
if needs_offset_reset:
216+
self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms)
217+
213218
def get_offsets_by_times(self, timestamps, timeout_ms):
214219
offsets = self._retrieve_offsets(timestamps, timeout_ms)
215220
for tp in timestamps:
@@ -232,37 +237,36 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
232237
offsets[tp] = offsets[tp].offset
233238
return offsets
234239

235-
def _reset_offset(self, partition, timeout_ms=None):
236-
"""Reset offsets for the given partition using the offset reset strategy.
240+
def _reset_offsets(self, partitions, timeout_ms=None):
241+
"""Reset offsets for the given partitions using the offset reset strategy.
237242
238243
Arguments:
239-
partition (TopicPartition): the partition that needs reset offset
244+
partitions ([TopicPartition]): the partitions that need offsets reset
240245
241246
Raises:
242247
NoOffsetForPartitionError: if no offset reset strategy is defined
243248
KafkaTimeoutError if timeout_ms provided
244249
"""
245-
timestamp = self._subscriptions.assignment[partition].reset_strategy
246-
if timestamp is OffsetResetStrategy.EARLIEST:
247-
strategy = 'earliest'
248-
elif timestamp is OffsetResetStrategy.LATEST:
249-
strategy = 'latest'
250-
else:
251-
raise NoOffsetForPartitionError(partition)
250+
offset_resets = dict()
251+
for tp in partitions:
252+
ts = self._subscriptions.assignment[tp].reset_strategy
253+
if not ts:
254+
raise NoOffsetForPartitionError(tp)
255+
offset_resets[tp] = ts
252256

253-
log.debug("Resetting offset for partition %s to offset %s.",
254-
partition, strategy)
255-
offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)
257+
offsets = self._retrieve_offsets(offset_resets, timeout_ms=timeout_ms)
256258

257-
if partition in offsets:
258-
offset = offsets[partition].offset
259+
for partition in partitions:
260+
if partition not in offsets:
261+
raise NoOffsetForPartitionError(partition)
259262

260263
# we might lose the assignment while fetching the offset,
261264
# so check it is still active
262265
if self._subscriptions.is_assigned(partition):
266+
offset = offsets[partition].offset
267+
log.debug("Resetting offset for partition %s to offset %s.",
268+
partition, offset)
263269
self._subscriptions.seek(partition, offset)
264-
else:
265-
log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))
266270

267271
def _retrieve_offsets(self, timestamps, timeout_ms=None):
268272
"""Fetch offset for each partition passed in ``timestamps`` map.

kafka/consumer/group.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -760,7 +760,8 @@ def position(self, partition, timeout_ms=None):
760760
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
761761
position = self._subscription.assignment[partition].position
762762
if position is None:
763-
self._update_fetch_positions([partition], timeout_ms=timeout_ms)
763+
# batch update fetch positions for any partitions without a valid position
764+
self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
764765
position = self._subscription.assignment[partition].position
765766
return position.offset if position else None
766767

kafka/consumer/subscription_state.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -402,7 +402,7 @@ def __init__(self):
402402
self.has_valid_position = False # whether we have valid position
403403
self.paused = False # whether this partition has been paused by the user
404404
self.awaiting_reset = False # whether we are awaiting reset
405-
self.reset_strategy = None # the reset strategy if awaitingReset is set
405+
self.reset_strategy = None # the reset strategy if awaiting_reset is set
406406
self._position = None # OffsetAndMetadata exposed to the user
407407
self.highwater = None
408408
self.drop_pending_record_batch = False

test/test_fetcher.py

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -109,46 +109,46 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
109109

110110

111111
def test_update_fetch_positions(fetcher, topic, mocker):
112-
mocker.patch.object(fetcher, '_reset_offset')
112+
mocker.patch.object(fetcher, '_reset_offsets')
113113
partition = TopicPartition(topic, 0)
114114

115115
# unassigned partition
116116
fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)])
117-
assert fetcher._reset_offset.call_count == 0
117+
assert fetcher._reset_offsets.call_count == 0
118118

119119
# fetchable partition (has offset, not paused)
120120
fetcher.update_fetch_positions([partition])
121-
assert fetcher._reset_offset.call_count == 0
121+
assert fetcher._reset_offsets.call_count == 0
122122

123123
# partition needs reset, no committed offset
124124
fetcher._subscriptions.need_offset_reset(partition)
125125
fetcher._subscriptions.assignment[partition].awaiting_reset = False
126126
fetcher.update_fetch_positions([partition])
127-
fetcher._reset_offset.assert_called_with(partition, timeout_ms=None)
127+
fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None)
128128
assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
129129
fetcher.update_fetch_positions([partition])
130-
fetcher._reset_offset.assert_called_with(partition, timeout_ms=None)
130+
fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None)
131131

132132
# partition needs reset, has committed offset
133-
fetcher._reset_offset.reset_mock()
133+
fetcher._reset_offsets.reset_mock()
134134
fetcher._subscriptions.need_offset_reset(partition)
135135
fetcher._subscriptions.assignment[partition].awaiting_reset = False
136136
fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1)
137137
mocker.patch.object(fetcher._subscriptions, 'seek')
138138
fetcher.update_fetch_positions([partition])
139-
assert fetcher._reset_offset.call_count == 0
139+
assert fetcher._reset_offsets.call_count == 0
140140
fetcher._subscriptions.seek.assert_called_with(partition, 123)
141141

142142

143-
def test__reset_offset(fetcher, mocker):
143+
def test__reset_offsets(fetcher, mocker):
144144
tp = TopicPartition("topic", 0)
145145
fetcher._subscriptions.subscribe(topics=["topic"])
146146
fetcher._subscriptions.assign_from_subscribed([tp])
147147
fetcher._subscriptions.need_offset_reset(tp)
148148
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
149149

150150
mocked.return_value = {tp: OffsetAndTimestamp(1001, None, -1)}
151-
fetcher._reset_offset(tp)
151+
fetcher._reset_offsets([tp])
152152
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
153153
assert fetcher._subscriptions.assignment[tp].position.offset == 1001
154154

0 commit comments

Comments
 (0)