Skip to content

Commit

Permalink
fix for consumer errors in app #166
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Jun 29, 2021
1 parent 6f8f0b7 commit 75a1231
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,13 +848,12 @@ async def _commit_livelock_detector(self) -> None: # pragma: no cover
await self.sleep(interval)
async for sleep_time in self.itertimer(interval, name="livelock"):
if not self.app.rebalancing:
await self.app.loop.run_in_executor(
None, self.verify_all_partitions_active
)
await self.verify_all_partitions_active()

def verify_all_partitions_active(self) -> None:
async def verify_all_partitions_active(self) -> None:
now = monotonic()
for tp in self.assignment():
await self.sleep(0)
if not self.should_stop:
self.verify_event_path(now, tp)

Expand Down

0 comments on commit 75a1231

Please sign in to comment.