Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ def _message_generator(self):
elif fetch_offset == position:
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s", position, tp)

# We can ignore any prior signal to drop pending message sets
# because we are starting from a fresh one where fetch_offset == position
# i.e., the user seek()'d to this position
self._subscriptions.assignment[tp].drop_pending_message_set = False

for msg in self._unpack_message_set(tp, messages):

# Because we are in a generator, it is possible for
Expand All @@ -436,9 +442,17 @@ def _message_generator(self):
" since it is no longer fetchable", tp)
break

# If there is a seek during message iteration,
# we should stop unpacking this message set and
# wait for a new fetch response that aligns with the
# new seek position
elif self._subscriptions.assignment[tp].drop_pending_message_set:
log.debug("Skipping remainder of message set for partition %s", tp)
self._subscriptions.assignment[tp].drop_pending_message_set = False
break

# Compressed messagesets may include earlier messages
# It is also possible that the user called seek()
elif msg.offset != self._subscriptions.assignment[tp].position:
elif msg.offset < self._subscriptions.assignment[tp].position:
log.debug("Skipping message offset: %s (expecting %s)",
msg.offset,
self._subscriptions.assignment[tp].position)
Expand Down
2 changes: 2 additions & 0 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ def __init__(self):
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
self.highwater = None
self.drop_pending_message_set = False

def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
Expand All @@ -371,6 +372,7 @@ def seek(self, offset):
self.awaiting_reset = False
self.reset_strategy = None
self.has_valid_position = True
self.drop_pending_message_set = True

def pause(self):
self.paused = True
Expand Down