Skip to content

Commit

Permalink
kv: sort nodes by localities and by latencies
Browse files Browse the repository at this point in the history
Previously this code was sorting by attributes, which was an
anachronism. Now that the RPC context contains information on
round trip latencies, use that as the better signal to optimize
replica order.

Release note: None
  • Loading branch information
spencerkimball committed Feb 14, 2018
1 parent 316fc1e commit d72a431
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 185 deletions.
11 changes: 7 additions & 4 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,13 @@ func (ds *DistSender) sendSingleRange(
// Try to send the call.
replicas := NewReplicaSlice(ds.gossip, desc)

// Rearrange the replicas so that those replicas with long common
// prefix of attributes end up first. If there's no prefix, this is a
// no-op.
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor())
// Rearrange the replicas so that they're ordered in expectation of
// request latency.
var latencyFn LatencyFunc
if ds.rpcContext != nil {
latencyFn = ds.rpcContext.RemoteClocks.Latency
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,12 @@ func TestSendRPCOrder(t *testing.T) {
g, clock := makeGossip(t, stopper)
rangeID := roachpb.RangeID(99)

nodeAttrs := map[int32][]string{
nodeTiers := map[int32][]roachpb.Tier{
1: {}, // The local node, set in each test case.
2: {"us", "west", "gpu"},
3: {"eu", "dublin", "pdu2", "gpu"},
4: {"us", "east", "gpu"},
5: {"us", "east", "gpu", "flaky"},
2: {roachpb.Tier{Key: "country", Value: "us"}, roachpb.Tier{Key: "region", Value: "west"}},
3: {roachpb.Tier{Key: "country", Value: "eu"}, roachpb.Tier{Key: "city", Value: "dublin"}},
4: {roachpb.Tier{Key: "country", Value: "us"}, roachpb.Tier{Key: "region", Value: "east"}, roachpb.Tier{Key: "city", Value: "nyc"}},
5: {roachpb.Tier{Key: "country", Value: "us"}, roachpb.Tier{Key: "region", Value: "east"}, roachpb.Tier{Key: "city", Value: "mia"}},
}

// Gets filled below to identify the replica by its address.
Expand All @@ -216,7 +216,7 @@ func TestSendRPCOrder(t *testing.T) {

testCases := []struct {
args roachpb.Request
attrs []string
tiers []roachpb.Tier
expReplica []roachpb.NodeID
leaseHolder int32 // 0 for not caching a lease holder.
// Naming is somewhat off, as eventually consistent reads usually
Expand All @@ -229,15 +229,15 @@ func TestSendRPCOrder(t *testing.T) {
// Inconsistent Scan without matching attributes.
{
args: &roachpb.ScanRequest{},
attrs: []string{},
tiers: []roachpb.Tier{},
expReplica: []roachpb.NodeID{1, 2, 3, 4, 5},
},
// Inconsistent Scan with matching attributes.
// Should move the two nodes matching the attributes to the front and
// go stable.
{
args: &roachpb.ScanRequest{},
attrs: nodeAttrs[5],
tiers: nodeTiers[5],
// Compare only the first two resulting addresses.
expReplica: []roachpb.NodeID{5, 4, 0, 0, 0},
},
Expand All @@ -246,22 +246,22 @@ func TestSendRPCOrder(t *testing.T) {
// a lease holder.
{
args: &roachpb.ScanRequest{},
attrs: []string{},
tiers: []roachpb.Tier{},
expReplica: []roachpb.NodeID{1, 2, 3, 4, 5},
consistent: true,
},
// Put without matching attributes that requires but does not find lease holder.
// Should go random and not change anything.
{
args: &roachpb.PutRequest{},
attrs: []string{"nomatch"},
tiers: []roachpb.Tier{{Key: "nomatch", Value: ""}},
expReplica: []roachpb.NodeID{1, 2, 3, 4, 5},
},
// Put with matching attributes but no lease holder.
// Should move the two nodes matching the attributes to the front.
{
args: &roachpb.PutRequest{},
attrs: append(nodeAttrs[5], "irrelevant"),
tiers: append(nodeTiers[5], roachpb.Tier{Key: "irrelevant", Value: ""}),
// Compare only the first two resulting addresses.
expReplica: []roachpb.NodeID{5, 4, 0, 0, 0},
},
Expand All @@ -270,7 +270,7 @@ func TestSendRPCOrder(t *testing.T) {
// (the last and second to last) in that order.
{
args: &roachpb.PutRequest{},
attrs: append(nodeAttrs[5], "irrelevant"),
tiers: append(nodeTiers[5], roachpb.Tier{Key: "irrelevant", Value: ""}),
// Compare only the first resulting addresses as we have a lease holder
// and that means we're only trying to send there.
expReplica: []roachpb.NodeID{2, 5, 4, 0, 0},
Expand All @@ -280,7 +280,7 @@ func TestSendRPCOrder(t *testing.T) {
// go random as the lease holder does not matter.
{
args: &roachpb.GetRequest{},
attrs: []string{},
tiers: []roachpb.Tier{},
expReplica: []roachpb.NodeID{1, 2, 3, 4, 5},
leaseHolder: 2,
},
Expand All @@ -296,8 +296,8 @@ func TestSendRPCOrder(t *testing.T) {
nd := &roachpb.NodeDescriptor{
NodeID: roachpb.NodeID(i),
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
Attrs: roachpb.Attributes{
Attrs: nodeAttrs[i],
Locality: roachpb.Locality{
Tiers: nodeTiers[i],
},
}
if err := g.AddInfoProto(gossip.MakeNodeIDKey(roachpb.NodeID(i)), nd, time.Hour); err != nil {
Expand Down Expand Up @@ -343,8 +343,8 @@ func TestSendRPCOrder(t *testing.T) {
// The local node needs to get its attributes during sendRPC.
nd := &roachpb.NodeDescriptor{
NodeID: 6,
Attrs: roachpb.Attributes{
Attrs: tc.attrs,
Locality: roachpb.Locality{
Tiers: tc.tiers,
},
}
g.NodeID.Reset(nd.NodeID)
Expand Down
126 changes: 60 additions & 66 deletions pkg/kv/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package kv

import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -30,8 +32,12 @@ type ReplicaInfo struct {
NodeDesc *roachpb.NodeDescriptor
}

func (i ReplicaInfo) attrs() []string {
return i.NodeDesc.Attrs.Attrs
func (i ReplicaInfo) locality() []roachpb.Tier {
return i.NodeDesc.Locality.Tiers
}

func (i ReplicaInfo) addr() string {
return i.NodeDesc.Address.String()
}

// A ReplicaSlice is a slice of ReplicaInfo.
Expand Down Expand Up @@ -81,51 +87,6 @@ func (rs ReplicaSlice) FindReplica(storeID roachpb.StoreID) int {
return -1
}

// FindReplicaByNodeID returns the index of the replica which matches the specified node
// ID. If no replica matches, -1 is returned.
func (rs ReplicaSlice) FindReplicaByNodeID(nodeID roachpb.NodeID) int {
for i := range rs {
if rs[i].NodeID == nodeID {
return i
}
}
return -1
}

// SortByCommonAttributePrefix rearranges the ReplicaSlice by comparing the
// attributes to the given reference attributes. The basis for the comparison
// is that of the common prefix of replica attributes (i.e. the number of equal
// attributes, starting at the first), with a longer prefix sorting first. The
// number of attributes successfully matched to at least one replica is
// returned (hence, if the return value equals the length of the ReplicaSlice,
// at least one replica matched all attributes).
func (rs ReplicaSlice) SortByCommonAttributePrefix(attrs []string) int {
if len(rs) < 2 {
return 0
}
topIndex := len(rs) - 1
for bucket := 0; bucket < len(attrs); bucket++ {
firstNotOrdered := 0
for i := 0; i <= topIndex; i++ {
if bucket < len(rs[i].attrs()) && rs[i].attrs()[bucket] == attrs[bucket] {
// Move replica which matches this attribute to an earlier
// place in the array, just behind the last matching replica.
// This packs all matching replicas together.
rs.Swap(firstNotOrdered, i)
firstNotOrdered++
}
}
if topIndex < len(rs)-1 {
shuffle.Shuffle(rs[firstNotOrdered : topIndex+1])
}
if firstNotOrdered == 0 {
return bucket
}
topIndex = firstNotOrdered - 1
}
return len(attrs)
}

// MoveToFront moves the replica at the given index to the front
// of the slice, keeping the order of the remaining elements stable.
// The function will panic when invoked with an invalid index.
Expand All @@ -139,30 +100,63 @@ func (rs ReplicaSlice) MoveToFront(i int) {
rs[0] = front
}

// OptimizeReplicaOrder sorts the replicas in the order in which they're to be
// used for sending RPCs (meaning in the order in which they'll be probed for
// the lease). "Closer" (matching in more attributes) replicas are ordered
// first. If the current node is a replica, then it'll be the first one.
// localityMatch returns the number of consecutive locality tiers
// which match between a and b.
func localityMatch(a, b []roachpb.Tier) int {
if len(a) == 0 {
return 0
}
for i := range a {
if i >= len(b) || a[i] != b[i] {
return i
}
}
return len(a)
}

// A LatencyFunc returns the latency from this node to a remote
// address and a bool indicating whether the latency is valid.
type LatencyFunc func(string) (time.Duration, bool)

// OptimizeReplicaOrder sorts the replicas in the order in which
// they're to be used for sending RPCs (meaning in the order in which
// they'll be probed for the lease). Lower latency and "closer"
// (matching in more attributes) replicas are ordered first. If the
// current node is a replica, then it'll be the first one.
//
// nodeDesc is the descriptor of the current node. It can be nil, in which case
// information about the current descriptor is not used in optimizing the order.
// nodeDesc is the descriptor of the current node. It can be nil, in
// which case information about the current descriptor is not used in
// optimizing the order.
//
// Note that this method is not concerned with any information the node might
// have about who the lease holder might be. If there is such info (e.g. in a
// LeaseHolderCache), the caller will probably want to further tweak the head of
// the ReplicaSlice.
func (rs ReplicaSlice) OptimizeReplicaOrder(nodeDesc *roachpb.NodeDescriptor) {
// Note that this method is not concerned with any information the
// node might have about who the lease holder might be. If the
// leaseholder is known by the caller, the caller will move it to the
// front if appropriate.
func (rs ReplicaSlice) OptimizeReplicaOrder(
nodeDesc *roachpb.NodeDescriptor, latencyFn LatencyFunc,
) {
// If we don't know which node we're on, send the RPCs randomly.
if nodeDesc == nil {
shuffle.Shuffle(rs)
return
}
// Sort replicas by attribute affinity, which we treat as a stand-in for
// proximity (for now).
rs.SortByCommonAttributePrefix(nodeDesc.Attrs.Attrs)

// If there is a replica in local node, move it to the front.
if i := rs.FindReplicaByNodeID(nodeDesc.NodeID); i > 0 {
rs.MoveToFront(i)
}
// Sort replicas by latency and then attribute affinity.
sort.Slice(rs, func(i, j int) bool {
// If there is a replica in local node, it sorts first.
if rs[i].NodeID == nodeDesc.NodeID {
return true
}
if latencyFn != nil {
latencyI, okI := latencyFn(rs[i].addr())
latencyJ, okJ := latencyFn(rs[j].addr())
if okI && okJ {
return latencyI < latencyJ
}
}
attrMatchI := localityMatch(nodeDesc.Locality.Tiers, rs[i].locality())
attrMatchJ := localityMatch(nodeDesc.Locality.Tiers, rs[j].locality())
// Longer locality matches sort first (the assumption is that
// they'll have better latencies).
return attrMatchI > attrMatchJ
})
}

0 comments on commit d72a431

Please sign in to comment.