Skip to content

Commit

Permalink
Merge #112993
Browse files Browse the repository at this point in the history
112993: kvclient: sort by region before sorting by latency r=nvanbenschoten a=andrewbaptist

Follower reads attempt to be efficient by sorting to the nearest follower first. The nearest follower can be either the closest by latency or by locality. There is a complex relationship between dollar cost, latency and throughput for different nodes, and just sorting by latency is not ideal in many cases. This PR changes the behavior to first sort by the locality definition and then the latency. In most cases this won't make a difference to the user, but in some cases this behavior can be better.

Epic: none

Release note (performance improvement): Sorting by locality can improve the performance of follower reads.

Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
  • Loading branch information
craig[bot] and andrewbaptist committed Nov 16, 2023
2 parents 009476b + 50dab16 commit 23d8585
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 28 deletions.
11 changes: 11 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ var FollowerReadsUnhealthy = settings.RegisterBoolSetting(
true,
)

// sortByLocalityFirst controls whether we sort by locality before sorting by
// latency. If it is set to false we will only look at the latency values.
// TODO(baptist): Remove this in 25.1 once we have validated that we don't need
// to fall back to the previous behavior of only sorting by latency.
var sortByLocalityFirst = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.sort_locality_first.enabled",
"sort followers by locality before sorting by latency",
true,
)

func max(a, b int64) int64 {
if a > b {
return a
Expand Down
29 changes: 14 additions & 15 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5398,17 +5398,21 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
return desc
}

makeLocality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
},
}
}

makeNodeDescriptor := func(nodeID int, region string) roachpb.NodeDescriptor {
return roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(nodeID),
Address: util.UnresolvedAddr{},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
},
},
NodeID: roachpb.NodeID(nodeID),
Address: util.UnresolvedAddr{},
Locality: makeLocality(region),
}
}

Expand All @@ -5417,12 +5421,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
ReplicaDescriptor: roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
}},
Locality: makeLocality(region),
}
}

Expand Down
22 changes: 18 additions & 4 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
shuffle.Shuffle(rs)
return
}
followerReadsUnhealthy := FollowerReadsUnhealthy.Get(&st.SV)
sortByLocalityFirst := sortByLocalityFirst.Get(&st.SV)
// Populate the health, tier match length and locality before the sort loop.
for i := range rs {
rs[i].tierMatchLength = locality.SharedPrefix(rs[i].Locality)
Expand All @@ -224,7 +226,7 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
}
}

if !FollowerReadsUnhealthy.Get(&st.SV) {
if !followerReadsUnhealthy {
rs[i].healthy = healthFn(rs[i].NodeID)
}
}
Expand All @@ -236,15 +238,27 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
return rs[i].healthy
}

// If the region is different choose the closer one.
// If the setting is true(default) consider locality before latency.
if sortByLocalityFirst {
// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
}
}

// Use latency if they are different. The local node has a latency of -1
// so will sort before any other node.
if rs[i].latency != rs[j].latency {
return rs[i].latency < rs[j].latency
}

// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
// If the setting is false, sort locality after latency.
if !sortByLocalityFirst {
// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
}
}

// If everything else is equal sort by node id.
Expand Down
46 changes: 43 additions & 3 deletions pkg/kv/kvclient/kvcoord/replica_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
// only identified by their node. If multiple replicas are on different
// stores of the same node, the node only appears once in this list (as the
// ordering between replicas on the same node is not deterministic).
expOrdered []roachpb.NodeID
expOrdered []roachpb.NodeID
dontSortByLocalityFirst bool
}{
{
name: "order by locality matching",
Expand Down Expand Up @@ -226,7 +227,24 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
expOrdered: []roachpb.NodeID{2, 3, 1, 4},
},
{
name: "order by latency",
name: "order by latency only",
nodeID: 1,
locality: locality(t, []string{"country=us"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us"}),
info(t, 4, 4, []string{"country=us"}),
info(t, 4, 44, []string{"country=us"}),
info(t, 3, 3, []string{"country=us"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
},
{
name: "order by locality then latency",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
Expand All @@ -240,7 +258,25 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
expOrdered: []roachpb.NodeID{2, 4, 3},
},
{
name: "disable locality setting",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
dontSortByLocalityFirst: true,
},
{
// Test that replicas on the local node sort first, regardless of factors
Expand Down Expand Up @@ -270,6 +306,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
// TODO(baptist): Remove this if we make this the default.
FollowerReadsUnhealthy.Override(context.Background(), &st.SV, false)
if test.dontSortByLocalityFirst {
sortByLocalityFirst.Override(context.Background(), &st.SV, false)
}

var latencyFn LatencyFunc
if test.latencies != nil {
latencyFn = func(id roachpb.NodeID) (time.Duration, bool) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/physicalplan/replicaoracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func TestClosest(t *testing.T) {
Locality: nd2.Locality, // pretend node 2 is closest.
Settings: cluster.MakeTestingClusterSettings(),
HealthFunc: func(_ roachpb.NodeID) bool { return true },
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if id == 2 {
return time.Nanosecond, validLatencyFunc
}
return time.Millisecond, validLatencyFunc
},
})
o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
if id == 2 {
return time.Nanosecond, validLatencyFunc
}
return time.Millisecond, validLatencyFunc
}
internalReplicas := []roachpb.ReplicaDescriptor{
{NodeID: 4, StoreID: 4},
{NodeID: 2, StoreID: 2},
Expand Down

0 comments on commit 23d8585

Please sign in to comment.