Fix Recovery._resume_streams #217

Merged
patkivikram merged 1 commit into faust-streaming:master on Nov 17, 2021


ekerstens
Contributor

Description

Fixes #216

During a rebalance, faust has several recovery code paths that resume processing, such as Recovery.on_recovery_completed and Recovery._restart_recovery. They roughly follow this pattern:

  1. self.log.info("Seek stream partitions to committed offsets.")
  2. consumer.perform_seek()
  3. self.log.dev("Resume stream partitions")
  4. consumer.resume_partitions(
  5. consumer.resume_flow()
  6. self.app.flow_control.resume()

However, when the application doesn't use any tables, _restart_recovery calls _resume_streams, which follows this pattern:

  1. consumer.resume_flow()
  2. app.flow_control.resume()
  3. self.log.info("Seek stream partitions to committed offsets.")
  4. consumer.perform_seek()
  5. self.log.dev("Resume stream partitions")
  6. consumer.resume_partitions(

In this case, app.flow_control.resume() is called before consumer.perform_seek(). This opens a race condition: the consumer can start fetching from the position where it left off, which is past the messages that app.flow_control.suspend() dropped when it cleared the queues. If fetching restarts before the consumer seeks back to the last committed offset, those dropped messages are never re-read and a gap appears in processing.
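To make the race concrete, here is a minimal toy model of it. Everything in it is hypothetical: `ToyConsumer`, its fields, and the `run` helper are illustration only and are not faust's classes. It also assumes, as a simplification, that processing a message immediately advances the committed offset. Under those assumptions, a fetch that slips in between resume and seek leaves a permanent gap, while seeking first does not.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyConsumer:
    """Hypothetical single-partition consumer; NOT faust's real consumer."""
    end: int = 8        # partition holds offsets 0..end-1
    committed: int = 0  # next offset to read after the last commit
    position: int = 0   # next offset the fetcher will read
    flowing: bool = True
    queue: List[int] = field(default_factory=list)
    processed: List[int] = field(default_factory=list)

    def fetch(self, n: int) -> None:
        # Pull up to n offsets into the queue, only while flow is active.
        if not self.flowing:
            return
        for _ in range(n):
            if self.position < self.end:
                self.queue.append(self.position)
                self.position += 1

    def process(self, n: int) -> None:
        # Toy assumption: processing a message commits it right away.
        for _ in range(min(n, len(self.queue))):
            offset = self.queue.pop(0)
            self.processed.append(offset)
            self.committed = offset + 1

    def suspend(self) -> None:
        # Stop the flow and drop everything fetched but not yet processed.
        self.flowing = False
        self.queue.clear()

    def resume_flow(self) -> None:
        self.flowing = True

    def perform_seek(self) -> None:
        # Rewind the fetch position to the last committed offset.
        self.position = self.committed


def run(seek_before_resume: bool) -> List[int]:
    c = ToyConsumer()
    c.fetch(5)      # offsets 0..4 queued, position is now 5
    c.process(2)    # offsets 0 and 1 processed, committed is now 2
    c.suspend()     # rebalance: offsets 2..4 are dropped from the queue
    if seek_before_resume:
        c.perform_seek()  # position rewinds to 2 before any fetch
        c.resume_flow()
    else:
        c.resume_flow()
        c.fetch(1)        # racing fetch reads offset 5
        c.process(1)      # offset 5 processed, committed jumps to 6
        c.perform_seek()  # "rewinds" to 6, so 2..4 are never re-read
    c.fetch(c.end)
    c.process(c.end)
    return c.processed


print(run(seek_before_resume=True))   # [0, 1, 2, 3, 4, 5, 6, 7]
print(run(seek_before_resume=False))  # [0, 1, 5, 6, 7]
```

In the second run, offsets 2 to 4 are missing from the processed list, which is the kind of gap described above.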

This change brings the order of calls in _resume_streams in line with the other recovery paths in the class.
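As a rough sketch of the intended ordering (not the actual faust code), the snippet below uses a hypothetical `RecordingStub` in place of the consumer and flow control objects and records the order in which their methods are called. The function name `resume_streams_sketch` and the `partitions` argument are assumptions made for illustration; the real method is a coroutine on Recovery and also logs between steps.

```python
class RecordingStub:
    """Hypothetical stub that records the name of every method called on it."""

    def __init__(self, name, calls):
        self._name = name
        self._calls = calls

    def __getattr__(self, method):
        # Only reached for attributes that are not set in __init__.
        def record(*args, **kwargs):
            self._calls.append(f"{self._name}.{method}")
        return record


def resume_streams_sketch(consumer, flow_control, partitions):
    # Seek first, so fetching restarts from the committed offsets.
    consumer.perform_seek()
    consumer.resume_partitions(partitions)
    # Only then let messages flow again.
    consumer.resume_flow()
    flow_control.resume()


calls = []
resume_streams_sketch(
    RecordingStub("consumer", calls),
    RecordingStub("flow_control", calls),
    partitions={"topic-0"},  # hypothetical partition set
)
print(calls)
```

A check like `calls.index("consumer.perform_seek") < calls.index("flow_control.resume")` is one way a unit test could pin the ordering down.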

@ekerstens ekerstens marked this pull request as draft November 12, 2021 01:05
@patkivikram
Collaborator

is this ready for review?

@codecov-commenter

codecov-commenter commented Nov 15, 2021

Codecov Report

Merging #217 (8a95d4c) into master (60a8696) will not change coverage.
The diff coverage is n/a.


@@           Coverage Diff           @@
##           master     #217   +/-   ##
=======================================
  Coverage   94.40%   94.40%           
=======================================
  Files         100      100           
  Lines       10740    10740           
  Branches     1214     1214           
=======================================
  Hits        10139    10139           
  Misses        536      536           
  Partials       65       65           


@ekerstens
Contributor Author

I am working on some validation on an application to make sure the entire issue is solved before marking the PR as ready for review.

@ekerstens ekerstens marked this pull request as ready for review November 17, 2021 01:02
@ekerstens
Contributor Author

@patkivikram I've found a separate issue where an event can get stuck in the conductor during a rebalance, but I will open a separate issue/PR for that. This is ready for review.

@patkivikram
Collaborator

patkivikram commented Nov 17, 2021

@patkivikram patkivikram merged commit e3bd128 into faust-streaming:master Nov 17, 2021
Development

Successfully merging this pull request may close these issues.

Recovery without tables resumes from kafka before seeking to committed offsets