Skip to content

Commit

Permalink
kvserver: add x-region, x-zone Raft msg metrics to Store
Browse files Browse the repository at this point in the history
Previously, there were no metrics to observe cross-region and cross-zone traffic
in the Raft message requests sent and received at each store.

To address this, this commit adds six new store metrics:

```
"raft.rcvd.bytes"
"raft.sent.bytes"
"raft.rcvd.cross_region.bytes"
"raft.sent.cross_region.bytes"
"raft.rcvd.cross_zone.bytes"
"raft.sent.cross_zone.bytes"
```

The first two metrics track the total bytes of Raft messages received and sent
by a store. Additionally, there are four metrics that track the aggregate byte
count of cross-region and cross-zone Raft messages sent and received by the store.

Note that these metrics capture the byte count of requests immediately upon
message reception and just prior to message transmission. In the case of
messages containing heartbeats or heartbeat_resps, they capture the byte count
of requests with coalesced heartbeats.

To facilitate metrics updating, this commit also introduces a new raft message
handler interface `OutgoingRaftMessageHandler`. This interface captures outgoing
messages right before they are sent to `raftSendQueue`. Note that the message
may not be successfully queued if the outgoing queue is full.

Resolves: #103983

Release note (ops change): Six new metrics -
"raft.rcvd.bytes"
"raft.sent.bytes"
"raft.rcvd.cross_region.bytes"
"raft.sent.cross_region.bytes"
"raft.rcvd.cross_zone.bytes"
"raft.sent.cross_zone.bytes" - have now been added for each store.

For accurate metrics, make sure the following assumptions hold:
- Region and zone tier keys are configured consistently across nodes.
- Within a single node's locality, the region and zone tier keys are unique.
- The configuration of region and zone tiers stays consistent across nodes.
  • Loading branch information
wenyihu6 committed Jun 30, 2023
1 parent 66c9f93 commit 5479d92
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 29 deletions.
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/client_raft_helpers_test.go
Expand Up @@ -454,3 +454,16 @@ func dropRaftMessagesFrom(
},
})
}

// getMapsDiff computes, for every metric name present in both maps, how much
// the value changed between beforeMap and afterMap (after minus before). Names
// that appear in beforeMap but not in afterMap are left out of the result, and
// names that appear only in afterMap are ignored; callers are expected to pass
// two maps with the same key set.
func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 {
	deltas := make(map[string]int64)
	for name, prev := range beforeMap {
		cur, present := afterMap[name]
		if !present {
			continue
		}
		deltas[name] = cur - prev
	}
	return deltas
}
98 changes: 98 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -6835,3 +6836,100 @@ func TestRaftPreVoteUnquiesceDeadLeader(t *testing.T) {
}, 5*time.Second, 500*time.Millisecond)
t.Logf("n2 is leader")
}

// TestStoreMetricsOnIncomingOutgoingMsg verifies that HandleRaftRequest() and
// HandleRaftRequestSent() correctly update metrics for incoming and outgoing
// raft messages.
//
// Each subtest snapshots the six raft byte metrics, performs one action on the
// store, snapshots again, and compares the per-metric delta against the
// expected values: only the total byte counter for the exercised direction is
// expected to grow (by the encoded size of the request), while the
// cross-region and cross-zone counters are expected to stay unchanged.
func TestStoreMetricsOnIncomingOutgoingMsg(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()
	clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 123)))
	cfg := kvserver.TestStoreConfig(clock)
	var stopper *stop.Stopper
	stopper, _, _, cfg.StorePool, _ = storepool.CreateTestStorePool(ctx, cfg.Settings,
		liveness.TestTimeUntilNodeDead, false, /* deterministic */
		func() int { return 1 }, /* nodeCount */
		livenesspb.NodeLivenessStatus_DEAD)
	defer stopper.Stop(ctx)

	// Create a noop store and request.
	node := roachpb.NodeDescriptor{NodeID: roachpb.NodeID(1)}
	eng := storage.NewDefaultInMemForTesting()
	stopper.AddCloser(eng)
	cfg.Transport = kvserver.NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer)
	store := kvserver.NewStore(ctx, cfg, eng, &node)
	store.Ident = &roachpb.StoreIdent{
		ClusterID: uuid.Nil,
		StoreID:   1,
		NodeID:    1,
	}
	request := &kvserverpb.RaftMessageRequest{
		RangeID:     1,
		FromReplica: roachpb.ReplicaDescriptor{},
		ToReplica:   roachpb.ReplicaDescriptor{},
		Message: raftpb.Message{
			From: 1,
			To:   2,
			Type: raftpb.MsgTimeoutNow,
			Term: 1,
		},
	}

	metricsNames := []string{
		"raft.rcvd.bytes",
		"raft.rcvd.cross_region.bytes",
		"raft.rcvd.cross_zone.bytes",
		"raft.sent.bytes",
		"raft.sent.cross_region.bytes",
		"raft.sent.cross_zone.bytes"}
	stream := noopRaftMessageResponseStream{}
	expectedSize := int64(request.Size())

	// metricsDelta runs action and returns how much each tracked metric
	// changed while it ran. A failed metric lookup aborts the subtest right
	// away: continuing with an empty snapshot would only surface later as a
	// misleading mismatch in the final comparison.
	metricsDelta := func(t *testing.T, action func()) map[string]int64 {
		t.Helper()
		before, err := store.Metrics().GetStoreMetrics(metricsNames)
		require.NoError(t, err)
		action()
		after, err := store.Metrics().GetStoreMetrics(metricsNames)
		require.NoError(t, err)
		return getMapsDiff(before, after)
	}

	t.Run("received raft message", func(t *testing.T) {
		actual := metricsDelta(t, func() {
			if err := store.HandleRaftRequest(ctx, request, stream); err != nil {
				t.Fatalf("HandleRaftRequest returned err %s", err)
			}
		})
		expected := map[string]int64{
			"raft.rcvd.bytes":              expectedSize,
			"raft.rcvd.cross_region.bytes": 0,
			"raft.rcvd.cross_zone.bytes":   0,
			"raft.sent.bytes":              0,
			"raft.sent.cross_region.bytes": 0,
			"raft.sent.cross_zone.bytes":   0,
		}
		require.Equal(t, expected, actual)
	})

	t.Run("sent raft message", func(t *testing.T) {
		actual := metricsDelta(t, func() {
			store.HandleRaftRequestSent(ctx,
				request.FromReplica.NodeID, request.ToReplica.NodeID, expectedSize)
		})
		expected := map[string]int64{
			"raft.rcvd.bytes":              0,
			"raft.rcvd.cross_region.bytes": 0,
			"raft.rcvd.cross_zone.bytes":   0,
			"raft.sent.bytes":              expectedSize,
			"raft.sent.cross_region.bytes": 0,
			"raft.sent.cross_zone.bytes":   0,
		}
		require.Equal(t, expected, actual)
	})
}
59 changes: 59 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -230,6 +231,51 @@ func (s *Store) RaftSchedulerPriorityIDs() []roachpb.RangeID {
return s.scheduler.PriorityIDs()
}

// GetStoreMetric looks up the store metric registered under the given metadata
// name and returns its current value: Count() for a *metric.Counter, Value()
// for a *metric.Gauge. If no registered metric of one of those two types
// carries that name, it returns -1 together with an error.
func (sm *StoreMetrics) GetStoreMetric(name string) (int64, error) {
	var (
		value   int64
		matched bool
	)
	sm.registry.Each(func(metricName string, m interface{}) {
		if metricName != name {
			return
		}
		// Only counters and gauges are supported; any other metric type
		// registered under this name is treated as "not found".
		if counter, ok := m.(*metric.Counter); ok {
			value, matched = counter.Count(), true
		} else if gauge, ok := m.(*metric.Gauge); ok {
			value, matched = gauge.Value(), true
		}
	})
	if matched {
		return value, nil
	}
	return -1, errors.Errorf("cannot find metric for %s", name)
}

// GetStoreMetrics resolves every name in metricsNames through GetStoreMetric
// and returns the values keyed by metric metadata name. The lookup stops at
// the first name that cannot be resolved, in which case an empty map and an
// error naming that metric are returned.
//
// Assumptions: each entry in metricsNames is the metadata name of a registered
// store metric of a type GetStoreMetric can read, and names are not repeated
// (a repeated name simply overwrites its own map entry).
func (sm *StoreMetrics) GetStoreMetrics(metricsNames []string) (map[string]int64, error) {
	result := make(map[string]int64)
	for i := range metricsNames {
		name := metricsNames[i]
		value, err := sm.GetStoreMetric(name)
		if err != nil {
			return map[string]int64{}, errors.Errorf("cannot find metric for %s", name)
		}
		result[name] = value
	}
	return result, nil
}

func NewTestStorePool(cfg StoreConfig) *storepool.StorePool {
liveness.TimeUntilNodeDead.Override(context.Background(), &cfg.Settings.SV, liveness.TestTimeUntilNodeDeadOff)
return storepool.NewStorePool(
Expand Down Expand Up @@ -603,3 +649,16 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

// getMapsDiff returns a map holding, for each metric name found in both
// inputs, the value in afterMap minus the value in beforeMap. It is meant for
// two snapshots with the same key set; a name missing from afterMap produces
// no entry in the result, and a name present only in afterMap is ignored.
func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 {
	result := make(map[string]int64)
	for key := range beforeMap {
		after, found := afterMap[key]
		if found {
			result[key] = after - beforeMap[key]
		}
	}
	return result
}
124 changes: 115 additions & 9 deletions pkg/kv/kvserver/metrics.go
Expand Up @@ -1381,6 +1381,59 @@ handling consumes writes.
Unit: metric.Unit_BYTES,
}

// Metadata for the per-store Raft message byte metrics. There is one total,
// one cross-region and one cross-zone entry for each direction (received and
// sent). As the Help texts state, Raft snapshot traffic is not included in
// any of them.

// Total bytes of Raft messages received by the store.
metaRaftRcvdBytes = metric.Metadata{
Name: "raft.rcvd.bytes",
Help: `Number of bytes in Raft messages received by this store. Note
that this does not include raft snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// Bytes of received Raft messages classified as cross-region.
metaRaftRcvdCrossRegionBytes = metric.Metadata{
Name: "raft.rcvd.cross_region.bytes",
Help: `Number of bytes received by this store for cross region Raft messages
(when region tiers are configured). Note that this does not include raft
snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// Bytes of received Raft messages classified as cross-zone within the same
// region; see the Help text for the caveat when region tiers are missing.
metaRaftRcvdCrossZoneBytes = metric.Metadata{
Name: "raft.rcvd.cross_zone.bytes",
Help: `Number of bytes received by this store for cross zone, same region
Raft messages (when region and zone tiers are configured). If region tiers
are not configured, this count may include data sent between different
regions. To ensure accurate monitoring of transmitted data, it is important
to set up a consistent locality configuration across nodes. Note that this
does not include raft snapshot received.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// Total bytes of Raft messages sent by the store.
metaRaftSentBytes = metric.Metadata{
Name: "raft.sent.bytes",
Help: `Number of bytes in Raft messages sent by this store. Note that
this does not include raft snapshot sent.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// Bytes of sent Raft messages classified as cross-region.
metaRaftSentCrossRegionBytes = metric.Metadata{
Name: "raft.sent.cross_region.bytes",
Help: `Number of bytes sent by this store for cross region Raft messages
(when region tiers are configured). Note that this does not include raft
snapshot sent.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// Bytes of sent Raft messages classified as cross-zone within the same
// region; see the Help text for the caveat when region tiers are missing.
metaRaftSentCrossZoneBytes = metric.Metadata{
Name: "raft.sent.cross_zone.bytes",
Help: `Number of bytes sent by this store for cross zone, same region Raft
messages (when region and zone tiers are configured). If region tiers are
not configured, this count may include data sent between different regions.
To ensure accurate monitoring of transmitted data, it is important to set up
a consistent locality configuration across nodes. Note that this does not
include raft snapshot sent.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

metaRaftCoalescedHeartbeatsPending = metric.Metadata{
Name: "raft.heartbeats.pending",
Help: "Number of pending heartbeats and responses waiting to be coalesced",
Expand Down Expand Up @@ -2290,11 +2343,17 @@ type StoreMetrics struct {
// Raft message metrics.
//
// An array for conveniently finding the appropriate metric.
RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter
RaftRcvdDropped *metric.Counter
RaftRcvdDroppedBytes *metric.Counter
RaftRcvdQueuedBytes *metric.Gauge
RaftRcvdSteppedBytes *metric.Counter
RaftRcvdMessages [maxRaftMsgType + 1]*metric.Counter
RaftRcvdDropped *metric.Counter
RaftRcvdDroppedBytes *metric.Counter
RaftRcvdQueuedBytes *metric.Gauge
RaftRcvdSteppedBytes *metric.Counter
RaftRcvdBytes *metric.Counter
RaftRcvdCrossRegionBytes *metric.Counter
RaftRcvdCrossZoneBytes *metric.Counter
RaftSentBytes *metric.Counter
RaftSentCrossRegionBytes *metric.Counter
RaftSentCrossZoneBytes *metric.Counter

// Raft log metrics.
RaftLogFollowerBehindCount *metric.Gauge
Expand Down Expand Up @@ -2962,10 +3021,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
},
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes),
RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes),
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
RaftRcvdQueuedBytes: metric.NewGauge(metaRaftRcvdQueuedBytes),
RaftRcvdSteppedBytes: metric.NewCounter(metaRaftRcvdSteppedBytes),
RaftRcvdBytes: metric.NewCounter(metaRaftRcvdBytes),
RaftRcvdCrossRegionBytes: metric.NewCounter(metaRaftRcvdCrossRegionBytes),
RaftRcvdCrossZoneBytes: metric.NewCounter(metaRaftRcvdCrossZoneBytes),
RaftSentBytes: metric.NewCounter(metaRaftSentBytes),
RaftSentCrossRegionBytes: metric.NewCounter(metaRaftSentCrossRegionBytes),
RaftSentCrossZoneBytes: metric.NewCounter(metaRaftSentCrossZoneBytes),

// Raft log metrics.
RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount),
Expand Down Expand Up @@ -3272,6 +3337,47 @@ func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd(
}
}

// updateCrossLocalityMetricsOnIncomingRaftMsg accounts for a raft message
// received via HandleRaftRequest. For messages carrying heartbeats or
// heartbeat_resps, msgSize is the byte count of the request with coalesced
// heartbeats, before any uncoalescing happens.
//
// msgSize is always added to the total received-bytes counter. In addition it
// is added to:
//   - the cross-region counter when comparisonResult is CROSS_REGION;
//   - the cross-zone counter when comparisonResult is SAME_REGION_CROSS_ZONE
//     (different zones within the same region, or cases where region tiers
//     are not configured).
//
// Any other comparison result only affects the total.
func (sm *StoreMetrics) updateCrossLocalityMetricsOnIncomingRaftMsg(
	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
) {
	sm.RaftRcvdBytes.Inc(msgSize)
	if comparisonResult == roachpb.LocalityComparisonType_CROSS_REGION {
		sm.RaftRcvdCrossRegionBytes.Inc(msgSize)
		return
	}
	if comparisonResult == roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE {
		sm.RaftRcvdCrossZoneBytes.Inc(msgSize)
	}
}

// updateCrossLocalityMetricsOnOutgoingRaftMsg accounts for a raft message that
// is about to be sent via raftSendQueue. For messages carrying heartbeats or
// heartbeat_resps, msgSize is the byte count of the request with coalesced
// heartbeats.
//
// msgSize is always added to the total sent-bytes counter. It is additionally
// added to the cross-region counter when comparisonResult is CROSS_REGION, or
// to the cross-zone counter when it is SAME_REGION_CROSS_ZONE (different zones
// within the same region, or cases where region tiers are not configured).
//
// Note that these metrics may include messages that get dropped by
// `SendAsync` due to a full outgoing queue.
func (sm *StoreMetrics) updateCrossLocalityMetricsOnOutgoingRaftMsg(
	comparisonResult roachpb.LocalityComparisonType, msgSize int64,
) {
	sm.RaftSentBytes.Inc(msgSize)

	crossRegion := comparisonResult == roachpb.LocalityComparisonType_CROSS_REGION
	crossZone := comparisonResult == roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE
	switch {
	case crossRegion:
		sm.RaftSentCrossRegionBytes.Inc(msgSize)
	case crossZone:
		sm.RaftSentCrossZoneBytes.Inc(msgSize)
	}
}

// updateEnvStats publishes the encryption type reported in stats to the
// EncryptionAlgorithm metric, converted to int64.
func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) {
	encryptionType := int64(stats.EncryptionType)
	sm.EncryptionAlgorithm.Update(encryptionType)
}
Expand Down

0 comments on commit 5479d92

Please sign in to comment.