kvclient: sort by region before sorting by latency
Follower reads try to be efficient by routing to the nearest
follower first. The nearest follower can be defined either by latency
or by locality. Dollar cost, latency, and throughput interact in
complex ways across nodes, so sorting by latency alone is not ideal
in many cases. This PR changes the behavior to sort first by the
locality definition and only then by latency. In most cases this makes
no difference to the user, but in some deployments the new ordering
performs better.

Epic: none

Release note (performance improvement): Follower reads now sort
candidate replicas by locality before latency, which can improve their
performance.
andrewbaptist committed Nov 2, 2023
1 parent 3e32c60 commit 6d1bada
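To make the new ordering concrete, here is a minimal, self-contained Go sketch of the comparator the commit message describes. It is illustrative only, not the commit's code: the replica type and sharedPrefix helper are invented for the example, and the real implementation in pkg/kv/kvclient/kvcoord/replica_slice.go (below) additionally handles the local node, unknown latencies, a node-ID tiebreak, and a cluster setting that gates the behavior.

package main

import (
	"fmt"
	"sort"
	"time"
)

// replica is a toy stand-in for kvcoord's ReplicaInfo.
type replica struct {
	nodeID  int
	tiers   []string // ordered locality tiers, e.g. "region=west"
	latency time.Duration
}

// sharedPrefix counts the leading locality tiers shared with the
// client, playing the role that Locality.SharedPrefix has in this PR.
func sharedPrefix(client, r []string) int {
	n := 0
	for n < len(client) && n < len(r) && client[n] == r[n] {
		n++
	}
	return n
}

func main() {
	client := []string{"country=us", "region=west"}
	replicas := []replica{
		{nodeID: 1, tiers: []string{"country=us", "region=east"}, latency: 10 * time.Millisecond},
		{nodeID: 2, tiers: []string{"country=us", "region=west"}, latency: 30 * time.Millisecond},
		{nodeID: 3, tiers: []string{"country=eu", "region=west"}, latency: 5 * time.Millisecond},
	}
	sort.Slice(replicas, func(i, j int) bool {
		mi := sharedPrefix(client, replicas[i].tiers)
		mj := sharedPrefix(client, replicas[j].tiers)
		if mi != mj {
			return mi > mj // longer locality match sorts first
		}
		return replicas[i].latency < replicas[j].latency // then lower latency
	})
	// Prints nodes 2, 1, 3: node 2 wins on locality despite the highest
	// latency, and node 3 loses despite the lowest latency because its
	// first tier already differs from the client's.
	for _, r := range replicas {
		fmt.Println(r.nodeID, r.latency)
	}
}

Under latency-only ordering the same input would come back 3, 1, 2, which is exactly the behavioral difference this commit makes configurable.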
Showing 8 changed files with 123 additions and 53 deletions.
15 changes: 13 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -268,6 +268,17 @@ var senderConcurrencyLimit = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

+// sortByLocalityFirst controls whether we sort by locality before sorting by
+// latency. If it is set to false we will only look at the latency values.
+// TODO(baptist): Remove this in 25.1 once we have validated that we don't need
+// to fall back to the previous behavior of only sorting by latency.
+var sortByLocalityFirst = settings.RegisterBoolSetting(
+settings.ApplicationLevel,
+"kv.dist_sender.sort_locality_first.enabled",
+"sort followers by locality before sorting by latency",
+true,
+)

func max(a, b int64) int64 {
if a > b {
return a
@@ -2240,7 +2251,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
-replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
+replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
}

idx := -1
@@ -2259,7 +2270,7 @@
case kvpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
-replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), ds.latencyFunc, ds.locality)
+replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -720,7 +720,7 @@ func newTransportForRange(
if err != nil {
return nil, err
}
-replicas.OptimizeReplicaOrder(ds.nodeIDGetter(), latencyFn, ds.locality)
+replicas.OptimizeReplicaOrder(ds.st, ds.nodeIDGetter(), latencyFn, ds.locality)
opts := SendOptions{class: connectionClass(&ds.st.SV)}
return ds.transportFactory(opts, ds.nodeDialer, replicas)
}
28 changes: 14 additions & 14 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -5235,17 +5235,21 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
return desc
}

+makeLocality := func(region string) roachpb.Locality {
+return roachpb.Locality{
+Tiers: []roachpb.Tier{
+{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
+{Key: "region", Value: region},
+{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
+},
+}
+}

makeNodeDescriptor := func(nodeID int, region string) roachpb.NodeDescriptor {
return roachpb.NodeDescriptor{
-NodeID: roachpb.NodeID(nodeID),
-Address: util.UnresolvedAddr{},
-Locality: roachpb.Locality{
-Tiers: []roachpb.Tier{
-{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
-{Key: "region", Value: region},
-{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
-},
-},
+NodeID:   roachpb.NodeID(nodeID),
+Address:  util.UnresolvedAddr{},
+Locality: makeLocality(region),
}
}

@@ -5254,11 +5258,7 @@ func TestDistSenderComputeNetworkCost(t *testing.T) {
ReplicaDescriptor: roachpb.ReplicaDescriptor{
ReplicaID: roachpb.ReplicaID(replicaID),
},
-Tiers: []roachpb.Tier{
-{Key: "az", Value: fmt.Sprintf("az%d", rand.Intn(10))},
-{Key: "region", Value: region},
-{Key: "dc", Value: fmt.Sprintf("dc%d", rand.Intn(10))},
-},
+Locality: makeLocality(region),
}
}

56 changes: 33 additions & 23 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -16,6 +16,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/errors"
@@ -25,7 +26,7 @@
// Locality information.
type ReplicaInfo struct {
roachpb.ReplicaDescriptor
-Tiers []roachpb.Tier
+Locality roachpb.Locality
}

// A ReplicaSlice is a slice of ReplicaInfo.
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewReplicaSlice(
}
rs = append(rs, ReplicaInfo{
ReplicaDescriptor: r,
-Tiers: nd.Locality.Tiers,
+Locality: nd.Locality,
})
}
if len(rs) == 0 {
@@ -171,20 +172,6 @@ func (rs ReplicaSlice) MoveToFront(i int) {
rs[0] = front
}

-// localityMatch returns the number of consecutive locality tiers
-// which match between a and b.
-func localityMatch(a, b []roachpb.Tier) int {
-if len(a) == 0 {
-return 0
-}
-for i := range a {
-if i >= len(b) || a[i] != b[i] {
-return i
-}
-}
-return len(a)
-}

// A LatencyFunc returns the latency from this node to a remote
// node and a bool indicating whether the latency is valid.
type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
@@ -205,7 +192,7 @@
// leaseholder is known by the caller, the caller will move it to the
// front if appropriate.
func (rs ReplicaSlice) OptimizeReplicaOrder(
-nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
+st *cluster.Settings, nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
) {
// If we don't know which node we're on or its locality, and we don't have
// latency information to other nodes, send the RPCs randomly.
@@ -228,18 +215,41 @@
return false // j < i
}

+// If this setting is false, ignore locality to match pre-24.1 behavior.
+if sortByLocalityFirst.Get(&st.SV) {
+// Longer locality matches sort first. The assumption is that they are
+// closer and will be better choices. If the locality match is the same,
+// then fall back to the latency.
+attrMatchI := locality.SharedPrefix(rs[i].Locality)
+attrMatchJ := locality.SharedPrefix(rs[j].Locality)
+if attrMatchI != attrMatchJ {
+return attrMatchI > attrMatchJ
+}
+}

+// These nodes are otherwise equal; choose the one that has the lower
+// latency to us. The latencyFn is only nil in some tests.
if latencyFn != nil {
+// For disconnected or just recently connected nodes, the latencyFn may
+// not return a valid latency. We sort nodes without a valid latency
+// behind nodes whose latency we know.
latencyI, okI := latencyFn(rs[i].NodeID)
latencyJ, okJ := latencyFn(rs[j].NodeID)
+if okI && !okJ {
+return true
+}
+if okJ && !okI {
+return false
+}
+if okI && okJ {
+return latencyI < latencyJ
+}
}
-attrMatchI := localityMatch(locality.Tiers, rs[i].Tiers)
-attrMatchJ := localityMatch(locality.Tiers, rs[j].Tiers)
-// Longer locality matches sort first (the assumption is that
-// they'll have better latencies).
-return attrMatchI > attrMatchJ
+// We want a transitive, consistent sort. Choose the node with the
+// lower node ID.
+// NB: We don't usually get here, since latencyFn is defined and we
+// know the latency to all other nodes.
+return rs[i].NodeID < rs[j].NodeID
})
}

@@ -255,7 +265,7 @@ func (rs ReplicaSlice) Descriptors() []roachpb.ReplicaDescriptor {
// LocalityValue returns the value of the locality tier associated with the
// given key.
func (ri *ReplicaInfo) LocalityValue(key string) string {
-for _, tier := range ri.Tiers {
+for _, tier := range ri.Locality.Tiers {
if tier.Key == key {
return tier.Value
}
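The comparator above now has three stages: locality prefix, then latency (nodes without a known latency sort last), then node ID as a transitive tiebreak. Here is a standalone Go sketch of that chain with invented names (cand, less) and toy data; the gating cluster setting and the local-node fast path are omitted, so treat it as an approximation rather than the shipped code.

package main

import (
	"fmt"
	"sort"
	"time"
)

// cand is a toy candidate replica; locMatch is the shared locality
// prefix with the client, and latencyOK is false for disconnected or
// just recently connected nodes.
type cand struct {
	nodeID    int
	locMatch  int
	latency   time.Duration
	latencyOK bool
}

// less mirrors the structure of the OptimizeReplicaOrder comparator:
// locality first, then known latency, then node ID so that the sort
// stays transitive when no latency information is available.
func less(a, b cand) bool {
	if a.locMatch != b.locMatch {
		return a.locMatch > b.locMatch
	}
	if a.latencyOK && !b.latencyOK {
		return true
	}
	if b.latencyOK && !a.latencyOK {
		return false
	}
	if a.latencyOK && b.latencyOK {
		return a.latency < b.latency
	}
	return a.nodeID < b.nodeID
}

func main() {
	cands := []cand{
		{nodeID: 4, locMatch: 1, latency: time.Millisecond, latencyOK: true},
		{nodeID: 3, locMatch: 2}, // latency unknown: sorts behind node 2
		{nodeID: 2, locMatch: 2, latency: 20 * time.Millisecond, latencyOK: true},
	}
	sort.Slice(cands, func(i, j int) bool { return less(cands[i], cands[j]) })
	for _, c := range cands {
		fmt.Println(c.nodeID) // 2, 3, 4
	}
}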
52 changes: 47 additions & 5 deletions pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -182,7 +183,7 @@ func locality(t *testing.T, locStrs []string) roachpb.Locality {
func info(t *testing.T, nid roachpb.NodeID, sid roachpb.StoreID, locStrs []string) ReplicaInfo {
return ReplicaInfo{
ReplicaDescriptor: desc(nid, sid),
-Tiers: locality(t, locStrs).Tiers,
+Locality: locality(t, locStrs),
}
}

@@ -202,7 +203,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
// only identified by their node. If multiple replicas are on different
// stores of the same node, the node only appears once in this list (as the
// ordering between replicas on the same node is not deterministic).
-expOrdered []roachpb.NodeID
+expOrdered      []roachpb.NodeID
+disableLocality bool
}{
{
name: "order by locality matching",
@@ -218,7 +220,24 @@
expOrdered: []roachpb.NodeID{1, 2, 4, 3},
},
{
name: "order by latency",
name: "order by latency only",
nodeID: 1,
locality: locality(t, []string{"country=us"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us"}),
info(t, 4, 4, []string{"country=us"}),
info(t, 4, 44, []string{"country=us"}),
info(t, 3, 3, []string{"country=us"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
},
{
name: "order by locality then latency",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
@@ -232,7 +251,25 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
expOrdered: []roachpb.NodeID{2, 4, 3},
},
{
name: "disable locality setting",
nodeID: 1,
locality: locality(t, []string{"country=us", "region=west", "city=la"}),
latencies: map[roachpb.NodeID]time.Duration{
2: time.Hour,
3: time.Minute,
4: time.Second,
},
slice: ReplicaSlice{
info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
info(t, 4, 44, []string{"country=us", "region=east", "city=ny"}),
info(t, 3, 3, []string{"country=uk", "city=london"}),
},
expOrdered: []roachpb.NodeID{4, 3, 2},
disableLocality: true,
},
{
// Test that replicas on the local node sort first, regardless of factors
@@ -259,6 +296,11 @@
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
+st := cluster.MakeTestingClusterSettings()
+if test.disableLocality {
+sortByLocalityFirst.Override(context.Background(), &st.SV, false)
+}

var latencyFn LatencyFunc
if test.latencies != nil {
latencyFn = func(id roachpb.NodeID) (time.Duration, bool) {
Expand All @@ -268,7 +310,7 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
}
// Randomize the input order, as it's not supposed to matter.
shuffle.Shuffle(test.slice)
-test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality)
+test.slice.OptimizeReplicaOrder(st, test.nodeID, latencyFn, test.locality)
var sortedNodes []roachpb.NodeID
sortedNodes = append(sortedNodes, test.slice[0].NodeID)
for i := 1; i < len(test.slice); i++ {
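For callers, the visible API change is the new settings argument to OptimizeReplicaOrder. Below is a hedged, test-style sketch of driving the updated method and the gating setting; it assumes it would run inside this package with the PR applied, info and locality are the test helpers shown above, and the require assertion assumes a testify import that may not be present in this file.

func TestOptimizeReplicaOrderLocalityFirstSketch(t *testing.T) {
	st := cluster.MakeTestingClusterSettings()
	// Flip the setting to get the old latency-only ordering back:
	// sortByLocalityFirst.Override(context.Background(), &st.SV, false)

	slice := ReplicaSlice{
		info(t, 2, 2, []string{"country=us", "region=west"}),
		info(t, 3, 3, []string{"country=us", "region=east"}),
	}
	latencyFn := func(id roachpb.NodeID) (time.Duration, bool) {
		if id == 3 {
			return time.Millisecond, true // node 3 is closer on the wire
		}
		return 10 * time.Millisecond, true
	}
	self := locality(t, []string{"country=us", "region=west"})
	slice.OptimizeReplicaOrder(st, 1 /* gateway node */, latencyFn, self)

	// Locality wins over latency: node 2 shares the region with the
	// gateway, so it sorts ahead of the lower-latency node 3.
	require.Equal(t, roachpb.NodeID(2), slice[0].NodeID)
}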
1 change: 1 addition & 0 deletions pkg/sql/physicalplan/replicaoracle/BUILD.bazel
@@ -27,6 +27,7 @@ go_test(
deps = [
"//pkg/gossip",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/hlc",
8 changes: 6 additions & 2 deletions pkg/sql/physicalplan/replicaoracle/oracle.go
@@ -163,6 +163,7 @@ func (o *randomOracle) ChoosePreferredReplica(
}

type closestOracle struct {
+st *cluster.Settings
nodeDescs kvcoord.NodeDescStore
// nodeID and locality of the current node. Used to give preference to the
// current node and others "close" to it.
@@ -181,6 +182,7 @@ func newClosestOracle(cfg Config) Oracle {
latencyFn = latencyFunc(cfg.RPCContext)
}
return &closestOracle{
+st: cfg.Settings,
nodeDescs: cfg.NodeDescs,
nodeID: cfg.NodeID,
locality: cfg.Locality,
@@ -202,7 +204,7 @@ func (o *closestOracle) ChoosePreferredReplica(
if err != nil {
return roachpb.ReplicaDescriptor{}, false, err
}
-replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+replicas.OptimizeReplicaOrder(o.st, o.nodeID, o.latencyFunc, o.locality)
repl := replicas[0].ReplicaDescriptor
// There are no "misplanned" ranges if we know the leaseholder, and we're
// deliberately choosing non-leaseholder.
@@ -226,6 +228,7 @@ const maxPreferredRangesPerLeaseHolder = 10
// node.
// Finally, it tries not to overload any node.
type binPackingOracle struct {
+st *cluster.Settings
maxPreferredRangesPerLeaseHolder int
nodeDescs kvcoord.NodeDescStore
// nodeID and locality of the current node. Used to give preference to the
@@ -241,6 +244,7 @@ type binPackingOracle struct {

func newBinPackingOracle(cfg Config) Oracle {
return &binPackingOracle{
+st: cfg.Settings,
maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder,
nodeDescs: cfg.NodeDescs,
nodeID: cfg.NodeID,
@@ -266,7 +270,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
if err != nil {
return roachpb.ReplicaDescriptor{}, false, err
}
-replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+replicas.OptimizeReplicaOrder(o.st, o.nodeID, o.latencyFunc, o.locality)

// Look for a replica that has been assigned some ranges, but it's not yet full.
minLoad := int(math.MaxInt32)
14 changes: 8 additions & 6 deletions pkg/sql/physicalplan/replicaoracle/oracle_test.go
@@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -46,13 +47,14 @@ func TestClosest(t *testing.T) {
NodeDescs: g,
NodeID: 1,
Locality: nd2.Locality, // pretend node 2 is closest.
+Settings: cluster.MakeTestingClusterSettings(),
+LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
+if id == 2 {
+return time.Nanosecond, validLatencyFunc
+}
+return time.Millisecond, validLatencyFunc
+},
})
-o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
-if id == 2 {
-return time.Nanosecond, validLatencyFunc
-}
-return time.Millisecond, validLatencyFunc
-}
internalReplicas := []roachpb.ReplicaDescriptor{
{NodeID: 4, StoreID: 4},
{NodeID: 2, StoreID: 2},
