Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: write status updates in the controller, and add the "paused" status #23222

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions misc/python/materialize/checks/all_checks/source_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def manipulate(self) -> list[Testdrive]:
$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP PUBLICATION IF EXISTS source_errors_publicationA;
INSERT INTO source_errors_table VALUES (3);

# We sleep for a bit here to allow status updates to propagate to the storage controller
# in scenarios where environmentd is killed
$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s
""",
"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
Expand Down
15 changes: 15 additions & 0 deletions src/storage-client/src/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
syntax = "proto3";

import "proto/src/proto.proto";
import "proto/src/chrono.proto";
import "repr/src/antichain.proto";
import "repr/src/global_id.proto";
import "cluster-client/src/client.proto";
Expand Down Expand Up @@ -111,6 +112,19 @@ message ProtoStorageResponse {
repeated ProtoSinkStatisticsUpdate sink_updates = 2;
}

// Wire form of a single source or sink status update. Converted to and from
// the Rust `StatusUpdate` struct (see the `RustType` impl in `client.rs`).
message ProtoStatusUpdate {
// Object the update refers to. Decoding into `StatusUpdate` fails with a
// missing-field error if this is unset.
mz_repr.global_id.ProtoGlobalId id = 1;
// Status value, carried as a free-form string.
string status = 2;
// Timestamp of the update. The Rust side stores it as a UTC
// `chrono::DateTime`; decoding fails if this is unset.
mz_proto.chrono.ProtoNaiveDateTime timestamp = 3;
// Optional error text accompanying the status.
optional string error = 4;
// Hints attached to the update. The Rust side collects these into an ordered
// set, so duplicates collapse and the wire order is not preserved.
repeated string hints = 5;
// Errors keyed by a namespace string.
// NOTE(review): the meaning of the namespace keys is not visible here.
map<string, string> namespaced_errors = 6;
}

// A batch of status updates; the payload type of the `status_updates` response
// kind.
message ProtoStatusUpdates {
// The individual updates in this batch.
repeated ProtoStatusUpdate updates = 1;
}

// A list of dropped object IDs; the payload type of the `dropped_ids` response
// kind.
message ProtoDroppedIds {
// The IDs being reported.
repeated mz_repr.global_id.ProtoGlobalId ids = 1;
}
Expand All @@ -119,5 +133,6 @@ message ProtoStorageResponse {
ProtoFrontierUppersKind frontier_uppers = 1;
ProtoDroppedIds dropped_ids = 2;
ProtoStatisticsUpdates stats = 3;
ProtoStatusUpdates status_updates = 4;
}
}
59 changes: 57 additions & 2 deletions src/storage-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,49 @@ impl PackableStats for SinkStatisticsUpdate {
}
}

/// A source or sink status update.
///
/// Describes the status of a single object at a point in time. The fields of
/// this struct should be able to be packed into a status row that conforms to
/// the schema for the object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
/// The object this update is about.
pub id: GlobalId,
/// The status value, carried as a free-form string.
pub status: String,
/// The timestamp of the update, in UTC.
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Error text accompanying the status, if any.
pub error: Option<String>,
/// Hints attached to the update; stored as an ordered, de-duplicated set.
pub hints: BTreeSet<String>,
/// Errors keyed by a namespace string.
// NOTE(review): the meaning of the namespace keys is not visible here.
pub namespaced_errors: BTreeMap<String, String>,
}

impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
    /// Encodes this update as a `ProtoStatusUpdate`.
    ///
    /// `id` and `timestamp` are always populated; the remaining fields are
    /// copied over, with `hints` flattened from the set into a repeated field.
    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
        let Self {
            id,
            status,
            timestamp,
            error,
            hints,
            namespaced_errors,
        } = self;

        proto_storage_response::ProtoStatusUpdate {
            id: Some(id.into_proto()),
            status: status.clone(),
            timestamp: Some(timestamp.into_proto()),
            error: error.clone(),
            hints: hints.iter().cloned().collect(),
            namespaced_errors: namespaced_errors.clone(),
        }
    }

    /// Decodes a `ProtoStatusUpdate` back into a `StatusUpdate`.
    ///
    /// Returns a `TryFromProtoError` if `id` or `timestamp` is absent or fails
    /// to convert; `id` is checked first.
    fn from_proto(
        proto: proto_storage_response::ProtoStatusUpdate,
    ) -> Result<Self, TryFromProtoError> {
        let proto_storage_response::ProtoStatusUpdate {
            id,
            status,
            timestamp,
            error,
            hints,
            namespaced_errors,
        } = proto;

        // Decode the two required message fields up front, in the same order
        // as before, so the first missing field reported is unchanged.
        let id = id.into_rust_if_some("ProtoStatusUpdate::id")?;
        let timestamp = timestamp.into_rust_if_some("ProtoStatusUpdate::timestamp")?;

        Ok(StatusUpdate {
            id,
            timestamp,
            status,
            error,
            hints: hints.into_iter().collect(),
            namespaced_errors,
        })
    }
}

/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
Expand All @@ -367,14 +410,17 @@ pub enum StorageResponse<T = mz_repr::Timestamp> {

/// Lists of statistics updates, one for sources and one for sinks.
StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
/// A list of status updates for sources and sinks. Periodically sent from
/// storage workers to convey the latest status information about an object.
StatusUpdates(Vec<StatusUpdate>),
}

impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoStorageResponse {
use proto_storage_response::Kind::*;
use proto_storage_response::{
ProtoDroppedIds, ProtoSinkStatisticsUpdate, ProtoSourceStatisticsUpdate,
ProtoStatisticsUpdates,
ProtoStatisticsUpdates, ProtoStatusUpdates,
};
ProtoStorageResponse {
kind: Some(match self {
Expand Down Expand Up @@ -412,13 +458,16 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
.collect(),
})
}
StorageResponse::StatusUpdates(updates) => StatusUpdates(ProtoStatusUpdates {
updates: updates.into_proto(),
guswynn marked this conversation as resolved.
Show resolved Hide resolved
}),
}),
}
}

fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
use proto_storage_response::Kind::*;
use proto_storage_response::ProtoDroppedIds;
use proto_storage_response::{ProtoDroppedIds, ProtoStatusUpdates};
match proto.kind {
Some(DroppedIds(ProtoDroppedIds { ids })) => {
Ok(StorageResponse::DroppedIds(ids.into_rust()?))
Expand Down Expand Up @@ -464,6 +513,9 @@ impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
})
.collect::<Result<Vec<_>, TryFromProtoError>>()?,
)),
Some(StatusUpdates(ProtoStatusUpdates { updates })) => {
Ok(StorageResponse::StatusUpdates(updates.into_rust()?))
}
None => Err(TryFromProtoError::missing_field(
"ProtoStorageResponse::kind",
)),
Expand Down Expand Up @@ -660,6 +712,9 @@ where
sink_stats,
)))
}
StorageResponse::StatusUpdates(updates) => {
Some(Ok(StorageResponse::StatusUpdates(updates)))
}
}
}
}
Expand Down
Loading