Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist: in persist_source, do very fine-grained output activation #16390

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 29 additions & 9 deletions src/storage-client/src/source/persist_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,19 +355,11 @@ where
let mut buffer = Vec::new();

while let Some(event) = fetcher_input.next().await {
// Re-acquire the output handle on each invocation and drop it
// when we're done.
let mut output_handle = update_output.activate();
let mut consumed_part_output_handle = consumed_part_output.activate();

if let Event::Data(cap, data) = event {
// `LeasedBatchPart`es cannot be dropped at this point w/o
// panicking, so swap them to an owned version.
data.swap(&mut buffer);

let mut update_session = output_handle.session(&cap);
let mut consumed_part_session = consumed_part_output_handle.session(&cap);

for (_idx, part) in buffer.drain(..) {
let (consumed_part, fetched_part) = fetcher
.fetch_leased_part(fetcher.leased_part_from_exchangeable(part))
Expand Down Expand Up @@ -433,15 +425,43 @@ where
// we don't get to consolidate everything, but that's part of the
// tradeoff in tuning yield_fn.
differential_dataflow::consolidation::consolidate_updates(&mut updates);
update_session.give_vec(&mut updates);

{
// Do very fine-grained output activation/session
// creation to ensure that we don't hold activated
// outputs or sessions across await points, which
// would prevent messages from being flushed from
// the shared timely output buffer.
let mut output_handle = update_output.activate();
let mut update_session = output_handle.session(&cap);

update_session.give_vec(&mut updates);
}

// Force a yield to give back the timely thread, reactivating on our
// way out.
tokio::task::yield_now().await;
decode_start = Instant::now();
}
}
differential_dataflow::consolidation::consolidate_updates(&mut updates);

// Do very fine-grained output activation/session creation
// to ensure that we don't hold activated outputs or
// sessions across await points, which would prevent
// messages from being flushed from the shared timely output
// buffer.
let mut output_handle = update_output.activate();
let mut update_session = output_handle.session(&cap);
update_session.give_vec(&mut updates);

// Do very fine-grained output activation/session creation
// to ensure that we don't hold activated outputs or
// sessions across await points, which would prevent
// messages from being flushed from the shared timely output
// buffer.
let mut consumed_part_output_handle = consumed_part_output.activate();
let mut consumed_part_session = consumed_part_output_handle.session(&cap);
consumed_part_session.give(consumed_part.into_exchangeable_part());
}
}
Expand Down