Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,22 @@ A compute object is hydrated on a given replica when it has fully processed the
| `replica_id` | [`text`] | The ID of a cluster replica. |
| `hydrated` | [`boolean`] | Whether the compute object is hydrated on the replica. |

### `mz_compute_operator_hydration_statuses`

The `mz_compute_operator_hydration_statuses` table describes the dataflow operator hydration status of compute objects (indexes or materialized views).

A dataflow operator is hydrated on a given replica when it has fully processed the initial snapshot of data available in its inputs.

<!-- RELATION_SPEC mz_internal.mz_compute_operator_hydration_statuses -->
| Field | Type | Meaning |
| ----------------------- | ----------- | -------- |
| `object_id` | [`text`] | The ID of a compute object. Corresponds to [`mz_catalog.mz_indexes.id`](../mz_catalog#mz_indexes) or [`mz_catalog.mz_materialized_views.id`](../mz_catalog#mz_materialized_views). |
| `physical_plan_node_id` | [`uint8`] | The ID of a node in the physical plan of the compute object. Corresponds to a `node_id` displayed in the output of `EXPLAIN PHYSICAL PLAN WITH (node_ids)`. |
| `replica_id` | [`text`] | The ID of a cluster replica. |
| `hydrated` | [`boolean`] | Whether the node is hydrated on the replica. |

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_operator_hydration_statuses_per_worker -->

### `mz_frontiers`

The `mz_frontiers` table describes the frontiers of each source, sink, table,
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
enable_mz_join_core: Some(config.enable_mz_join_core()),
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
enable_columnation_lgalloc: Some(config.enable_columnation_lgalloc()),
enable_operator_hydration_status_logging: Some(
config.enable_compute_operator_hydration_status_logging(),
),
persist: persist_config(config),
tracing: tracing_config(config),
grpc_client: grpc_client_config(config),
Expand Down
31 changes: 31 additions & 0 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,20 @@ pub static MZ_COMPUTE_HYDRATION_STATUSES: Lazy<BuiltinSource> = Lazy::new(|| Bui
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER: Lazy<BuiltinSource> =
Lazy::new(|| BuiltinSource {
name: "mz_compute_operator_hydration_statuses_per_worker",
schema: MZ_INTERNAL_SCHEMA,
data_source: IntrospectionType::ComputeOperatorHydrationStatus,
desc: RelationDesc::empty()
.with_column("object_id", ScalarType::String.nullable(false))
.with_column("physical_plan_node_id", ScalarType::UInt64.nullable(false))
.with_column("replica_id", ScalarType::String.nullable(false))
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("hydrated", ScalarType::Bool.nullable(false)),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});

pub static MZ_DATABASES: Lazy<BuiltinTable> = Lazy::new(|| BuiltinTable {
name: "mz_databases",
Expand Down Expand Up @@ -4178,6 +4192,21 @@ HAVING pg_catalog.sum(count) != 0",
access: vec![PUBLIC_SELECT],
});

pub static MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES: Lazy<BuiltinView> = Lazy::new(|| BuiltinView {
name: "mz_compute_operator_hydration_statuses",
schema: MZ_INTERNAL_SCHEMA,
column_defs: None,
sql: "
SELECT
object_id,
physical_plan_node_id,
replica_id,
bool_and(hydrated) AS hydrated
FROM mz_internal.mz_compute_operator_hydration_statuses_per_worker
GROUP BY object_id, physical_plan_node_id, replica_id",
access: vec![PUBLIC_SELECT],
});

pub static MZ_MESSAGE_COUNTS_PER_WORKER: Lazy<BuiltinView> = Lazy::new(|| BuiltinView {
name: "mz_message_counts_per_worker",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -6711,10 +6740,12 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::View(&MZ_GLOBAL_FRONTIERS),
Builtin::Source(&MZ_COMPUTE_DEPENDENCIES),
Builtin::Source(&MZ_COMPUTE_HYDRATION_STATUSES),
Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER),
Builtin::View(&MZ_HYDRATION_STATUSES),
Builtin::View(&MZ_MATERIALIZATION_LAG),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS_PER_WORKER),
Builtin::View(&MZ_COMPUTE_ERROR_COUNTS),
Builtin::View(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES),
Builtin::Source(&MZ_CLUSTER_REPLICA_FRONTIERS),
Builtin::Source(&MZ_CLUSTER_REPLICA_HEARTBEATS),
Builtin::Index(&MZ_SHOW_DATABASES_IND),
Expand Down
18 changes: 12 additions & 6 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,11 @@ where
}

for (type_, updates) in updates_by_type {
self.storage
.record_introspection_updates(type_, updates)
.await;
if !updates.is_empty() {
self.storage
.record_introspection_updates(type_, updates)
.await;
}
}
}
}
Expand Down Expand Up @@ -704,13 +706,17 @@ where

#[tracing::instrument(level = "debug", skip(self))]
async fn maintain(&mut self) {
// Record pending introspection updates.
self.record_introspection_updates().await;

// Perform instance maintenance work.
for instance in self.compute.instances.values_mut() {
instance.activate(self.storage).maintain();
}

// Record pending introspection updates.
//
// It's beneficial to do this as the last maintenance step because previous steps can cause
// dropping of state, which can can cause introspection retractions, which lower the volume
// of data we have to record.
self.record_introspection_updates().await;
}
}

Expand Down
160 changes: 132 additions & 28 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use futures::{future, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::NodeId;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_expr::RowSetFinishing;
Expand All @@ -46,7 +47,8 @@ use crate::protocol::command::{
};
use crate::protocol::history::ComputeCommandHistory;
use crate::protocol::response::{
ComputeResponse, CopyToResponse, PeekResponse, SubscribeBatch, SubscribeResponse,
ComputeResponse, CopyToResponse, OperatorHydrationStatus, PeekResponse, StatusResponse,
SubscribeBatch, SubscribeResponse,
};
use crate::service::{ComputeClient, ComputeGrpcClient};

Expand Down Expand Up @@ -444,6 +446,34 @@ impl<T: Timestamp> Instance<T> {
}
}

/// Update the tracked hydration status for an operator according to a received status update.
fn update_operator_hydration_status(
&mut self,
replica_id: ReplicaId,
status: OperatorHydrationStatus,
) {
let Some(replica) = self.replicas.get_mut(&replica_id) else {
tracing::error!(
%replica_id, ?status,
"status update for an unknown replica"
);
return;
};
let Some(collection) = replica.collections.get_mut(&status.collection_id) else {
tracing::error!(
%replica_id, ?status,
"status update for an unknown collection"
);
return;
};

collection.hydration_state.operator_hydrated(
status.lir_id,
status.worker_id,
status.hydrated,
);
}

/// Clean up collection state that is not needed anymore.
///
/// Three conditions need to be true before we can remove state for a collection:
Expand Down Expand Up @@ -1496,6 +1526,10 @@ where
ComputeResponse::SubscribeResponse(id, response) => {
self.handle_subscribe_response(id, response, replica_id)
}
ComputeResponse::Status(response) => {
self.handle_status_response(response, replica_id);
None
}
}
}

Expand Down Expand Up @@ -1685,6 +1719,14 @@ where
}
}
}

fn handle_status_response(&mut self, response: StatusResponse, replica_id: ReplicaId) {
match response {
StatusResponse::OperatorHydration(status) => self
.compute
.update_operator_hydration_status(replica_id, status),
}
}
}

impl<'a, T> ActiveInstance<'a, T>
Expand Down Expand Up @@ -1850,12 +1892,12 @@ impl<T> ReplicaState<T> {
/// Add a collection to the replica state.
fn add_collection(&mut self, id: GlobalId, as_of: Antichain<T>) {
let metrics = self.metrics.for_collection(id);
let hydration_flag = HydrationFlag::new(self.id, id, self.introspection_tx.clone());
let hydration_state = HydrationState::new(self.id, id, self.introspection_tx.clone());
let mut state = ReplicaCollectionState {
metrics,
created_at: Instant::now(),
as_of,
hydration_flag,
hydration_state,
};

// We need to consider the edge case where the as-of is the empty frontier. Such an as-of
Expand Down Expand Up @@ -1893,14 +1935,14 @@ struct ReplicaCollectionState<T> {
created_at: Instant,
/// As-of frontier with which this collection was installed on the replica.
as_of: Antichain<T>,
/// Tracks whether this collection is hydrated, i.e., it has produced some initial output.
hydration_flag: HydrationFlag,
/// Tracks hydration state for this collection.
hydration_state: HydrationState,
}

impl<T> ReplicaCollectionState<T> {
/// Returns whether this collection is hydrated.
fn hydrated(&self) -> bool {
self.hydration_flag.hydrated
self.hydration_state.hydrated
}

/// Marks the collection as hydrated and updates metrics and introspection accordingly.
Expand All @@ -1910,22 +1952,29 @@ impl<T> ReplicaCollectionState<T> {
metrics.initial_output_duration_seconds.set(duration);
}

self.hydration_flag.set();
self.hydration_state.collection_hydrated();
}
}

/// A wrapper type that maintains hydration introspection for a given replica and collection, and
/// ensures that reported introspection data is retracted when the flag is dropped.
/// Maintains both global and operator-level hydration introspection for a given replica and
/// collection, and ensures that reported introspection data is retracted when the flag is dropped.
#[derive(Debug)]
struct HydrationFlag {
struct HydrationState {
/// The ID of the replica.
replica_id: ReplicaId,
/// The ID of the compute collection.
collection_id: GlobalId,
/// Whether the collection is hydrated.
hydrated: bool,
/// Operator-level hydration state.
/// (lir_id, worker_id) -> hydrated
operators: BTreeMap<(NodeId, usize), bool>,
/// A channel through which introspection updates are delivered.
introspection_tx: crossbeam_channel::Sender<IntrospectionUpdates>,
}

impl HydrationFlag {
/// Create a new unset `HydrationFlag` and update introspection.
impl HydrationState {
/// Create a new `HydrationState` and initialize introspection.
fn new(
replica_id: ReplicaId,
collection_id: GlobalId,
Expand All @@ -1935,55 +1984,110 @@ impl HydrationFlag {
replica_id,
collection_id,
hydrated: false,
operators: Default::default(),
introspection_tx,
};

let insertion = self_.row();
self_.send(vec![(insertion, 1)]);
let insertion = self_.row_for_collection();
self_.send(
IntrospectionType::ComputeHydrationStatus,
vec![(insertion, 1)],
);

self_
}

/// Mark the collection as hydrated and update introspection.
fn set(&mut self) {
/// Update the collection as hydrated.
fn collection_hydrated(&mut self) {
if self.hydrated {
return; // nothing to do
}

let retraction = self.row();
let retraction = self.row_for_collection();
self.hydrated = true;
let insertion = self.row();
let insertion = self.row_for_collection();

self.send(
IntrospectionType::ComputeHydrationStatus,
vec![(retraction, -1), (insertion, 1)],
);
}

/// Update the given (lir_id, worker_id) pair as hydrated.
fn operator_hydrated(&mut self, lir_id: NodeId, worker_id: usize, hydrated: bool) {
let retraction = self.row_for_operator(lir_id, worker_id);
self.operators.insert((lir_id, worker_id), hydrated);
let insertion = self.row_for_operator(lir_id, worker_id);

if retraction == insertion {
return; // no change
}

self.send(vec![(retraction, -1), (insertion, 1)]);
let updates = retraction
.map(|r| (r, -1))
.into_iter()
.chain(insertion.map(|r| (r, 1)))
.collect();
self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates);
}

fn row(&self) -> Row {
/// Return a `Row` reflecting the current collection hydration status.
fn row_for_collection(&self) -> Row {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::String(&self.replica_id.to_string()),
Datum::from(self.hydrated),
])
}

fn send(&self, updates: Vec<(Row, Diff)>) {
let result = self
.introspection_tx
.send((IntrospectionType::ComputeHydrationStatus, updates));
/// Return a `Row` reflecting the current hydration status of the identified operator.
///
/// Returns `None` if the identified operator is not tracked.
fn row_for_operator(&self, lir_id: NodeId, worker_id: usize) -> Option<Row> {
self.operators.get(&(lir_id, worker_id)).map(|hydrated| {
Row::pack_slice(&[
Datum::String(&self.collection_id.to_string()),
Datum::UInt64(lir_id),
Datum::String(&self.replica_id.to_string()),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(*hydrated),
])
})
}

fn send(&self, introspection_type: IntrospectionType, updates: Vec<(Row, Diff)>) {
let result = self.introspection_tx.send((introspection_type, updates));

if result.is_err() {
// The global controller holds on to the `introspection_rx`. So when we get here that
// probably means that the controller was dropped and the process is shutting down, in
// which case we don't care about introspection updates anymore.
tracing::info!(
"discarding `ComputeHydrationStatus` update because the receiver disconnected"
?introspection_type,
"discarding introspection update because the receiver disconnected"
);
}
}
}

impl Drop for HydrationFlag {
impl Drop for HydrationState {
fn drop(&mut self) {
let retraction = self.row();
self.send(vec![(retraction, -1)]);
// Retract collection hydration status.
let retraction = self.row_for_collection();
self.send(
IntrospectionType::ComputeHydrationStatus,
vec![(retraction, -1)],
);

// Retract operator-level hydration status.
let operators: Vec<_> = self.operators.keys().collect();
let updates: Vec<_> = operators
.into_iter()
.flat_map(|(lir_id, worker_id)| self.row_for_operator(*lir_id, *worker_id))
.map(|r| (r, -1))
.collect();
if !updates.is_empty() {
self.send(IntrospectionType::ComputeOperatorHydrationStatus, updates)
}
}
}
Loading