[consensus] Update all consensus counters to new format #1663

Closed · wants to merge 3 commits
common/channel/src/libra_channel.rs (5 changes: 3 additions & 2 deletions)
@@ -11,12 +11,13 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Waker;

use crate::message_queues::{Counters, PerKeyQueue, QueueStyle};
use crate::message_queues::{PerKeyQueue, QueueStyle};
use failure::prelude::*;
use futures::async_await::FusedStream;
use futures::stream::Stream;
use futures::task::Context;
use futures::Poll;
use libra_metrics::IntCounterVec;
use std::hash::Hash;

/// SharedState is a data structure private to this module which is
@@ -121,7 +122,7 @@ impl<K: Eq + Hash + Clone, M> FusedStream for Receiver<K, M> {
pub fn new<K: Eq + Hash + Clone, M>(
queue_style: QueueStyle,
max_queue_size_per_key: usize,
counters: Option<Counters>,
counters: Option<&'static IntCounterVec>,
) -> (Sender<K, M>, Receiver<K, M>) {
let shared_state = Arc::new(Mutex::new(SharedState {
internal_queue: PerKeyQueue::new(queue_style, max_queue_size_per_key, counters),
common/channel/src/message_queues.rs (18 changes: 6 additions & 12 deletions)
@@ -1,7 +1,7 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use libra_metrics::IntCounter;
use libra_metrics::IntCounterVec;
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

@@ -16,12 +16,6 @@ pub enum QueueStyle {
FIFO,
}

pub struct Counters {
pub dropped_msgs_counter: &'static IntCounter,
pub enqueued_msgs_counter: &'static IntCounter,
pub dequeued_msgs_counter: &'static IntCounter,
}

/// PerKeyQueue maintains a queue of messages per key. It
/// is a bounded queue of messages per Key and the style (FIFO, LIFO) is
/// configurable. When a new message is added using `push`, it is added to
@@ -43,7 +37,7 @@ pub(crate) struct PerKeyQueue<K: Eq + Hash + Clone, T> {
round_robin_queue: VecDeque<K>,
/// Maximum number of messages to store per key
max_queue_size: usize,
counters: Option<Counters>,
counters: Option<&'static IntCounterVec>,
}

impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
@@ -52,7 +46,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
pub(crate) fn new(
queue_style: QueueStyle,
max_queue_size_per_key: usize,
counters: Option<Counters>,
counters: Option<&'static IntCounterVec>,
) -> Self {
assert!(
max_queue_size_per_key > 0,
@@ -87,7 +81,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
/// add the key to round_robin_queue if it didnt already exist
pub(crate) fn push(&mut self, key: K, message: T) {
if let Some(c) = self.counters.as_ref() {
c.enqueued_msgs_counter.inc();
c.with_label_values(&["enqueued"]).inc();
}
let max_queue_size = self.max_queue_size;
let key_message_queue = self
@@ -101,7 +95,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
// Push the message to the actual key message queue
if key_message_queue.len() == max_queue_size {
if let Some(c) = self.counters.as_ref() {
c.dropped_msgs_counter.inc()
c.with_label_values(&["dropped"]).inc();
}
match self.queue_style {
// Drop the newest message for FIFO
@@ -132,7 +126,7 @@ impl<K: Eq + Hash + Clone, T> PerKeyQueue<K, T> {
}
if message.is_some() {
if let Some(c) = self.counters.as_ref() {
c.dequeued_msgs_counter.inc();
c.with_label_values(&["dequeued"]).inc();
}
}
message
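For orientation: the removed `Counters` struct (enqueued/dequeued/dropped) is replaced by a single `IntCounterVec` whose label value carries the state. Below is a minimal sketch of how such a counter might be declared, assuming the prometheus `register_int_counter_vec!` macro is available and a `state` label is used; the actual definitions live in consensus/src/counters.rs, which is not part of this diff, so the metric name, help text, and label name are illustrative only.

```rust
// Sketch only: illustrative declaration of a channel counter in the new
// format; the metric name and "state" label are assumptions, not taken
// from counters.rs.
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    /// One vector with a "state" label stands in for the removed
    /// three-field `Counters` struct.
    pub static ref PROPOSAL_CHANNEL_MSGS: IntCounterVec = register_int_counter_vec!(
        "libra_consensus_proposal_channel_msgs_count",
        "Number of proposal-channel messages per state (enqueued, dequeued, dropped)",
        &["state"]
    )
    .unwrap();
}
```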
consensus/src/chained_bft/event_processor.rs (16 changes: 12 additions & 4 deletions)
@@ -537,11 +537,15 @@ impl<T: Payload> EventProcessor<T> {
match waiting_success {
WaitingSuccess::WaitWasRequired { wait_duration, .. } => {
counters::VOTE_SUCCESS_WAIT_S.observe_duration(wait_duration);
counters::VOTE_WAIT_WAS_REQUIRED_COUNT.inc();
counters::VOTES_COUNT
.with_label_values(&["wait_was_required"])
.inc();
}
WaitingSuccess::NoWaitRequired { .. } => {
counters::VOTE_SUCCESS_WAIT_S.observe_duration(Duration::new(0, 0));
counters::VOTE_NO_WAIT_REQUIRED_COUNT.inc();
counters::VOTES_COUNT
.with_label_values(&["no_wait_required"])
.inc();
}
}
}
@@ -553,7 +557,9 @@ impl<T: Payload> EventProcessor<T> {
block_timestamp_us,
current_round_deadline);
counters::VOTE_FAILURE_WAIT_S.observe_duration(Duration::new(0, 0));
counters::VOTE_MAX_WAIT_EXCEEDED_COUNT.inc();
counters::VOTES_COUNT
.with_label_values(&["max_wait_exceeded"])
.inc();
}
WaitingError::WaitFailed {
current_duration_since_epoch,
@@ -565,7 +571,9 @@ impl<T: Payload> EventProcessor<T> {
block_timestamp_us,
current_duration_since_epoch);
counters::VOTE_FAILURE_WAIT_S.observe_duration(wait_duration);
counters::VOTE_WAIT_FAILED_COUNT.inc();
counters::VOTES_COUNT
.with_label_values(&["wait_failed"])
.inc();
}
};
return Err(waiting_error);
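The four separate vote counters become label values on one `VOTES_COUNT` vector. A self-contained sketch of that call pattern follows, using a hypothetical stand-in for `VOTES_COUNT` (the label name actually used in counters.rs is not visible in this diff).

```rust
use prometheus::{IntCounterVec, Opts};

fn main() {
    // Hypothetical stand-in for counters::VOTES_COUNT; metric and label
    // names are illustrative only.
    let votes_count = IntCounterVec::new(
        Opts::new("votes_count", "Count of vote events, partitioned by wait outcome"),
        &["state"],
    )
    .unwrap();

    // Each outcome bumps the same metric family under its own label value,
    // replacing VOTE_WAIT_WAS_REQUIRED_COUNT, VOTE_NO_WAIT_REQUIRED_COUNT,
    // VOTE_MAX_WAIT_EXCEEDED_COUNT, and VOTE_WAIT_FAILED_COUNT.
    votes_count.with_label_values(&["wait_was_required"]).inc();
    votes_count.with_label_values(&["max_wait_exceeded"]).inc();

    // with_label_values returns the child counter for that label value,
    // creating it on first use.
    assert_eq!(votes_count.with_label_values(&["wait_was_required"]).get(), 1);
}
```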
consensus/src/chained_bft/liveness/proposal_generator.rs (16 changes: 12 additions & 4 deletions)
@@ -143,15 +143,19 @@ impl<T: Payload> ProposalGenerator<T> {
wait_duration,
} => {
counters::PROPOSAL_SUCCESS_WAIT_S.observe_duration(wait_duration);
counters::PROPOSAL_WAIT_WAS_REQUIRED_COUNT.inc();
counters::PROPOSALS_GENERATED_COUNT
.with_label_values(&["wait_was_required"])
.inc();
current_duration_since_epoch
}
WaitingSuccess::NoWaitRequired {
current_duration_since_epoch,
..
} => {
counters::PROPOSAL_SUCCESS_WAIT_S.observe_duration(Duration::new(0, 0));
counters::PROPOSAL_NO_WAIT_REQUIRED_COUNT.inc();
counters::PROPOSALS_GENERATED_COUNT
.with_label_values(&["no_wait_required"])
.inc();
current_duration_since_epoch
}
}
@@ -160,7 +164,9 @@ match waiting_error {
match waiting_error {
WaitingError::MaxWaitExceeded => {
counters::PROPOSAL_FAILURE_WAIT_S.observe_duration(Duration::new(0, 0));
counters::PROPOSAL_MAX_WAIT_EXCEEDED_COUNT.inc();
counters::PROPOSALS_GENERATED_COUNT
.with_label_values(&["max_wait_exceeded"])
.inc();
bail!(
"Waiting until parent block timestamp usecs {:?} would exceed the round duration {:?}, hence will not create a proposal for this round",
hqc.certified_block().timestamp_usecs(),
@@ -171,7 +177,9 @@ wait_duration,
wait_duration,
} => {
counters::PROPOSAL_FAILURE_WAIT_S.observe_duration(wait_duration);
counters::PROPOSAL_WAIT_FAILED_COUNT.inc();
counters::PROPOSALS_GENERATED_COUNT
.with_label_values(&["wait_failed"])
.inc();
bail!(
"Even after waiting for {:?}, parent block timestamp usecs {:?} >= current timestamp usecs {:?}, will not create a proposal for this round",
wait_duration,
consensus/src/chained_bft/network.rs (50 changes: 9 additions & 41 deletions)
@@ -3,10 +3,7 @@

use crate::counters;
use bytes::Bytes;
use channel::{
self, libra_channel,
message_queues::{self, QueueStyle},
};
use channel::{self, libra_channel, message_queues::QueueStyle};
use consensus_types::block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse};
use consensus_types::epoch_retrieval::EpochRetrievalRequest;
use consensus_types::{
@@ -274,50 +271,21 @@
self_receiver: channel::Receiver<failure::Result<Event<ConsensusMsg>>>,
validators: Arc<ValidatorVerifier>,
) -> (NetworkTask<T>, NetworkReceivers<T>) {
let (proposal_tx, proposal_rx) = libra_channel::new(
QueueStyle::LIFO,
1,
Some(message_queues::Counters {
dropped_msgs_counter: &counters::PROPOSAL_DROPPED_MSGS,
enqueued_msgs_counter: &counters::PROPOSAL_ENQUEUED_MSGS,
dequeued_msgs_counter: &counters::PROPOSAL_DEQUEUED_MSGS,
}),
);
let (vote_tx, vote_rx) = libra_channel::new(
QueueStyle::LIFO,
1,
Some(message_queues::Counters {
dropped_msgs_counter: &counters::VOTES_DROPPED_MSGS,
enqueued_msgs_counter: &counters::VOTES_ENQUEUED_MSGS,
dequeued_msgs_counter: &counters::VOTES_DEQUEUED_MSGS,
}),
);
let (proposal_tx, proposal_rx) =
libra_channel::new(QueueStyle::LIFO, 1, Some(&counters::PROPOSAL_CHANNEL_MSGS));
let (vote_tx, vote_rx) =
libra_channel::new(QueueStyle::LIFO, 1, Some(&counters::VOTES_CHANNEL_MSGS));
let (block_request_tx, block_request_rx) = libra_channel::new(
QueueStyle::LIFO,
1,
Some(message_queues::Counters {
dropped_msgs_counter: &counters::BLOCK_RETRIEVAL_DROPPED_MSGS,
enqueued_msgs_counter: &counters::BLOCK_RETRIEVAL_ENQUEUED_MSGS,
dequeued_msgs_counter: &counters::BLOCK_RETRIEVAL_DEQUEUED_MSGS,
}),
);
let (sync_info_tx, sync_info_rx) = libra_channel::new(
QueueStyle::LIFO,
1,
Some(message_queues::Counters {
dropped_msgs_counter: &counters::SYNC_INFO_DROPPED_MSGS,
enqueued_msgs_counter: &counters::SYNC_INFO_ENQUEUED_MSGS,
dequeued_msgs_counter: &counters::SYNC_INFO_DEQUEUED_MSGS,
}),
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
let (sync_info_tx, sync_info_rx) =
libra_channel::new(QueueStyle::LIFO, 1, Some(&counters::SYNC_INFO_CHANNEL_MSGS));
let (epoch_change_tx, epoch_change_rx) = libra_channel::new(
QueueStyle::LIFO,
1,
Some(message_queues::Counters {
dropped_msgs_counter: &counters::EPOCH_CHANGE_DROPPED_MSGS,
enqueued_msgs_counter: &counters::EPOCH_CHANGE_ENQUEUED_MSGS,
dequeued_msgs_counter: &counters::EPOCH_CHANGE_DEQUEUED_MSGS,
}),
Some(&counters::EPOCH_CHANGE_CHANNEL_MSGS),
);
let (different_epoch_tx, different_epoch_rx) =
libra_channel::new(QueueStyle::LIFO, 1, None);
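Tying the pieces together, here is a sketch of how one of these channels could be constructed and observed under the new signature. It assumes the `Sender` half exposes a `push(key, message)` method (not shown in this diff) and reuses illustrative metric and label names; it is not code from this PR.

```rust
use channel::{libra_channel, message_queues::QueueStyle};
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    static ref TEST_CHANNEL_MSGS: IntCounterVec = register_int_counter_vec!(
        "test_channel_msgs_count",
        "Messages per state for a test channel",
        &["state"] // expected label values: "enqueued", "dequeued", "dropped"
    )
    .unwrap();
}

fn main() {
    // The third argument is now Option<&'static IntCounterVec> rather than
    // Option<message_queues::Counters>.
    let (mut tx, _rx) = libra_channel::new(QueueStyle::LIFO, 1, Some(&TEST_CHANNEL_MSGS));

    // Pushing a message should bump the "enqueued" child of the vector,
    // mirroring PerKeyQueue::push above. (push(key, message) is assumed here.)
    let _ = tx.push(0u8, "hello");
    assert_eq!(TEST_CHANNEL_MSGS.with_label_values(&["enqueued"]).get(), 1);
}
```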