Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discard message after rebalance #223

Merged

Conversation

ekerstens
Copy link
Contributor

Description

Fixes #221

This PR introduces two changes to avoid processing of events fetched before a rebalance:

  • cancel the Conductor callback when consumer.stop_flow is called
  • Add the generation_id to Message to support skipping events with a generation_id not matching the current generation_id.

@@ -158,6 +160,9 @@ cdef class StreamIterator:
offset = message.offset
consumer = self.consumer

if message.generation_id != self.app.consumer_generation_id:
return None, self._skipped_value, stream_state
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log this?

@@ -974,6 +974,9 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
tp = message.tp
offset = message.offset

if message.generation_id != self.app.consumer_generation_id:
value = skipped_value
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log this? It will be easier to debug if this were to happen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

@codecov-commenter
Copy link

codecov-commenter commented Nov 19, 2021

Codecov Report

Merging #223 (46cc9c7) into master (5077210) will decrease coverage by 0.03%.
The diff coverage is 55.55%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #223      +/-   ##
==========================================
- Coverage   94.40%   94.37%   -0.04%     
==========================================
  Files         100      100              
  Lines       10740    10749       +9     
  Branches     1214     1215       +1     
==========================================
+ Hits        10139    10144       +5     
- Misses        536      539       +3     
- Partials       65       66       +1     
Impacted Files Coverage Δ
faust/agents/agent.py 98.06% <ø> (ø)
faust/channels.py 100.00% <ø> (ø)
faust/streams.py 97.72% <0.00%> (-0.82%) ⬇️
faust/transport/drivers/aiokafka.py 51.96% <ø> (ø)
faust/transport/consumer.py 95.79% <100.00%> (+0.03%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5077210...46cc9c7. Read the comment docs.

@patkivikram patkivikram merged commit a28fe3f into faust-streaming:master Nov 29, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Prefetched events cause out of order issue after rebalance.
3 participants