Skip to content

Commit

Permalink
kvclient: sort by region before sorting by latency
Browse files Browse the repository at this point in the history
Follower reads attempt to be efficient by sorting to the nearest
follower first. The nearest follower can be either the closest by
latency or by locality. There is a complex relationship between dollar
cost, latency and throughput for different nodes, and just sorting by
latency may not be ideal in many cases. This PR changes the behavior to
first sort by the locality definition and then the latency.

Specifically where this can matter is when the network latency between
nodes is close to or below the processing time for a PingRequest
including goroutine scheduling, CPU and OS queueing on both sides.

Epic: none

Release note (performance improvement): Sorting by locality can improve
the performance and predictability of follower reads.
  • Loading branch information
andrewbaptist committed Nov 14, 2023
1 parent 7ba9725 commit a2ba551
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 27 deletions.
11 changes: 11 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,17 @@ var FollowerReadsUnhealthy = settings.RegisterBoolSetting(
true,
)

// sortByLocalityFirst controls whether we sort by locality before sorting by
// latency. If it is set to false we will only look at the latency values.
// TODO(baptist): Remove this in 25.1 once we have validated that we don't need
// to fall back to the previous behavior of only sorting by latency.
var sortByLocalityFirst = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"kv.dist_sender.sort_locality_first.enabled",
"sort followers by locality before sorting by latency",
true,
)

func max(a, b int64) int64 {
if a > b {
return a
Expand Down
29 changes: 14 additions & 15 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5398,17 +5398,21 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
return desc
}

makeLocality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
},
}
}

makeNodeDescriptor := func(nodeID int, region string) roachpb.NodeDescriptor {
return roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(nodeID),
Address: util.UnresolvedAddr{},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
},
},
NodeID: roachpb.NodeID(nodeID),
Address: util.UnresolvedAddr{},
Locality: makeLocality(region),
}
}

Expand All @@ -5417,12 +5421,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
ReplicaDescriptor: roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
{Key: "region", Value: region},
{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
}},
Locality: makeLocality(region),
}
}

Expand Down
18 changes: 15 additions & 3 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,27 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
return rs[i].healthy
}

// If the region is different choose the closer one.
// If the setting is true(default) consider locality before latency.
if sortByLocalityFirst.Get(&st.SV) {
// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
}
}

// Use latency if they are different. The local node has a latency of -1
// so will sort before any other node.
if rs[i].latency != rs[j].latency {
return rs[i].latency < rs[j].latency
}

// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
// If the setting is false, sort locality after latency.
if !sortByLocalityFirst.Get(&st.SV) {
// If the region is different choose the closer one.
if rs[i].tierMatchLength != rs[j].tierMatchLength {
return rs[i].tierMatchLength > rs[j].tierMatchLength
}
}

// If everything else is equal sort by node id.
Expand Down
46 changes: 43 additions & 3 deletions pkg/kv/kvclient/kvcoord/replica_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
// only identified by their node. If multiple replicas are on different
// stores of the same node, the node only appears once in this list (as the
// ordering between replicas on the same node is not deterministic).
expOrdered []roachpb.NodeID
expOrdered []roachpb.NodeID
disableLocality bool
}{
{
name: "order by locality matching",
Expand Down Expand Up @@ -226,7 +227,24 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
expOrdered: []roachpb.NodeID{2, 3, 1, 4},
},
{
name: "order by latency",
name: "order by latency only",
nodeID: 1,
locality: locality(t, []string{"country=us"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us"}),
info(t, 4, 4, []string{"country=us"}),
info(t, 4, 44, []string{"country=us"}),
info(t, 3, 3, []string{"country=us"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
},
{
name: "order by locality then latency",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
Expand All @@ -240,7 +258,25 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
expOrdered: []roachpb.NodeID{2, 4, 3},
},
{
name: "disable locality setting",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
disableLocality: true,
},
{
// Test that replicas on the local node sort first, regardless of factors
Expand Down Expand Up @@ -270,6 +306,10 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
// TODO(baptist): Remove this if we make this the default.
FollowerReadsUnhealthy.Override(context.Background(), &st.SV, false)
if test.disableLocality {
sortByLocalityFirst.Override(context.Background(), &st.SV, false)
}

var latencyFn LatencyFunc
if test.latencies != nil {
latencyFn = func(id roachpb.NodeID) (time.Duration, bool) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/physicalplan/replicaoracle/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func TestClosest(t *testing.T) {
Locality: nd2.Locality, // pretend node 2 is closest.
Settings: cluster.MakeTestingClusterSettings(),
HealthFunc: func(_ roachpb.NodeID) bool { return true },
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if id == 2 {
return time.Nanosecond, validLatencyFunc
}
return time.Millisecond, validLatencyFunc
},
})
o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
if id == 2 {
return time.Nanosecond, validLatencyFunc
}
return time.Millisecond, validLatencyFunc
}
internalReplicas := []roachpb.ReplicaDescriptor{
{NodeID: 4, StoreID: 4},
{NodeID: 2, StoreID: 2},
Expand Down

0 comments on commit a2ba551

Please sign in to comment.