Skip to content

Commit

Permalink
feat(grafana): report local barrier manager progress (risingwavelabs#12001)
Browse files Browse the repository at this point in the history

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Sep 5, 2023
1 parent e4b9a08 commit 579405e
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions grafana/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def next_one_third_width_graph(self):


class Panels:
# Common options for timeseries panels
common_options = {
"fillOpacity": 10,
"interval": "1s",
Expand Down
11 changes: 11 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,17 @@ def section_streaming(panels):
),
],
),
panels.timeseries_ops(
"Earliest In-Flight Barrier Progress",
"The number of actors that have processed the earliest in-flight barriers per second. "
"This metric helps users to detect potential congestion or stuck in the system.",
[
panels.target(
f"rate({metric('stream_barrier_manager_progress')}[$__rate_interval])",
"{{instance}}",
),
],
),
]


Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub struct StreamingMetrics {
pub barrier_inflight_latency: Histogram,
/// The duration of sync to storage.
pub barrier_sync_latency: Histogram,
/// The progress made by the earliest in-flight barriers in the local barrier manager.
pub barrier_manager_progress: IntCounter,

pub sink_commit_duration: HistogramVec,

Expand Down Expand Up @@ -606,6 +608,14 @@ impl StreamingMetrics {
exponential_buckets(0.1, 1.5, 16).unwrap() // max 43s
);
let barrier_sync_latency = register_histogram_with_registry!(opts, registry).unwrap();

let barrier_manager_progress = register_int_counter_with_registry!(
"stream_barrier_manager_progress",
"The number of actors that have processed the earliest in-flight barriers",
registry
)
.unwrap();

let sink_commit_duration = register_histogram_vec_with_registry!(
"sink_commit_duration",
"Duration of commit op in sink",
Expand Down Expand Up @@ -769,6 +779,7 @@ impl StreamingMetrics {
arrangement_backfill_upstream_output_row_count,
barrier_inflight_latency,
barrier_sync_latency,
barrier_manager_progress,
sink_commit_duration,
lru_current_watermark_time_ms,
lru_physical_now_ms,
Expand Down
133 changes: 60 additions & 73 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::sync::oneshot;
use super::progress::ChainState;
use super::CollectResult;
use crate::error::{StreamError, StreamResult};
use crate::executor::monitor::GLOBAL_STREAMING_METRICS;
use crate::executor::Barrier;
use crate::task::ActorId;

Expand Down Expand Up @@ -84,85 +85,71 @@ impl ManagedBarrierState {

/// Notify if we have collected barriers from all actor ids. The state must be `Issued`.
fn may_notify(&mut self, curr_epoch: u64) {
let to_notify = match self.epoch_barrier_state_map.get(&curr_epoch) {
Some(BarrierState {
inner:
ManagedBarrierStateInner::Issued {
remaining_actors, ..
},
..
}) => remaining_actors.is_empty(),
_ => unreachable!(),
};
// Report if there's progress on the earliest in-flight barrier.
if self.epoch_barrier_state_map.keys().next() == Some(&curr_epoch) {
GLOBAL_STREAMING_METRICS.barrier_manager_progress.inc();
}

if to_notify {
while let Some((
_,
BarrierState {
inner: barrier_inner,
..
},
)) = self.epoch_barrier_state_map.first_key_value()
{
match barrier_inner {
ManagedBarrierStateInner::Issued {
remaining_actors, ..
} => {
if !remaining_actors.is_empty() {
break;
}
}
_ => break,
}
let (epoch, barrier_state) = self.epoch_barrier_state_map.pop_first().unwrap();
let create_mview_progress = self
.create_mview_progress
.remove(&epoch)
.unwrap_or_default()
.into_iter()
.map(|(actor, state)| CreateMviewProgress {
chain_actor_id: actor,
done: matches!(state, ChainState::Done),
consumed_epoch: match state {
ChainState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch,
ChainState::Done => epoch,
},
consumed_rows: match state {
ChainState::ConsumingUpstream(_, consumed_rows) => consumed_rows,
ChainState::Done => 0,
},
})
.collect();
while let Some(entry) = self.epoch_barrier_state_map.first_entry() {
let to_notify = matches!(
&entry.get().inner,
ManagedBarrierStateInner::Issued {
remaining_actors, ..
} if remaining_actors.is_empty(),
);

let kind = barrier_state.kind;
match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => tracing::info!(
epoch = barrier_state.prev_epoch,
"ignore sealing data for the first barrier"
),
BarrierKind::Barrier | BarrierKind::Checkpoint => {
dispatch_state_store!(&self.state_store, state_store, {
state_store.seal_epoch(barrier_state.prev_epoch, kind.is_checkpoint());
});
}
if !to_notify {
break;
}

let (epoch, barrier_state) = entry.remove_entry();
let create_mview_progress = self
.create_mview_progress
.remove(&epoch)
.unwrap_or_default()
.into_iter()
.map(|(actor, state)| CreateMviewProgress {
chain_actor_id: actor,
done: matches!(state, ChainState::Done),
consumed_epoch: match state {
ChainState::ConsumingUpstream(consumed_epoch, _) => consumed_epoch,
ChainState::Done => epoch,
},
consumed_rows: match state {
ChainState::ConsumingUpstream(_, consumed_rows) => consumed_rows,
ChainState::Done => 0,
},
})
.collect();

let kind = barrier_state.kind;
match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => tracing::info!(
epoch = barrier_state.prev_epoch,
"ignore sealing data for the first barrier"
),
BarrierKind::Barrier | BarrierKind::Checkpoint => {
dispatch_state_store!(&self.state_store, state_store, {
state_store.seal_epoch(barrier_state.prev_epoch, kind.is_checkpoint());
});
}
}

match barrier_state.inner {
ManagedBarrierStateInner::Issued {
collect_notifier, ..
} => {
// Notify about barrier finishing.
let result = CollectResult {
create_mview_progress,
kind,
};
if collect_notifier.unwrap().send(Ok(result)).is_err() {
warn!("failed to notify barrier collection with epoch {}", epoch)
}
match barrier_state.inner {
ManagedBarrierStateInner::Issued {
collect_notifier, ..
} => {
// Notify about barrier finishing.
let result = CollectResult {
create_mview_progress,
kind,
};
if collect_notifier.unwrap().send(Ok(result)).is_err() {
warn!("failed to notify barrier collection with epoch {}", epoch)
}
_ => unreachable!(),
}
_ => unreachable!(),
}
}
}
Expand Down

0 comments on commit 579405e

Please sign in to comment.