Skip to content

Commit

Permalink
Merge branch 'master' into fix-twisted-doc-link
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Jul 19, 2022
2 parents 7cb9936 + c4f5b18 commit 5ad9fca
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
8 changes: 8 additions & 0 deletions faust/tables/recovery.py
Expand Up @@ -470,6 +470,14 @@ async def _restart_recovery(self) -> None:
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()
self._set_recovery_ended()

# The changelog partitions only in the active_tps set need to be resumed
active_only_partitions = active_tps - standby_tps
if active_only_partitions:
T(consumer.resume_partitions)(active_only_partitions)
T(self.app.flow_control.resume)()
T(consumer.resume_flow)()

self.log.info("Recovery complete")
if span:
span.set_tag("Recovery-Completed", True)
Expand Down
2 changes: 1 addition & 1 deletion tests/bench/base.py
Expand Up @@ -94,7 +94,7 @@ async def produce(self, max_latency: float, max_messages: int, **kwargs):
time_start = monotonic()
time_1st = monotonic()

def on_published(meta):
def on_published(meta, time_1st=time_1st):
print(f"1ST OK: {meta} AFTER {monotonic() - time_1st}s")

callback = on_published
Expand Down

0 comments on commit 5ad9fca

Please sign in to comment.