Skip to content

Commit

Permalink
Fix error messages in faust app #166
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Jul 6, 2021
1 parent bcbec27 commit b06e579
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions faust/transport/drivers/aiokafka.py
Expand Up @@ -722,8 +722,10 @@ def verify_event_path(self, now: float, tp: TP) -> None:
secs_since_started = now - self.time_started

if monitor is not None: # need for .stream_inbound_time
highwater = self.highwater(tp)
committed_offset = parent._committed_offset.get(tp)
aiotp = TopicPartition(tp.topic, tp.partition)
tp_state = self._ensure_consumer()._fetcher._subscriptions.subscription.assignment.state_value(aiotp)
highwater = tp_state.highwater
committed_offset = tp_state.position
has_acks = acks_enabled_for(tp.topic)
if highwater is None:
if secs_since_started >= self.tp_stream_timeout_secs:
Expand Down

0 comments on commit b06e579

Please sign in to comment.