kvserver: add cross-zone snapshot byte metrics to StoreMetrics
Previously, we [added](#104111) cross-region snapshot byte metrics to track the
aggregate of snapshot bytes sent from and received at a store across different
regions. We should add metrics to track cross-zone snapshots as well.

To address this, this commit adds two new store metrics:
```
range.snapshots.cross-zone.sent-bytes
range.snapshots.cross-zone.rcvd-bytes
```
These metrics track the aggregate of snapshot bytes sent from and received at a
store across different zones within the same region if the zone and region tier
keys are properly configured across nodes.

To ensure accurate metrics and consistent error logging, follow these
assumptions when configuring locality tiers across nodes:
1. For cross-region metrics, region tier keys must be present.
2. For cross-zone metrics, zone tier keys must be present. It is also essential
to maintain consistency in the zone tier key across nodes (e.g. using different
keys, such as “az” and “zone”, can lead to unexpected behavior).
3. Within a node locality, region and zone tier keys should be unique.
4. Ensure consistent configuration of region and zone tiers across nodes.
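
The assumptions above can be checked mechanically. The following is a minimal sketch and not code from this commit: `Tier` is a hypothetical stand-in for `roachpb.Tier`, and `validateLocality` with its `regionKey` and `zoneKey` parameters are illustrative names. Passing the same two keys for every node is how the sketch expresses cross-node consistency.

```go
package main

import "fmt"

// Tier is a hypothetical stand-in for roachpb.Tier (one key/value locality tier).
type Tier struct {
	Key, Value string
}

// validateLocality checks a single node's locality against the assumptions:
// the region and zone tier keys must each be present exactly once.
func validateLocality(tiers []Tier, regionKey, zoneKey string) error {
	counts := make(map[string]int)
	for _, t := range tiers {
		counts[t.Key]++
	}
	for _, key := range []string{regionKey, zoneKey} {
		switch n := counts[key]; {
		case n == 0:
			return fmt.Errorf("missing tier key %q", key)
		case n > 1:
			return fmt.Errorf("tier key %q appears %d times", key, n)
		}
	}
	return nil
}

func main() {
	ok := []Tier{{Key: "region", Value: "us-east"}, {Key: "zone", Value: "us-east-1"}}
	mixed := []Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}
	fmt.Println(validateLocality(ok, "region", "zone"))    // no error
	fmt.Println(validateLocality(mixed, "region", "zone")) // error: "zone" is missing
}
```

Running the same check with identical keys on every node would flag the "az" versus "zone" mismatch described in item 2.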

If all nodes configure both region and zone tiers:
Cross-region and cross-zone metrics can be used to track the following information:
a. Aggregate of cross-region, cross-zone activities:
`range.snapshots.cross-region.(sent|rcvd)-bytes`
b. Aggregate of same-region, cross-zone activities:
`range.snapshots.cross-zone.(sent|rcvd)-bytes`
c. Aggregate of same-region, same-zone activities:
`range.snapshots.(sent|rcvd)-bytes` - a - b
d. Cross-region, same-zone activities are treated as a misconfiguration, and
an error will be logged.
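
As an illustration of item c, the same-region, same-zone volume can be derived from three counter readings. This is a sketch under the assumption that all nodes configure both tiers; the `snapshotBytes` struct and its field names are hypothetical and only hold values read from the metrics named in the comments.

```go
package main

import "fmt"

// snapshotBytes holds hypothetical counter readings for one direction
// (sent or received) on a single store.
type snapshotBytes struct {
	Total       int64 // range.snapshots.sent-bytes or range.snapshots.rcvd-bytes
	CrossRegion int64 // range.snapshots.cross-region.sent-bytes or .rcvd-bytes
	CrossZone   int64 // range.snapshots.cross-zone.sent-bytes or .rcvd-bytes
}

// sameRegionSameZone derives item c: total - a - b.
func (b snapshotBytes) sameRegionSameZone() int64 {
	return b.Total - b.CrossRegion - b.CrossZone
}

func main() {
	sent := snapshotBytes{Total: 1000, CrossRegion: 600, CrossZone: 300}
	fmt.Println(sent.sameRegionSameZone()) // prints 100
}
```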

If all nodes have zone tiers configured, but not regions: cross-zone metrics
will still be accurate. Problems arise if some nodes have region tier keys
configured while others do not.

If all nodes have region tiers configured: cross-region metrics will be accurate
regardless of the nodes’ zone tier setup.

Part of: #103983

Release note (ops change): Two new store metrics -
`range.snapshots.cross-zone.sent-bytes` and
`range.snapshots.cross-zone.rcvd-bytes` - are now added to track the aggregate
of snapshot bytes sent from and received at a store across different zones.

For accurate metrics, follow these assumptions:
- Configure region and zone tier keys consistently across nodes.
- Within a node locality, ensure unique region and zone tier keys.
- Maintain consistent configuration of region and zone tiers across nodes.
wenyihu6 committed Jun 9, 2023
1 parent 94d4083 commit 8f3e328
Showing 7 changed files with 350 additions and 92 deletions.
18 changes: 7 additions & 11 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -1373,19 +1373,15 @@ func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality {
return sp.getNodeLocalityWithString(nodeID).locality
}

// IsCrossRegion takes in two replicas and compares the locality of them based
// on their replica node IDs. It returns (bool, error) indicating whether the
// two replicas’ nodes are in different regions and if any errors occurred
// during the lookup process.
func (sp *StorePool) IsCrossRegion(
// IsCrossRegionCrossZone takes in two replicas and compares the locality of
// them based on their replica node IDs. It returns (bool, error, bool, error)
// where the boolean values indicate whether the two replicas' nodes are in
// different regions, different zones, along with any lookup errors.
func (sp *StorePool) IsCrossRegionCrossZone(
firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) (bool, error) {
isCrossRegion, err := sp.getNodeLocality(firstReplica.NodeID).IsCrossRegion(
) (bool, error, bool, error) {
return sp.getNodeLocality(firstReplica.NodeID).IsCrossRegionCrossZone(
sp.getNodeLocality(secReplica.NodeID))
if err != nil {
return false, err
}
return isCrossRegion, nil
}
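
The locality comparison that `IsCrossRegionCrossZone` delegates to is not part of this excerpt. The following standalone sketch shows one way a four-value result of that shape could be computed. Everything in it is an assumption for illustration: the `Tier` type, the helper names, and the hard-coded "region" and "zone" keys are not taken from `roachpb`.

```go
package main

import "fmt"

// Tier is a hypothetical stand-in for roachpb.Tier.
type Tier struct{ Key, Value string }

// find returns the value of the first tier with the given key.
func find(tiers []Tier, key string) (string, bool) {
	for _, t := range tiers {
		if t.Key == key {
			return t.Value, true
		}
	}
	return "", false
}

// isCross reports whether two localities differ on the given tier key. It
// returns an error if either locality lacks that key.
func isCross(a, b []Tier, key string) (bool, error) {
	av, aok := find(a, key)
	bv, bok := find(b, key)
	if !aok || !bok {
		return false, fmt.Errorf("tier key %q not found in both localities", key)
	}
	return av != bv, nil
}

// isCrossRegionCrossZone mirrors the (bool, error, bool, error) return shape
// used in the diff. The key names are assumptions of this sketch.
func isCrossRegionCrossZone(a, b []Tier) (bool, error, bool, error) {
	isCrossRegion, regionErr := isCross(a, b, "region")
	isCrossZone, zoneErr := isCross(a, b, "zone")
	return isCrossRegion, regionErr, isCrossZone, zoneErr
}

func main() {
	east1 := []Tier{{Key: "region", Value: "us-east"}, {Key: "zone", Value: "us-east-1"}}
	west1 := []Tier{{Key: "region", Value: "us-west"}, {Key: "zone", Value: "us-west-1"}}
	east2 := []Tier{{Key: "region", Value: "us-east"}, {Key: "zone", Value: "us-east-2"}}
	noRegion := []Tier{{Key: "zone", Value: "us-east-3"}}

	fmt.Println(isCrossRegionCrossZone(east1, west1))    // cross-region and cross-zone
	fmt.Println(isCrossRegionCrossZone(east1, east2))    // same region, cross-zone
	fmt.Println(isCrossRegionCrossZone(east1, noRegion)) // region error, still cross-zone
}
```

Keeping the region and zone errors separate lets the zone comparison remain usable when region tiers are absent, which matches the behavior described in the commit message.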

// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is
27 changes: 27 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -966,6 +966,29 @@ var (
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapShotCrossZoneSentBytes = metric.Metadata{
Name: "range.snapshots.cross-zone.sent-bytes",
Help: `Number of snapshot bytes sent cross zone within same region or if
region tiers are not configured. This count increases for each snapshot sent
between different zones within the same region. However, if the region tiers
are not configured, this count may also include snapshot data sent between
different regions. Ensuring consistent configuration of region and zone
tiers across nodes helps to accurately monitor the data transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapShotCrossZoneRcvdBytes = metric.Metadata{
Name: "range.snapshots.cross-zone.rcvd-bytes",
Help: `Number of snapshot bytes received cross zone within same region or if
region tiers are not configured. This count increases for each snapshot
received between different zones within the same region. However, if the
region tiers are not configured, this count may also include snapshot data
received between different regions. Ensuring consistent configuration of
region and zone tiers across nodes helps to accurately monitor the data
transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotSendQueueLength = metric.Metadata{
Name: "range.snapshots.send-queue",
Help: "Number of snapshots queued to send",
@@ -2195,6 +2218,8 @@ type StoreMetrics struct {
RangeSnapshotRecvUnusable *metric.Counter
RangeSnapShotCrossRegionSentBytes *metric.Counter
RangeSnapShotCrossRegionRcvdBytes *metric.Counter
RangeSnapShotCrossZoneSentBytes *metric.Counter
RangeSnapShotCrossZoneRcvdBytes *metric.Counter

// Range snapshot queue metrics.
RangeSnapshotSendQueueLength *metric.Gauge
@@ -2819,6 +2844,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable),
RangeSnapShotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapShotCrossRegionSentBytes),
RangeSnapShotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossRegionRcvdBytes),
RangeSnapShotCrossZoneSentBytes: metric.NewCounter(metaRangeSnapShotCrossZoneSentBytes),
RangeSnapShotCrossZoneRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossZoneRcvdBytes),
RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength),
RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength),
RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress),
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/replica_command.go
@@ -3149,9 +3149,8 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.DelegateSnapshotSendBytes.Inc(inc)
}
r.store.metrics.RangeSnapshotSentBytes.Inc(inc)
if r.store.shouldIncrementCrossRegionSnapshotMetrics(ctx, req.CoordinatorReplica, req.RecipientReplica) {
r.store.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc)
}
r.store.updateCrossLocalitySnapshotMetrics(
ctx, req.CoordinatorReplica, req.RecipientReplica, inc, true /* isSent */)

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
97 changes: 47 additions & 50 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -2188,14 +2188,11 @@ type snapshotBytesMetrics struct {
rcvdBytes int64
}

// getSnapshotBytesMetrics returns metrics on the number of snapshot bytes sent
// and received by a server. tc and serverIdx specify the index of the target
// server on the TestCluster TC. The function returns the total number of
// snapshot bytes sent/received, as well as a map with granular metrics on the
// number of snapshot bytes sent and received for each type of snapshot. The
// return value is of the form (totalBytes, granularMetrics), where totalBytes
// is a `snapshotBytesMetrics` struct containing the total bytes sent/received,
// and granularMetrics is the map mentioned above.
// getSnapshotBytesMetrics retrieves the count of each snapshot metric specified
// in the metricsName associated with the target serverIdx server and returns
// the result as a map. The keys in the map correspond to the strings in input
// metricsName. The corresponding value is a `snapshotBytesMetrics` struct
// containing the total bytes sent/received of the metric.
func getSnapshotBytesMetrics(
t *testing.T, tc *testcluster.TestCluster, serverIdx int, metricsName []string,
) map[string]snapshotBytesMetrics {
@@ -2216,13 +2213,9 @@ func getSnapshotBytesMetrics(
return metrics
}

// getSnapshotMetricsDiff returns the delta between snapshot byte metrics
// recorded at different times. Metrics can be recorded using the
// getSnapshotBytesMetrics helper function, and the delta is returned in the
// form (totalBytes, granularMetrics). totalBytes is a
// snapshotBytesMetrics struct containing the difference in total bytes
// sent/received, and granularMetrics is the map of snapshotBytesMetrics structs
// containing deltas for each type of snapshot.
// getSnapshotMetricsDiff returns the difference between the values of
// corresponding snapshot metrics in two maps. Assumption: beforeMap and
// afterMap contain the same set of keys.
func getSnapshotMetricsDiff(
beforeMap map[string]snapshotBytesMetrics, afterMap map[string]snapshotBytesMetrics,
) map[string]snapshotBytesMetrics {
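
The body of `getSnapshotMetricsDiff` is elided in this view. A minimal standalone sketch of the per-key subtraction its comment describes could look like the following. The function name `diffSnapshotMetrics` is hypothetical, and the struct is redeclared here only to keep the example self-contained.

```go
package main

import "fmt"

// snapshotBytesMetrics is redeclared for this sketch; the field names follow
// the test file.
type snapshotBytesMetrics struct {
	sentBytes int64
	rcvdBytes int64
}

// diffSnapshotMetrics returns after - before for every key in after. A key
// missing from before is treated as zero, although the helper in the diff
// assumes both maps contain the same keys.
func diffSnapshotMetrics(before, after map[string]snapshotBytesMetrics) map[string]snapshotBytesMetrics {
	diff := make(map[string]snapshotBytesMetrics, len(after))
	for name, a := range after {
		b := before[name]
		diff[name] = snapshotBytesMetrics{
			sentBytes: a.sentBytes - b.sentBytes,
			rcvdBytes: a.rcvdBytes - b.rcvdBytes,
		}
	}
	return diff
}

func main() {
	before := map[string]snapshotBytesMetrics{".cross-zone": {sentBytes: 10, rcvdBytes: 5}}
	after := map[string]snapshotBytesMetrics{".cross-zone": {sentBytes: 25, rcvdBytes: 5}}
	fmt.Println(diffSnapshotMetrics(before, after)[".cross-zone"].sentBytes) // prints 15
}
```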
@@ -2296,12 +2289,12 @@ func getExpectedSnapshotSizeBytes(
}

// This test verifies the accuracy of snapshot metrics -
// `range.snapshots.[rebalancing|cross-region].rcvd-bytes` and
// `range.snapshots.[rebalancing|cross-region].sent-bytes`. It involves adding
// two new replicas on different nodes within the cluster, resulting in two
// learner snapshots sent cross region. The test then compares the metrics prior
// to and after sending the snapshot to verify the accuracy.
func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
// `range.snapshots.[rebalancing|cross-region|cross-zone].rcvd-bytes` and
// `range.snapshots.[rebalancing|cross-region|cross-zone].sent-bytes`. It
// involves adding two new replicas on different nodes within the cluster,
// resulting in two learner snapshots: one sent cross-region and one sent
// cross-zone within the same region. The test then compares the metrics prior
// to and after sending the snapshots to verify the accuracy.
func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -2325,25 +2318,22 @@ func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
}

// The initial setup ensures the correct configuration for three nodes (with
// different localities), single-range. Note that server[2] is configured
// without the inclusion of a "region" tier key.
// different localities), single-range.
const numNodes = 3
serverArgs := make(map[int]base.TestServerArgs)

// The servers' localities are configured so that the first snapshot sent from
// server0 to server1 is cross-region. The second snapshot sent from server0
// to server2 is cross-zone within the same region.
serverLocality := [numNodes]roachpb.Locality{
{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}},
{Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}},
{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}},
}
for i := 0; i < numNodes; i++ {
if i == 2 {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "zone", Value: fmt.Sprintf("us-east-%va", i)}},
},
Knobs: knobs,
}
} else {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: fmt.Sprintf("us-east-%v", i)}},
},
Knobs: knobs,
}
serverArgs[i] = base.TestServerArgs{
Locality: serverLocality[i],
Knobs: knobs,
}
}

@@ -2393,16 +2383,16 @@ func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
return snapshotLength
}

metrics := []string{".rebalancing", ".recovery", ".unknown", ".cross-region", ""}
metrics := []string{".rebalancing", ".recovery", ".unknown", ".cross-region", ".cross-zone", ""}
// Record the snapshot metrics before anything has been sent / received.
senderBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */, metrics)
firstReceiverBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */, metrics)
secReceiverBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */, metrics)

// The first replica is added as a non-voter to help avoid issues in stress
// testing. A possible explanation is - if the first replica was added as a
// non-voter, it can be stuck in a state to receive the snapshot. This can
// cause failure to reach quorum during the second snapshot transfer.
// The first replica is added as a non-voter to help avoid failures in stress
// testing. A possible explanation for the failure: if the first replica were
// added as a voter, it could get stuck waiting to receive the snapshot, which
// could prevent reaching quorum during the second snapshot transfer.
firstSnapshotLength := sendSnapshotToServer(1, tc.AddNonVoters)
secSnapshotLength := sendSnapshotToServer(2, tc.AddVoters)
totalSnapshotLength := firstSnapshotLength + secSnapshotLength
@@ -2418,10 +2408,13 @@ func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
".rebalancing": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
// Assert that the cross-region metrics should remain unchanged (since
// server[2]'s locality does not include a "region" tier key).
// The first snapshot was sent from server0 to server1, so it is
// cross-region.
".cross-region": {sentBytes: firstSnapshotLength, rcvdBytes: 0},
"": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
// The second snapshot was sent from server0 to server2, so it is
// cross-zone within same region.
".cross-zone": {sentBytes: secSnapshotLength, rcvdBytes: 0},
"": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderExpected, senderDelta)
})
@@ -2432,10 +2425,13 @@ func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
firstReceiverMetricsAfter := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */, metrics)
firstReceiverDelta := getSnapshotMetricsDiff(firstReceiverBefore, firstReceiverMetricsAfter)
firstReceiverExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".rebalancing": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
// The first snapshot was sent from server0 to server1, so it is
// cross-region.
".cross-region": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
".cross-zone": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
}
require.Equal(t, firstReceiverExpected, firstReceiverDelta)
@@ -2450,9 +2446,10 @@ func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
".rebalancing": {sentBytes: 0, rcvdBytes: secSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
// Assert that the cross-region metrics should remain unchanged (since
// server[2]'s locality does not include a "region" tier key).
// The second snapshot was sent from server0 to server2, so it is
// cross-zone within same region.
".cross-region": {sentBytes: 0, rcvdBytes: 0},
".cross-zone": {sentBytes: 0, rcvdBytes: secSnapshotLength},
"": {sentBytes: 0, rcvdBytes: secSnapshotLength},
}
require.Equal(t, secReceiverExpected, secReceiverDelta)
68 changes: 54 additions & 14 deletions pkg/kv/kvserver/store_snapshot.go
@@ -972,17 +972,60 @@ func (s *Store) checkSnapshotOverlapLocked(
return nil
}

// shouldIncrementCrossRegionSnapshotMetrics returns true if the two replicas
// given are cross-region, and false otherwise.
func (s *Store) shouldIncrementCrossRegionSnapshotMetrics(
// shouldIncrementCrossLocalitySnapshotMetrics returns (bool, bool) - indicating
// if the two given replicas are cross-region and cross-zone respectively.
func (s *Store) shouldIncrementCrossLocalitySnapshotMetrics(
ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) bool {
isCrossRegion, err := s.cfg.StorePool.IsCrossRegion(firstReplica, secReplica)
if err != nil {
log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", err)
return false
) (bool, bool) {
isCrossRegion, regionErr, isCrossZone, zoneErr := s.cfg.StorePool.IsCrossRegionCrossZone(
firstReplica, secReplica)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if snapshot is cross zone %v", zoneErr)
}
return isCrossRegion, isCrossZone
}

// updateCrossLocalitySnapshotMetrics increments the cross-region or cross-zone
// snapshot byte counters by inc. Cross-region metrics monitor activities across
// different regions. Cross-zone metrics monitor any cross-zone activities within
// the same region or if the region tiers are not configured. isSent selects the
// sent-bytes counters; otherwise the rcvd-bytes counters are updated.
func (s *Store) updateCrossLocalitySnapshotMetrics(
ctx context.Context,
firstReplica roachpb.ReplicaDescriptor,
secReplica roachpb.ReplicaDescriptor,
inc int64,
isSent bool,
) {
isCrossRegion, isCrossZone := s.shouldIncrementCrossLocalitySnapshotMetrics(ctx, firstReplica, secReplica)
if isSent {
if isCrossRegion {
if !isCrossZone {
log.VEventf(ctx, 2, "unexpected: cross region but same zone")
} else {
s.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc)
}
} else {
if isCrossZone {
s.metrics.RangeSnapShotCrossZoneSentBytes.Inc(inc)
}
}
} else {
// isReceived
if isCrossRegion {
if !isCrossZone {
log.VEventf(ctx, 2, "unexpected: cross region but same zone")
} else {
s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc)
}
} else {
if isCrossZone {
s.metrics.RangeSnapShotCrossZoneRcvdBytes.Inc(inc)
}
}
}
return isCrossRegion
}
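
The sent and received branches of `updateCrossLocalitySnapshotMetrics` apply the same decision to different counters. As a possible alternative shape, and not the code in this commit, the counters can be selected by direction first and the decision applied once. The `counter` and `metrics` types below are hypothetical stand-ins for `metric.Counter` and `StoreMetrics`, and the boolean return replaces the `log.VEventf` call.

```go
package main

import "fmt"

// counter is a hypothetical stand-in for metric.Counter.
type counter struct{ n int64 }

func (c *counter) Inc(v int64) { c.n += v }

// metrics is a hypothetical stand-in for the relevant StoreMetrics fields.
type metrics struct {
	crossRegionSent, crossRegionRcvd counter
	crossZoneSent, crossZoneRcvd     counter
}

// update selects the counters for the direction, then applies the same rules
// as the diff. It returns true for the cross-region, same-zone case, which
// the diff logs as unexpected and does not count.
func (m *metrics) update(isCrossRegion, isCrossZone bool, inc int64, isSent bool) bool {
	region, zone := &m.crossRegionRcvd, &m.crossZoneRcvd
	if isSent {
		region, zone = &m.crossRegionSent, &m.crossZoneSent
	}
	switch {
	case isCrossRegion && !isCrossZone:
		return true
	case isCrossRegion:
		region.Inc(inc)
	case isCrossZone:
		zone.Inc(inc)
	}
	return false
}

func main() {
	var m metrics
	m.update(true, true, 10, true)  // counted as cross-region sent
	m.update(false, true, 7, false) // counted as cross-zone received
	fmt.Println(m.crossRegionSent.n, m.crossZoneRcvd.n) // prints 10 7
}
```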

// receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream.
@@ -1101,11 +1144,8 @@ func (s *Store) receiveSnapshot(

recordBytesReceived := func(inc int64) {
s.metrics.RangeSnapshotRcvdBytes.Inc(inc)

if s.shouldIncrementCrossRegionSnapshotMetrics(
ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica) {
s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc)
}
s.updateCrossLocalitySnapshotMetrics(
ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, inc, false /* isSent */)

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY: