Skip to content

Commit

Permalink
compute: use drop_dataflow
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Mar 30, 2023
1 parent 24c5793 commit a3cbc07
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 100 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,6 @@ postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres" }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }
vte = { git = "https://github.com/alacritty/vte" }

[patch."https://github.com/TimelyDataflow/timely-dataflow"]
timely = { git = "https://github.com/antiguru/timely-dataflow", branch = "reach_drop" }
3 changes: 0 additions & 3 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ disallowed-methods = [
{ path = "differential_dataflow::operators::reduce::join::Join::antijoin", reason = "use the `differential_dataflow::operators::join::Join::join_core` function instead" },
{ path = "differential_dataflow::operators::reduce::join::Join::join_map", reason = "use the `differential_dataflow::operators::join::Join::join_core` function instead" },
{ path = "differential_dataflow::operators::reduce::join::Join::semijoin", reason = "use the `differential_dataflow::operators::join::Join::join_core` function instead" },

# Prevent access to Timely APIs with untested semantics.
{ path = "timely::worker::Worker::drop_dataflow", reason = "Might break logging dataflows, check with #team-compute." },
]

disallowed-macros = [
Expand Down
142 changes: 100 additions & 42 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ pub struct ComputeState {
/// Tracks the frontier information that has been sent over `response_tx`.
pub reported_frontiers: BTreeMap<GlobalId, ReportedFrontier>,
/// Collections that were recently dropped and whose removal needs to be reported.
pub dropped_collections: Vec<GlobalId>,
dropped_collections: Vec<GlobalId>,
/// Maps exported collections to dataflow indexes.
///
/// Used for dropping dataflows when collections are dropped.
/// Dataflow indexes are wrapped in `Rc`s and can be shared between collection keys, to reflect
/// the possibility that a single dataflow can have multiple exports.
dataflow_indexes: BTreeMap<GlobalId, Rc<usize>>,
/// The logger, from Timely's logging framework, if logs are enabled.
pub compute_logger: Option<logging::compute::Logger>,
/// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
Expand All @@ -87,14 +93,39 @@ pub struct ComputeState {
/// History of commands received by this worker and all its peers.
pub command_history: ComputeCommandHistory,
/// Max size in bytes of any result.
pub max_result_size: u32,
max_result_size: u32,
/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
pub dataflow_max_inflight_bytes: usize,
/// Metrics for this replica.
pub metrics: ComputeMetrics,
}

impl ComputeState {
/// Builds a `ComputeState` from the three externally owned pieces it needs.
///
/// `traces`, `persist_clients` and `metrics` are stored as given. Every piece of
/// per-collection bookkeeping starts out as its `Default` value, no compute logger is
/// set, and both size limits are initialized to the maximum of their integer type.
pub fn new(
    traces: TraceManager,
    persist_clients: Arc<PersistClientCache>,
    metrics: ComputeMetrics,
) -> Self {
    Self {
        // Supplied by the caller.
        traces,
        persist_clients,
        metrics,

        // Bookkeeping that starts out at its default value.
        sink_tokens: Default::default(),
        subscribe_response_buffer: Default::default(),
        sink_write_frontiers: Default::default(),
        flow_control_probes: Default::default(),
        pending_peeks: Default::default(),
        reported_frontiers: Default::default(),
        dropped_collections: Default::default(),
        dataflow_indexes: Default::default(),
        command_history: Default::default(),

        // No logger is present at construction time.
        compute_logger: None,

        // Limits start at the largest representable value.
        max_result_size: u32::MAX,
        dataflow_max_inflight_bytes: usize::MAX,
    }
}

/// Return whether a collection with the given ID exists.
pub fn collection_exists(&self, id: GlobalId) -> bool {
self.traces.get(&id).is_some() || self.sink_tokens.contains_key(&id)
Expand All @@ -115,8 +146,6 @@ pub struct ActiveComputeState<'a, A: Allocate> {
pub struct SinkToken {
/// The underlying token.
pub token: Box<dyn Any>,
/// Whether the sink token is keeping a subscribe alive.
pub is_subscribe: bool,
}

impl<'a, A: Allocate> ActiveComputeState<'a, A> {
Expand Down Expand Up @@ -192,9 +221,9 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
.map(|(idx_id, (idx, _))| (*idx_id, idx.on_id));
let exported_ids = index_ids.chain(sink_ids);

let dataflow_index = self.timely_worker.next_dataflow_index();
let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());

// Initialize frontiers for each object, and optionally log their construction.
// Initialize compute state for each exported object, and optionally log its construction.
for (object_id, collection_id) in exported_ids {
let reported_frontier = ReportedFrontier::NotReported {
lower: dataflow.as_of.clone().unwrap(),
Expand All @@ -205,15 +234,29 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
.insert(object_id, reported_frontier)
{
error!(
"existing frontier {frontier:?} for newly created dataflow id {object_id}"
?object_id,
?frontier,
"existing reported frontier for newly created collection"
);
}

if let Some(index) = self
.compute_state
.dataflow_indexes
.insert(object_id, Rc::clone(&dataflow_index))
{
error!(
?object_id,
?index,
"existing dataflow index for newly created collection"
);
}

// Log dataflow construction, frontier construction, and any dependencies.
if let Some(logger) = self.compute_state.compute_logger.as_mut() {
logger.log(ComputeEvent::Export {
id: object_id,
dataflow_index,
dataflow_index: *dataflow_index,
});
logger.log(ComputeEvent::Frontier {
id: object_id,
Expand All @@ -237,40 +280,7 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
for (id, frontier) in list {
if frontier.is_empty() {
// Indicates that we may drop `id`, as there are no more valid times to read.

let is_subscribe = self.compute_state.sink_tokens.contains_key(&id)
&& !self.compute_state.sink_write_frontiers.contains_key(&id);

// Sink-specific work:
self.compute_state.sink_write_frontiers.remove(&id);
self.compute_state.sink_tokens.remove(&id);
// Index-specific work:
self.compute_state.traces.del_trace(&id);
self.compute_state.flow_control_probes.remove(&id);

// Work common to sinks and indexes (removing frontier tracking and cleaning up logging).
let prev_frontier = self
.compute_state
.reported_frontiers
.remove(&id)
.expect("Dropped compute collection with no frontier");
if let Some(logger) = self.compute_state.compute_logger.as_mut() {
logger.log(ComputeEvent::ExportDropped { id });
if let Some(time) = prev_frontier.logging_time() {
logger.log(ComputeEvent::Frontier { id, time, diff: -1 });
}
}

// We need to emit a final response reporting the dropping of this collection,
// unless:
// * The collection is a subscribe, in which case we will emit a
// `SubscribeResponse::Dropped` independently.
// * The collection has already advanced to the empty frontier, in which case
// the final `FrontierUppers` response already serves the purpose of reporting
// the end of the dataflow.
if !is_subscribe && !prev_frontier.is_empty() {
self.compute_state.dropped_collections.push(id);
}
self.drop_collection(id);
} else {
self.compute_state
.traces
Expand Down Expand Up @@ -331,6 +341,54 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
}
}

/// Removes all state tracked for the collection `id`.
///
/// This clears the sink- and index-specific entries, drops the Timely dataflow once the
/// last collection referring to it is gone, retracts the logged frontier, and, where
/// required, queues `id` in `dropped_collections` so its removal gets reported.
///
/// # Panics
///
/// Panics if no dataflow index or no reported frontier is recorded for `id`.
fn drop_collection(&mut self, id: GlobalId) {
    // Classify the collection first: the sink maps are cleared right below.
    let has_sink_token = self.compute_state.sink_tokens.contains_key(&id);
    let has_write_frontier = self.compute_state.sink_write_frontiers.contains_key(&id);
    let is_subscribe = has_sink_token && !has_write_frontier;

    // State only present for sinks.
    self.compute_state.sink_write_frontiers.remove(&id);
    self.compute_state.sink_tokens.remove(&id);
    // State only present for indexes.
    self.compute_state.traces.del_trace(&id);
    self.compute_state.flow_control_probes.remove(&id);

    // From here on, the work applies to sinks and indexes alike.

    // The dataflow index is shared by all exports of one dataflow. Unwrapping the `Rc`
    // only succeeds for the last remaining export, which is when the dataflow is dropped.
    let shared_index = self
        .compute_state
        .dataflow_indexes
        .remove(&id)
        .expect("Dropped compute collection with no dataflow");
    match Rc::try_unwrap(shared_index) {
        Ok(index) => {
            self.timely_worker.drop_dataflow(index);
        }
        Err(_still_shared) => {}
    }

    // Remove frontier tracking and retract the corresponding log entries.
    let last_reported = self
        .compute_state
        .reported_frontiers
        .remove(&id)
        .expect("Dropped compute collection with no frontier");
    if let Some(logger) = self.compute_state.compute_logger.as_mut() {
        logger.log(ComputeEvent::ExportDropped { id });
        if let Some(time) = last_reported.logging_time() {
            logger.log(ComputeEvent::Frontier { id, time, diff: -1 });
        }
    }

    // A final response reporting the drop is needed, except when:
    //  * the collection is a subscribe, for which a `SubscribeResponse::Dropped` is
    //    emitted independently, or
    //  * the collection already advanced to the empty frontier, so that the final
    //    `FrontierUppers` response already reported the end of the dataflow.
    let drop_already_communicated = is_subscribe || last_reported.is_empty();
    if !drop_already_communicated {
        self.compute_state.dropped_collections.push(id);
    }
}

/// Initializes timely dataflow logging and publishes as a view.
pub fn initialize_logging(&mut self, logging: &LoggingConfig) {
if self.compute_state.compute_logger.is_some() {
Expand Down
73 changes: 48 additions & 25 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,32 +609,33 @@ where
import_id: GlobalId,
export_ids: Vec<GlobalId>,
) -> Self {
let mut previous_time = None;
// Prepare retraction events as `LogOnDrop`s, to ensure they are logged even when the
// dataflow is dropped before the input advances to the empty frontier.
let mut retractions = Vec::<LogOnDrop>::with_capacity(export_ids.len());

self.inspect_container(move |event| {
if let Err(frontier) = event {
if let Some(previous) = previous_time {
for &export_id in export_ids.iter() {
logger.log(ComputeEvent::ImportFrontier {
import_id,
export_id,
time: previous,
diff: -1,
});
}
}
if let Some(time) = frontier.get(0) {
for &export_id in export_ids.iter() {
logger.log(ComputeEvent::ImportFrontier {
import_id,
export_id,
time: *time,
diff: 1,
});
}
previous_time = Some(*time);
} else {
previous_time = None;
}
let Err(frontier) = event else { return };

// Log frontier retractions by dropping them.
retractions.drain(..).for_each(drop);

let Some(time) = frontier.get(0) else { return };
for &export_id in export_ids.iter() {
logger.log(ComputeEvent::ImportFrontier {
import_id,
export_id,
time: *time,
diff: 1,
});
retractions.push(LogOnDrop::new(
logger.clone(),
ComputeEvent::ImportFrontier {
import_id,
export_id,
time: *time,
diff: -1,
},
));
}
})
}
Expand All @@ -657,3 +658,25 @@ where
self
}
}

/// Guard that logs a prepared `ComputeEvent` when it is dropped.
struct LogOnDrop {
    /// Logger the event is written to on drop.
    logger: Logger,
    /// The pending event; `None` once it has been handed to the logger.
    event: Option<ComputeEvent>,
}

impl LogOnDrop {
    /// Creates a guard that will log `event` to `logger` when dropped.
    fn new(logger: Logger, event: ComputeEvent) -> Self {
        let event = Some(event);
        Self { logger, event }
    }
}

impl Drop for LogOnDrop {
    /// Logs the pending event, if it has not been taken already.
    fn drop(&mut self) {
        // `drop` only gets `&mut self`, so the event is moved out through the `Option`.
        let Some(pending) = self.event.take() else {
            return;
        };
        self.logger.log(pending);
    }
}
4 changes: 0 additions & 4 deletions src/compute/src/render/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ where
sink_id,
SinkToken {
token: Box::new(needed_tokens),
is_subscribe: matches!(
sink.connection,
ComputeSinkConnection::Subscribe(_)
),
},
);
});
Expand Down
28 changes: 7 additions & 21 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,27 +387,13 @@ impl<'w, A: Allocate> Worker<'w, A> {
fn handle_command(&mut self, response_tx: &mut ResponseSender, cmd: ComputeCommand) {
match &cmd {
ComputeCommand::CreateInstance(_) => {
self.compute_state = Some(ComputeState {
traces: TraceManager::new(
self.trace_metrics.clone(),
self.timely_worker.index(),
),
sink_tokens: BTreeMap::new(),
subscribe_response_buffer: std::rc::Rc::new(
std::cell::RefCell::new(Vec::new()),
),
sink_write_frontiers: BTreeMap::new(),
flow_control_probes: BTreeMap::new(),
pending_peeks: BTreeMap::new(),
reported_frontiers: BTreeMap::new(),
dropped_collections: Vec::new(),
compute_logger: None,
persist_clients: Arc::clone(&self.persist_clients),
command_history: ComputeCommandHistory::default(),
max_result_size: u32::MAX,
dataflow_max_inflight_bytes: usize::MAX,
metrics: self.compute_metrics.clone(),
});
let traces =
TraceManager::new(self.trace_metrics.clone(), self.timely_worker.index());
self.compute_state = Some(ComputeState::new(
traces,
Arc::clone(&self.persist_clients),
self.compute_metrics.clone(),
));
}
_ => (),
}
Expand Down

0 comments on commit a3cbc07

Please sign in to comment.