Skip to content

Commit

Permalink
feat(metric): add recovery metrics (risingwavelabs#9022)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Apr 6, 2023
1 parent ea53cf3 commit ba2f6e1
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 105 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

66 changes: 59 additions & 7 deletions grafana/risingwave-dev-dashboard.dashboard.py
Expand Up @@ -14,6 +14,7 @@
panels = Panels(datasource)
logging.basicConfig(level=logging.WARN)


def section_cluster_node(panels):
return [
panels.row("Cluster Node"),
Expand Down Expand Up @@ -65,6 +66,49 @@ def section_cluster_node(panels):
]


def section_recovery_node(panels):
    """Build the "Recovery" dashboard row.

    Returns a list of panels visualizing meta-node recovery metrics:
    the rate of successful recovery attempts, the cumulative count of
    failed attempts, and the latency (p50/p90/p99/max/avg) of a
    successful recovery, all broken down by instance.
    """
    return [
        panels.row("Recovery"),
        panels.timeseries_ops(
            "Recovery Successful Rate",
            "The rate of successful recovery attempts",
            [
                # recovery_latency_count increments once per completed (successful)
                # recovery, so its rate is the success rate.
                panels.target(f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) by (instance)",
                              "{{instance}}")
            ],
            ["last"],
        ),
        panels.timeseries_count(
            "Failed recovery attempts",
            # NOTE: fixed typo "reocovery" -> "recovery" in the description.
            "Total number of failed recovery attempts",
            [
                panels.target(f"sum({metric('recovery_failure_cnt')}) by (instance)",
                              "{{instance}}")
            ],
            ["last"],
        ),
        panels.timeseries_latency(
            "Recovery latency",
            "Time spent in a successful recovery attempt",
            [
                # Per-quantile latency series derived from the histogram buckets.
                *quantile(
                    lambda quantile, legend: panels.target(
                        f"histogram_quantile({quantile}, sum(rate({metric('recovery_latency_bucket')}[$__rate_interval])) by (le, instance))",
                        f"recovery latency p{legend}" +
                        " - {{instance}}",
                    ),
                    [50, 90, 99, "max"],
                ),
                # Average latency = rate(sum) / rate(count) over the same window.
                panels.target(
                    f"sum by (le) (rate({metric('recovery_latency_sum')}[$__rate_interval])) / sum by (le) (rate({metric('recovery_latency_count')}[$__rate_interval]))",
                    "recovery latency avg",
                ),
            ],
            ["last"],
        )
    ]


def section_compaction(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand Down Expand Up @@ -330,7 +374,7 @@ def section_compaction(outer_panels):
],
),

panels.timeseries_count(
panels.timeseries_count(
"Hummock Sstable Stat",
"Avg count gotten from sstable_distinct_epoch_count, for observing sstable_distinct_epoch_count",
[
Expand All @@ -344,7 +388,7 @@ def section_compaction(outer_panels):
panels.timeseries_latency(
"Hummock Remote Read Duration",
"Total time of operations which read from remote storage when enable prefetch",
[
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('state_store_remote_read_time_per_task_bucket')}[$__rate_interval])) by (le, job, instance, table_id))",
Expand Down Expand Up @@ -501,7 +545,7 @@ def section_object_storage(outer_panels):
"Estimated S3 Cost (Monthly)",
"This metric uses the total size of data in S3 at this second to derive the cost of storing data "
"for a whole month. The price is 0.023 USD per GB. Please checkout AWS's pricing model for more "
"accurate calculation.",
"accurate calculation.",
[
panels.target(
f"sum({metric('storage_level_total_file_size')}) by (instance) * 0.023 / 1000 / 1000",
Expand Down Expand Up @@ -1153,6 +1197,7 @@ def section_batch_exchange(outer_panels):
),
]


def section_frontend(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand Down Expand Up @@ -1184,7 +1229,7 @@ def section_frontend(outer_panels):
"",
[
panels.target(f"{metric('distributed_running_query_num')}",
"The number of running query in distributed execution mode"),
"The number of running query in distributed execution mode"),
],
["last"],
),
Expand All @@ -1193,7 +1238,7 @@ def section_frontend(outer_panels):
"",
[
panels.target(f"{metric('distributed_rejected_query_counter')}",
"The number of rejected query in distributed execution mode"),
"The number of rejected query in distributed execution mode"),
],
["last"],
),
Expand All @@ -1202,7 +1247,7 @@ def section_frontend(outer_panels):
"",
[
panels.target(f"{metric('distributed_completed_query_counter')}",
"The number of completed query in distributed execution mode"),
"The number of completed query in distributed execution mode"),
],
["last"],
),
Expand Down Expand Up @@ -1745,6 +1790,7 @@ def section_hummock_tiered_cache(outer_panels):
)
]


def section_hummock_manager(outer_panels):
panels = outer_panels.sub_panel()
total_key_size_filter = "metric='total_key_size'"
Expand Down Expand Up @@ -1891,6 +1937,7 @@ def section_hummock_manager(outer_panels):
)
]


def section_backup_manager(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand All @@ -1916,7 +1963,7 @@ def section_backup_manager(outer_panels):
f"histogram_quantile({quantile}, sum(rate({metric('backup_job_latency_bucket')}[$__rate_interval])) by (le, state))",
f"Job Process Time p{legend}" +
" - {{state}}",
),
),
[50, 99, 999, "max"],
),
],
Expand All @@ -1925,6 +1972,7 @@ def section_backup_manager(outer_panels):
)
]


def grpc_metrics_target(panels, name, filter):
return panels.timeseries_latency_small(
f"{name} latency",
Expand Down Expand Up @@ -2166,6 +2214,7 @@ def section_grpc_hummock_meta_client(outer_panels):
),
]


def section_memory_manager(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand Down Expand Up @@ -2236,6 +2285,7 @@ def section_memory_manager(outer_panels):
),
]


def section_connector_node(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand All @@ -2256,6 +2306,7 @@ def section_connector_node(outer_panels):
)
]


templating = Templating()
if namespace_filter_enabled:
templating = Templating(
Expand Down Expand Up @@ -2295,6 +2346,7 @@ def section_connector_node(outer_panels):
version=dashboard_version,
panels=[
*section_cluster_node(panels),
*section_recovery_node(panels),
*section_streaming(panels),
*section_streaming_actors(panels),
*section_streaming_exchange(panels),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

166 changes: 90 additions & 76 deletions src/meta/src/barrier/recovery.rs
Expand Up @@ -24,7 +24,8 @@ use risingwave_pb::common::{ActorInfo, WorkerNode, WorkerType};
use risingwave_pb::stream_plan::barrier::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest,
BarrierCompleteResponse, BroadcastActorInfoTableRequest, BuildActorsRequest,
ForceStopActorsRequest, UpdateActorsRequest,
};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, error, warn};
Expand Down Expand Up @@ -119,90 +120,103 @@ where
.await
.expect("clean dirty fragments");
let retry_strategy = Self::get_retry_strategy();
let (new_epoch, _responses) = tokio_retry::Retry::spawn(retry_strategy, || async {
let mut info = self.resolve_actor_info_for_recovery().await;
let mut new_epoch = prev_epoch.next();

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}

// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|err| {
error!(err = ?err, "reset compute nodes failed");
})?;

// update and build all actors.
self.update_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "build_actors failed");
})?;

// get split assignments for all actors
let source_split_assignments = self.source_manager.list_assignments().await;
let command = Command::Plain(Some(Mutation::Add(AddMutation {
// Actors built during recovery is not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
})));

let prev_epoch = new_epoch;
new_epoch = prev_epoch.next();
// checkpoint, used as init barrier to initialize all executors.
let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.snapshot_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
new_epoch,
command,
true,
self.source_manager.clone(),
));

#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
// We take retry into consideration because this is the latency user sees for a cluster to
// get recovered.
let recovery_timer = self.metrics.recovery_latency.start_timer();
let (new_epoch, _responses) = tokio_retry::Retry::spawn(retry_strategy, || async {
let recovery_result: MetaResult<(Epoch, Vec<BarrierCompleteResponse>)> = try {
let mut info = self.resolve_actor_info_for_recovery().await;
let mut new_epoch = prev_epoch.next();

// Migrate actors in expired CN to newly joined one.
let migrated = self.migrate_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "migrate actors failed");
})?;
if migrated {
info = self.resolve_actor_info_for_recovery().await;
}
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
match barrier_complete_rx.recv().await.unwrap().result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!(err = ?err, "post_collect failed");
return Err(err);
// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|err| {
error!(err = ?err, "reset compute nodes failed");
})?;

// update and build all actors.
self.update_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|err| {
error!(err = ?err, "build_actors failed");
})?;

// get split assignments for all actors
let source_split_assignments = self.source_manager.list_assignments().await;
let command = Command::Plain(Some(Mutation::Add(AddMutation {
// Actors built during recovery is not treated as newly added actors.
actor_dispatchers: Default::default(),
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
})));

let prev_epoch = new_epoch;
new_epoch = prev_epoch.next();
// checkpoint, used as init barrier to initialize all executors.
let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.snapshot_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
new_epoch,
command,
true,
self.source_manager.clone(),
));

#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
}
Ok((new_epoch, response))
}
Err(err) => {
error!(err = ?err, "inject_barrier failed");
Err(err)
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
let res = match barrier_complete_rx.recv().await.unwrap().result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
error!(err = ?err, "post_collect failed");
Err(err)
} else {
Ok((new_epoch, response))
}
}
Err(err) => {
error!(err = ?err, "inject_barrier failed");
Err(err)
}
};
res?
};
if recovery_result.is_err() {
self.metrics.recovery_failure_cnt.inc();
}
recovery_result
})
.await
.expect("Retry until recovery success.");
recovery_timer.observe_duration();
tracing::info!("recovery success");

new_epoch
Expand Down

0 comments on commit ba2f6e1

Please sign in to comment.