Skip to content

Commit

Permalink
Add a feature flag for active dataflow cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
teskje committed Mar 30, 2023
1 parent a3cbc07 commit 9950736
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6032,9 +6032,13 @@ impl Catalog {
/// Return the current compute configuration, derived from the system configuration.
pub fn compute_config(&self) -> ComputeParameters {
let config = self.system_config();
let enable_active_dataflow_cancellation =
self.unsafe_mode() || config.enable_active_dataflow_cancellation();

ComputeParameters {
max_result_size: Some(config.max_result_size()),
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
enable_active_dataflow_cancellation: Some(enable_active_dataflow_cancellation),
persist: self.persist_config(),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/src/protocol/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ message ProtoComputeParameters {
optional uint32 max_result_size = 1;
mz_persist_client.cfg.ProtoPersistParameters persist = 2;
optional uint64 dataflow_max_inflight_bytes = 3;
optional bool enable_active_dataflow_cancellation = 4;
}
12 changes: 12 additions & 0 deletions src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ pub struct ComputeParameters {
pub max_result_size: Option<u32>,
/// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
pub dataflow_max_inflight_bytes: Option<usize>,
/// Whether the active dataflow cancellation feature should be used.
///
/// TODO(teskje): Remove once this feature is rolled out everywhere.
pub enable_active_dataflow_cancellation: Option<bool>,
/// Persist client configuration.
pub persist: PersistParameters,
}
Expand All @@ -375,6 +379,7 @@ impl ComputeParameters {
let ComputeParameters {
max_result_size,
dataflow_max_inflight_bytes,
enable_active_dataflow_cancellation,
persist,
} = other;

Expand All @@ -384,6 +389,9 @@ impl ComputeParameters {
if dataflow_max_inflight_bytes.is_some() {
self.dataflow_max_inflight_bytes = dataflow_max_inflight_bytes;
}
if enable_active_dataflow_cancellation.is_some() {
self.enable_active_dataflow_cancellation = enable_active_dataflow_cancellation;
}
self.persist.update(persist);
}

Expand All @@ -398,6 +406,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
ProtoComputeParameters {
max_result_size: self.max_result_size.into_proto(),
dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
enable_active_dataflow_cancellation: self.enable_active_dataflow_cancellation,
persist: Some(self.persist.into_proto()),
}
}
Expand All @@ -406,6 +415,9 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
Ok(Self {
max_result_size: proto.max_result_size.into_rust()?,
dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
enable_active_dataflow_cancellation: proto
.enable_active_dataflow_cancellation
.into_rust()?,
persist: proto
.persist
.into_rust_if_some("ProtoComputeParameters::persist")?,
Expand Down
13 changes: 11 additions & 2 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct ComputeState {
max_result_size: u32,
/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
pub dataflow_max_inflight_bytes: usize,
/// Whether to enable the active dataflow cancellation feature.
enable_active_dataflow_cancellation: bool,
/// Metrics for this replica.
pub metrics: ComputeMetrics,
}
Expand All @@ -122,6 +124,7 @@ impl ComputeState {
command_history: Default::default(),
max_result_size: u32::MAX,
dataflow_max_inflight_bytes: usize::MAX,
enable_active_dataflow_cancellation: false,
metrics,
}
}
Expand Down Expand Up @@ -191,6 +194,7 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
let ComputeParameters {
max_result_size,
dataflow_max_inflight_bytes,
enable_active_dataflow_cancellation,
persist,
} = params;

Expand All @@ -200,6 +204,9 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
if let Some(v) = dataflow_max_inflight_bytes {
self.compute_state.dataflow_max_inflight_bytes = v;
}
if let Some(v) = enable_active_dataflow_cancellation {
self.compute_state.enable_active_dataflow_cancellation = v;
}

persist.apply(self.compute_state.persist_clients.cfg())
}
Expand Down Expand Up @@ -360,8 +367,10 @@ impl<'a, A: Allocate> ActiveComputeState<'a, A> {
.dataflow_indexes
.remove(&id)
.expect("Dropped compute collection with no dataflow");
if let Ok(index) = Rc::try_unwrap(dataflow_index) {
self.timely_worker.drop_dataflow(index);
if self.compute_state.enable_active_dataflow_cancellation {
if let Ok(index) = Rc::try_unwrap(dataflow_index) {
self.timely_worker.drop_dataflow(index);
}
}

// Remove removing frontier tracking and logging.
Expand Down
16 changes: 16 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,15 @@ pub const AUTO_ROUTE_INTROSPECTION_QUERIES: ServerVar<bool> = ServerVar {
safe: true,
};

pub const ENABLE_ACTIVE_DATAFLOW_CANCELLATION: ServerVar<bool> = ServerVar {
name: UncasedStr::new("enable_active_dataflow_cancellation"),
value: &false,
description:
"Feature flag indicating whether active dataflow cancellation is enabled (Materialize).",
internal: true,
safe: true,
};

/// Represents the input to a variable.
///
/// Each variable has different rules for how it handles each style of input.
Expand Down Expand Up @@ -1360,6 +1369,7 @@ impl Default for SystemVars {
.with_var(&ENABLE_WITH_MUTUALLY_RECURSIVE)
.with_var(&ENABLE_RBAC_CHECKS)
.with_var(&ENABLE_AUTO_ROUTE_INTROSPECTION_QUERIES)
.with_var(&ENABLE_ACTIVE_DATAFLOW_CANCELLATION)
}
}

Expand Down Expand Up @@ -1659,6 +1669,11 @@ impl SystemVars {
pub fn enable_auto_route_introspection_queries(&self) -> bool {
*self.expect_value(&ENABLE_AUTO_ROUTE_INTROSPECTION_QUERIES)
}

/// Returns the `enable_active_dataflow_cancellation` configuration parameter.
pub fn enable_active_dataflow_cancellation(&self) -> bool {
*self.expect_value(&ENABLE_ACTIVE_DATAFLOW_CANCELLATION)
}
}

/// A `Var` represents a configuration parameter of an arbitrary type.
Expand Down Expand Up @@ -2603,6 +2618,7 @@ impl From<TransactionIsolationLevel> for IsolationLevel {
pub fn is_compute_config_var(name: &str) -> bool {
name == MAX_RESULT_SIZE.name()
|| name == DATAFLOW_MAX_INFLIGHT_BYTES.name()
|| name == ENABLE_ACTIVE_DATAFLOW_CANCELLATION.name()
|| is_persist_config_var(name)
}

Expand Down

0 comments on commit 9950736

Please sign in to comment.