Skip to content

Commit

Permalink
Fix recovery issue in transaction and reprocessing message in consumer (
Browse files Browse the repository at this point in the history
#49)

* Fixing issues #47 and #48

* fix linting
  • Loading branch information
patkivikram committed Nov 30, 2020
1 parent 4906f30 commit be1e6db
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
29 changes: 26 additions & 3 deletions faust/tables/recovery.py
Expand Up @@ -712,7 +712,12 @@ async def _slurp_changelogs(self) -> None:
buffer_sizes = self.buffer_sizes
processing_times = self._processing_times

def _maybe_signal_recovery_end() -> None:
async def _maybe_signal_recovery_end(timeout=False, timeout_count=0) -> None:
# lets wait at least 2 consecutive cycles for the queue to be
# empty to avoid race conditions between
# the aiokafka consumer position and draining of the queue
if timeout and self.app.in_transaction and timeout_count > 1:
await detect_aborted_tx()
if not self.active_remaining_total():
# apply anything stuck in the buffers
self.flush_buffers()
Expand All @@ -722,18 +727,36 @@ def _maybe_signal_recovery_end() -> None:
logger.debug("Setting recovery end")
self.signal_recovery_end.set()

async def detect_aborted_tx():
highwaters = self.active_highwaters
offsets = self.active_offsets
for tp, highwater in highwaters.items():
if (
highwater is not None
and offsets[tp] is not None
and offsets[tp] < highwater
):
if await self.app.consumer.position(tp) >= highwater:
logger.info(f"Aborted tx until highwater for {tp}")
offsets[tp] = highwater

timeout_count = 0
while not self.should_stop:
self.signal_recovery_end.clear()
try:
event: EventT = await asyncio.wait_for(
changelog_queue.get(), timeout=5.0
)
except asyncio.TimeoutError:
timeout_count += 1
if self.should_stop:
return
_maybe_signal_recovery_end()
await _maybe_signal_recovery_end(
timeout=True, timeout_count=timeout_count
)
continue
now = monotonic()
timeout_count = 0
message = event.message
tp = message.tp
offset = message.offset
Expand Down Expand Up @@ -781,7 +804,7 @@ def _maybe_signal_recovery_end() -> None:
processing_times.popleft()
self._last_active_event_processed_at = now_after

_maybe_signal_recovery_end()
await _maybe_signal_recovery_end()

if not self.standby_remaining_total():
logger.debug("Completed standby partition fetch")
Expand Down
17 changes: 11 additions & 6 deletions faust/transport/consumer.py
Expand Up @@ -71,6 +71,7 @@
)
from weakref import WeakSet

from aiokafka.errors import ProducerFenced
from mode import Service, ServiceT, flight_recorder, get_logger
from mode.threads import MethodQueue, QueueServiceThread
from mode.utils.futures import notify
Expand Down Expand Up @@ -334,11 +335,15 @@ async def commit(
by_transactional_id[transactional_id][tp] = offset

if by_transactional_id:
await producer.commit_transactions(
by_transactional_id,
group_id,
start_new_transaction=start_new_transaction,
)
try:
await producer.commit_transactions(
by_transactional_id,
group_id,
start_new_transaction=start_new_transaction,
)
except ProducerFenced as pf:
logger.warning(f"ProducerFenced {pf}")
await self.app.crash(pf)
return True

def key_partition(self, topic: str, key: bytes) -> TP:
Expand Down Expand Up @@ -1031,7 +1036,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked[: len(batch)] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1]
return batch[-1] + 1
return None

async def on_task_error(self, exc: BaseException) -> None:
Expand Down

0 comments on commit be1e6db

Please sign in to comment.