diff --git a/src/storage-client/src/source/persist_source.rs b/src/storage-client/src/source/persist_source.rs index 87a06cf2ebf7..35cc020b0ec9 100644 --- a/src/storage-client/src/source/persist_source.rs +++ b/src/storage-client/src/source/persist_source.rs @@ -355,19 +355,11 @@ where let mut buffer = Vec::new(); while let Some(event) = fetcher_input.next().await { - // Re-acquire the output handle on each invocation and drop it - // when we're done. - let mut output_handle = update_output.activate(); - let mut consumed_part_output_handle = consumed_part_output.activate(); - if let Event::Data(cap, data) = event { // `LeasedBatchPart`es cannot be dropped at this point w/o // panicking, so swap them to an owned version. data.swap(&mut buffer); - let mut update_session = output_handle.session(&cap); - let mut consumed_part_session = consumed_part_output_handle.session(&cap); - for (_idx, part) in buffer.drain(..) { let (consumed_part, fetched_part) = fetcher .fetch_leased_part(fetcher.leased_part_from_exchangeable(part)) @@ -433,7 +425,19 @@ where // we don't get to consolidate everything, but that's part of the // tradeoff in tuning yield_fn. differential_dataflow::consolidation::consolidate_updates(&mut updates); - update_session.give_vec(&mut updates); + + { + // Do very fine-grained output activation/session + // creation to ensure that we don't hold activated + // outputs or sessions across await points, which + // would prevent messages from being flushed from + // the shared timely output buffer. + let mut output_handle = update_output.activate(); + let mut update_session = output_handle.session(&cap); + + update_session.give_vec(&mut updates); + } + // Force a yield to give back the timely thread, reactivating on our // way out. tokio::task::yield_now().await; @@ -441,7 +445,23 @@ where } } differential_dataflow::consolidation::consolidate_updates(&mut updates); + + // Do very fine-grained output activation/session creation + // to ensure that we don't hold activated outputs or + // sessions across await points, which would prevent + // messages from being flushed from the shared timely output + // buffer. + let mut output_handle = update_output.activate(); + let mut update_session = output_handle.session(&cap); update_session.give_vec(&mut updates); + + // Do very fine-grained output activation/session creation + // to ensure that we don't hold activated outputs or + // sessions across await points, which would prevent + // messages from being flushed from the shared timely output + // buffer. + let mut consumed_part_output_handle = consumed_part_output.activate(); + let mut consumed_part_session = consumed_part_output_handle.session(&cap); consumed_part_session.give(consumed_part.into_exchangeable_part()); } }