Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add x-region, x-zone Raft msg metrics to Store #105122

Merged
merged 1 commit into from Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Expand Up @@ -454,3 +454,16 @@ func dropRaftMessagesFrom(
},
})
}

// getMapsDiff returns the difference between the values of corresponding
wenyihu6 marked this conversation as resolved.
Show resolved Hide resolved
// metrics in two maps. Assumption: beforeMap and afterMap contain the same set
// of keys.
func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 {
	deltas := make(map[string]int64)
	for name, before := range beforeMap {
		after, present := afterMap[name]
		if !present {
			// A key that only exists in beforeMap has nothing to be compared
			// against, so it is left out of the result.
			continue
		}
		deltas[name] = after - before
	}
	return deltas
}
98 changes: 98 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -6835,3 +6836,100 @@ func TestRaftPreVoteUnquiesceDeadLeader(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond)
t.Logf("n2 is leader")
}

// TestStoreMetricsOnIncomingOutgoingMsg verifies that HandleRaftRequest() and
// HandleRaftRequestSent() correctly update metrics for incoming and outgoing
// raft messages.
//
// Each subtest snapshots the six raft byte metrics before and after a single
// call and asserts on the difference, so the subtests do not depend on the
// absolute metric values left behind by one another.
func TestStoreMetricsOnIncomingOutgoingMsg(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
// Build the store config on top of a manual clock pinned to a fixed time.
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 123)))
cfg := kvserver.TestStoreConfig(clock)
var stopper *stop.Stopper
// The test store pool is installed into cfg, and the stopper it returns is
// reused below to own the engine's lifetime.
stopper, _, _, cfg.StorePool, _ = storepool.CreateTestStorePool(ctx, cfg.Settings,
liveness.TestTimeUntilNodeDead, false, /* deterministic */
func() int { return 1 }, /* nodeCount */
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(ctx)

// Create a noop store and request.
node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)}
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
cfg.Transport = kvserver.NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer)
store := kvserver.NewStore(ctx, cfg, eng, &node)
store.Ident = &roachpb.StoreIdent{
ClusterID: uuid.Nil,
StoreID: 1,
NodeID: 1,
}
// FromReplica and ToReplica are left as zero-valued descriptors.
// NOTE(review): presumably this is why the cross-region and cross-zone
// deltas are expected to be zero below (no locality to compare) — confirm.
request := &kvserverpb.RaftMessageRequest{
RangeID: 1,
FromReplica: roachpb.ReplicaDescriptor{},
ToReplica: roachpb.ReplicaDescriptor{},
Message: raftpb.Message{
From: 1,
To: 2,
Type: raftpb.MsgTimeoutNow,
Term: 1,
},
}

// The metrics sampled before and after each call.
metricsNames := []string{
"raft.rcvd.bytes",
"raft.rcvd.cross_region.bytes",
"raft.rcvd.cross_zone.bytes",
"raft.sent.bytes",
"raft.sent.cross_region.bytes",
"raft.sent.cross_zone.bytes"}
stream := noopRaftMessageResponseStream{}
// Both subtests expect the total byte metric to grow by exactly the size of
// the request.
expectedSize := int64(request.Size())

t.Run("received raft message", func(t *testing.T) {
before, metricsErr := store.Metrics().GetStoreMetrics(metricsNames)
if metricsErr != nil {
// NOTE(review): t.Error records the failure but lets the subtest keep
// running with whatever map was returned.
t.Error(metricsErr)
}
if err := store.HandleRaftRequest(context.Background(), request, stream); err != nil {
t.Fatalf("HandleRaftRequest returned err %s", err)
}
after, metricsErr := store.Metrics().GetStoreMetrics(metricsNames)
if metricsErr != nil {
t.Error(metricsErr)
}
actual := getMapsDiff(before, after)
// Only the total received bytes should move; every sent metric and both
// cross-locality received metrics must stay unchanged.
expected := map[string]int64{
"raft.rcvd.bytes": expectedSize,
"raft.rcvd.cross_region.bytes": 0,
"raft.rcvd.cross_zone.bytes": 0,
"raft.sent.bytes": 0,
"raft.sent.cross_region.bytes": 0,
"raft.sent.cross_zone.bytes": 0,
}
require.Equal(t, expected, actual)
})

t.Run("sent raft message", func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subtests need to be independent of each other, I think these ones aren't. So either make this just one test (remove the subtests) or set up the environment anew in each subtest.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking it back, they are independent due to the use of diffs. So this is "fine", I still think my top-level comment about merging the tests makes sense - could always have overlooked something.

before, metricsErr := store.Metrics().GetStoreMetrics(metricsNames)
if metricsErr != nil {
t.Error(metricsErr)
}
// The outgoing path is driven directly with the request's node IDs and size
// rather than by sending the request through the transport.
store.HandleRaftRequestSent(context.Background(),
request.FromReplica.NodeID, request.ToReplica.NodeID, int64(request.Size()))
after, metricsErr := store.Metrics().GetStoreMetrics(metricsNames)
if metricsErr != nil {
t.Error(metricsErr)
}
actual := getMapsDiff(before, after)
// Mirror image of the received case: only the total sent bytes should move.
expected := map[string]int64{
"raft.rcvd.bytes": 0,
"raft.rcvd.cross_region.bytes": 0,
"raft.rcvd.cross_zone.bytes": 0,
"raft.sent.bytes": expectedSize,
"raft.sent.cross_region.bytes": 0,
"raft.sent.cross_zone.bytes": 0,
}
require.Equal(t, expected, actual)
})
}
59 changes: 59 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -230,6 +231,51 @@ func (s *Store) RaftSchedulerPriorityIDs() []roachpb.RangeID {
return s.scheduler.PriorityIDs()
}

// GetStoreMetric looks up the store metric registered under the given name
// and returns its current value: the count for a counter, or the value for a
// gauge. When no counter or gauge with that name is registered, it returns -1
// together with an error.
func (sm *StoreMetrics) GetStoreMetric(name string) (int64, error) {
	var (
		value   int64
		matched bool
	)
	sm.registry.Each(func(n string, v interface{}) {
		if n != name {
			return
		}
		// Only counters and gauges are readable here; a metric of any other
		// type with a matching name is treated as not found.
		switch m := v.(type) {
		case *metric.Counter:
			value, matched = m.Count(), true
		case *metric.Gauge:
			value, matched = m.Value(), true
		}
	})
	if matched {
		return value, nil
	}
	return -1, errors.Errorf("cannot find metric for %s", name)
}

// GetStoreMetrics looks up every name in metricsNames via GetStoreMetric and
// returns the results as a map from metric metadata name to current value.
// Both counters (reported as their count) and gauges (reported as their value)
// are accepted, matching what GetStoreMetric supports.
//
// If any name cannot be resolved, the lookup stops at that name and the error
// from GetStoreMetric is returned along with an empty map; values gathered for
// earlier names are not returned.
//
// Assumption: the entries of metricsNames are the metadata names of registered
// metrics. A name listed more than once simply yields a single map entry.
func (sm *StoreMetrics) GetStoreMetrics(metricsNames []string) (map[string]int64, error) {
	metrics := make(map[string]int64, len(metricsNames))
	for _, metricName := range metricsNames {
		count, err := sm.GetStoreMetric(metricName)
		if err != nil {
			// Hand back the lookup error itself instead of discarding it and
			// building a new one, so the original cause is preserved.
			return map[string]int64{}, err
		}
		metrics[metricName] = count
	}
	return metrics, nil
}

func NewTestStorePool(cfg StoreConfig) *storepool.StorePool {
liveness.TimeUntilNodeDead.Override(context.Background(), &cfg.Settings.SV, liveness.TestTimeUntilNodeDeadOff)
return storepool.NewStorePool(
Expand Down Expand Up @@ -603,3 +649,16 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// getMapsDiff computes, for every metric present in both maps, how much its
// value changed from beforeMap to afterMap (after minus before). Callers are
// expected to pass two maps with the same set of keys; a key found in only one
// of them does not appear in the result.
func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 {
	deltas := make(map[string]int64)
	for name, before := range beforeMap {
		after, present := afterMap[name]
		if !present {
			continue
		}
		deltas[name] = after - before
	}
	return deltas
}
124 changes: 115 additions & 9 deletions pkg/kv/kvserver/metrics.go
Expand Up @@ -1381,6 +1381,59 @@ handling consumes writes.
Unit: metric.Unit_BYTES,
}

metaRaftRcvdBytes = metric.Metadata{
Name: "raft.rcvd.bytes",
Help: `Number of bytes in Raft messages received by this store. Note
that this does not include raft snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRaftRcvdCrossRegionBytes = metric.Metadata{
Name: "raft.rcvd.cross_region.bytes",
Help: `Number of bytes received by this store for cross region Raft messages
(when region tiers are configured). Note that this does not include raft
snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRaftRcvdCrossZoneBytes = metric.Metadata{
Name: "raft.rcvd.cross_zone.bytes",
Help: `Number of bytes received by this store for cross zone, same region
Raft messages (when region and zone tiers are configured). If region tiers
are not configured, this count may include data sent between different
regions. To ensure accurate monitoring of transmitted data, it is important
to set up a consistent locality configuration across nodes. Note that this
does not include raft snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRaftSentBytes = metric.Metadata{
Name: "raft.sent.bytes",
Help: `Number of bytes in Raft messages sent by this store. Note that
this does not include raft snapshot sent.`,
Measurement: "Bytes",
wenyihu6 marked this conversation as resolved.
Show resolved Hide resolved
Unit: metric.Unit_BYTES,
}
metaRaftSentCrossRegionBytes = metric.Metadata{
Name: "raft.sent.cross_region.bytes",
Help: `Number of bytes sent by this store for cross region Raft messages
(when region tiers are configured). Note that this does not include raft
snapshot sent.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRaftSentCrossZoneBytes = metric.Metadata{
Name: "raft.sent.cross_zone.bytes",
Help: `Number of bytes sent by this store for cross zone, same region Raft
messages (when region and zone tiers are configured). If region tiers are
not configured, this count may include data sent between different regions.
To ensure accurate monitoring of transmitted data, it is important to set up
a consistent locality configuration across nodes. Note that this does not
include raft snapshot sent.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

metaRaftCoalescedHeartbeatsPending = metric.Metadata{
Name: "raft.heartbeats.pending",
Help: "Number of pending heartbeats and responses waiting to be coalesced",
Expand Down Expand Up @@ -2290,11 +2343,17 @@ type StoreMetrics struct {
// Raft message metrics.
//
// An array for conveniently finding the appropriate metric.
RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter
RaftRcvdDropped *metric.Counter
RaftRcvdDroppedBytes *metric.Counter
RaftRcvdQueuedBytes *metric.Gauge
RaftRcvdSteppedBytes *metric.Counter
RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter
RaftRcvdDropped *metric.Counter
RaftRcvdDroppedBytes *metric.Counter
RaftRcvdQueuedBytes *metric.Gauge
RaftRcvdSteppedBytes *metric.Counter
RaftRcvdBytes *metric.Counter
RaftRcvdCrossRegionBytes *metric.Counter
RaftRcvdCrossZoneBytes *metric.Counter
RaftSentBytes *metric.Counter
RaftSentCrossRegionBytes *metric.Counter
RaftSentCrossZoneBytes *metric.Counter

// Raft log metrics.
RaftLogFollowerBehindCount *metric.Gauge
Expand Down Expand Up @@ -2962,10 +3021,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
},
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes),
RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes),
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes),
RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes),
RaftRcvdBytes: metric.NewCounter(metaRaftRcvdBytes),
RaftRcvdCrossRegionBytes: metric.NewCounter(metaRaftRcvdCrossRegionBytes),
RaftRcvdCrossZoneBytes: metric.NewCounter(metaRaftRcvdCrossZoneBytes),
RaftSentBytes: metric.NewCounter(metaRaftSentBytes),
RaftSentCrossRegionBytes: metric.NewCounter(metaRaftSentCrossRegionBytes),
RaftSentCrossZoneBytes: metric.NewCounter(metaRaftSentCrossZoneBytes),

// Raft log metrics.
RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount),
Expand Down Expand Up @@ -3272,6 +3337,47 @@ func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd(
}
}

// updateCrossLocalityMetricsOnIncomingRaftMsg accounts for a raft message
// received via HandleRaftRequest. For messages carrying heartbeats or
// heartbeat_resps, the size recorded is that of the request with coalesced
// heartbeats, before any uncoalescing happens.
//
// msgSize is always added to the total received byte count. In addition, it is
// added to the cross-region counter when comparisonResult is CROSS_REGION, or
// to the cross-zone counter when comparisonResult is SAME_REGION_CROSS_ZONE
// (different zones within the same region, or cases where region tiers are not
// configured). Any other comparison result updates only the total.
func (sm *StoreMetrics) updateCrossLocalityMetricsOnIncomingRaftMsg(
	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
) {
	sm.RaftRcvdBytes.Inc(msgSize)
	if comparisonResult == roachpb.LocalityComparisonType_CROSS_REGION {
		sm.RaftRcvdCrossRegionBytes.Inc(msgSize)
	} else if comparisonResult == roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE {
		sm.RaftRcvdCrossZoneBytes.Inc(msgSize)
	}
}

// updateCrossLocalityMetricsOnOutgoingRaftMsg accounts for a raft message that
// is about to be sent via raftSendQueue. For messages carrying heartbeats or
// heartbeat_resps, the size recorded is that of the request with coalesced
// heartbeats.
//
// msgSize is always added to the total sent byte count. In addition, it is
// added to the cross-region counter when comparisonResult is CROSS_REGION, or
// to the cross-zone counter when comparisonResult is SAME_REGION_CROSS_ZONE
// (different zones within the same region, or cases where region tiers are not
// configured). Any other comparison result updates only the total.
//
// Note that these metrics may include messages that get dropped by `SendAsync`
// due to a full outgoing queue.
func (sm *StoreMetrics) updateCrossLocalityMetricsOnOutgoingRaftMsg(
	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
) {
	sm.RaftSentBytes.Inc(msgSize)
	if comparisonResult == roachpb.LocalityComparisonType_CROSS_REGION {
		sm.RaftSentCrossRegionBytes.Inc(msgSize)
	} else if comparisonResult == roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE {
		sm.RaftSentCrossZoneBytes.Inc(msgSize)
	}
}

// updateEnvStats sets the EncryptionAlgorithm metric to the encryption type
// reported in stats, converted to int64.
func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) {
	encryptionType := int64(stats.EncryptionType)
	sm.EncryptionAlgorithm.Update(encryptionType)
}
Expand Down