release-23.1: kvclient: stop reads on followers #114367

Merged
2 changes: 2 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -57,12 +57,14 @@ go_test(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/physicalplan/replicaoracle",
153 changes: 153 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -30,9 +30,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
@@ -679,6 +682,7 @@ func TestOracle(t *testing.T) {
Settings: st,
RPCContext: rpcContext,
Clock: clock,
HealthFunc: func(roachpb.NodeID) bool { return true },
})

res, _, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, replicaoracle.QueryState{})
@@ -1076,3 +1080,152 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
}
})
}

// Test that draining a node stops any follower reads to that node. This is
// important because a drained node is about to shut down, and a follower read
// sent to it just before shutdown may need to wait for a gRPC timeout.
func TestDrainStopsFollowerReads(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV
// TODO(baptist): Remove this if we make this the default.
kvcoord.FollowerReadsUnhealthy.Override(context.Background(), sv, false)

// Turn down these durations to allow follower reads to happen faster.
closeTime := 10 * time.Millisecond
closedts.TargetDuration.Override(ctx, sv, closeTime)
closedts.SideTransportCloseInterval.Override(ctx, sv, closeTime)
ClosedTimestampPropagationSlack.Override(ctx, sv, closeTime)

// Configure localities so n3 and n4 are in the same locality.
// SQL runs on n4 (west).
// Drain n3 (west).
numNodes := 4
locality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: region},
},
}
}
localities := []roachpb.Locality{
locality("us-east"),
locality("us-east"),
locality("us-west"),
locality("us-west"),
}
manualClock := hlc.NewHybridManualClock()

// Record which store processed the read request for our key.
var lastReader atomic.Int32
recordDestStore := func(args kvserverbase.FilterArgs) *kvpb.Error {
getArg, ok := args.Req.(*kvpb.GetRequest)
if !ok || !keys.ScratchRangeMin.Equal(getArg.Key) {
return nil
}
lastReader.Store(int32(args.Sid))
return nil
}

// Set up the nodes in different localities and use the LatencyFunc to
// simulate latency.
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
i := i
serverArgs[i] = base.TestServerArgs{
Settings: settings,
Locality: localities[i],
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingEvalFilter: recordDestStore,
},
},
// Currently we use latency as the "primary" signal and use locality
// only if the latency is unavailable. Simulate latency based on
// whether the nodes are in the same locality.
// TODO(baptist): Remove this if we sort replicas by region (#112993).
KVClient: &kvcoord.ClientTestingKnobs{
LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
if localities[id-1].Equal(localities[i]) {
return time.Millisecond, true
}
return 100 * time.Millisecond, true
},
},
},
}
}

// Set ReplicationManual as we don't want any leases to move around and affect
// the results of this test.
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
},
},
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

// Put the scratch range on nodes 1, 2, 3 and leave the lease on 1.
// We want all follower read requests to come from n4 and go to n3 due to
// the way latencies and localities are set up.
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1, 2)...)
server := tc.Server(3)
db := server.DB()

// Pin the read time to a timestamp in the recent past. Once the
// readTime is closed, we expect all subsequent reads to go to n3.
readTime := server.Clock().Now()

testutils.SucceedsSoon(t, func() error {
sendFollowerRead(t, db, scratchKey, readTime)
reader := lastReader.Load()
if reader != 3 {
return errors.Newf("expected read to n3 not n%d", reader)
}
return nil
})

client, err := tc.GetAdminClient(ctx, t, 0)
require.NoError(t, err)
// Send a drain request to n3 and wait until the drain is completed. Other
// nodes find out about the drain asynchronously through gossip.
req := serverpb.DrainRequest{Shutdown: false, DoDrain: true, NodeId: "3"}
drainStream, err := client.Drain(ctx, &req)
require.NoError(t, err)
// When we get a response, the drain is complete.
drainResp, err := drainStream.Recv()
require.NoError(t, err)
require.True(t, drainResp.IsDraining)

// Follower reads should stop going to n3 once other nodes notice it
// draining.
testutils.SucceedsSoon(t, func() error {
sendFollowerRead(t, db, scratchKey, readTime)
reader := lastReader.Load()
if reader == 3 {
return errors.New("expected to not read from n3")
}
return nil
})
}

func sendFollowerRead(t *testing.T, db *kv.DB, scratchKey roachpb.Key, readTime hlc.Timestamp) {
// Manually construct the BatchRequest to set the Timestamp.
b := &kv.Batch{}
b.Get(scratchKey)
b.Header.Timestamp = readTime
require.NoError(t, db.Run(context.Background(), b))
}
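
The test shrinks the three closed-timestamp intervals to 10ms so that the fixed readTime falls behind the closed timestamp almost immediately and becomes servable by followers. The sketch below illustrates that reasoning only as a back-of-the-envelope approximation; the exact lag formula used by kvfollowerreadsccl is more involved.

// Back-of-the-envelope sketch (not the real kvfollowerreadsccl formula): with
// all three intervals shrunk to 10ms, a read at a fixed timestamp becomes
// servable by a follower roughly as soon as it is ~30ms old.
package main

import (
	"fmt"
	"time"
)

func main() {
	targetDuration := 10 * time.Millisecond        // closedts.TargetDuration override
	sideTransportInterval := 10 * time.Millisecond // closedts.SideTransportCloseInterval override
	propagationSlack := 10 * time.Millisecond      // ClosedTimestampPropagationSlack override

	// A follower can serve a read at timestamp T once the leaseholder has
	// closed T and the follower has heard about it; that takes on the order
	// of the sum of these intervals.
	approxLag := targetDuration + sideTransportInterval + propagationSlack
	fmt.Printf("reads older than roughly %s can be served by followers\n", approxLag)
}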
37 changes: 35 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -224,6 +224,16 @@ var senderConcurrencyLimit = settings.RegisterIntSetting(
settings.NonNegativeInt,
)

// FollowerReadsUnhealthy controls whether we will send follower reads to nodes
// that are not considered healthy. When this setting is disabled, unhealthy
// nodes are sorted behind healthy nodes.
var FollowerReadsUnhealthy = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.dist_sender.follower_reads_unhealthy.enabled",
"send follower reads to unhealthy nodes",
true,
)

func max(a, b int64) int64 {
if a > b {
return a
@@ -371,6 +381,9 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

// HealthFunc returns true if the node is alive and not draining.
healthFunc atomic.Pointer[HealthFunc]

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// locality is the description of the topography of the server on which the
@@ -530,14 +543,34 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

// Placeholder function until the real health function is injected using
// SetHealthFunc.
// TODO(baptist): Restructure the code to allow injecting the correct
// HealthFunc at construction time.
healthFunc := HealthFunc(func(id roachpb.NodeID) bool {
return true
})
ds.healthFunc.Store(&healthFunc)

return ds
}

// SetHealthFunc is called after construction due to the circular dependency
// between DistSender and NodeLiveness.
func (ds *DistSender) SetHealthFunc(healthFn HealthFunc) {
ds.healthFunc.Store(&healthFn)
}

// LatencyFunc returns the LatencyFunc of the DistSender.
func (ds *DistSender) LatencyFunc() LatencyFunc {
return ds.latencyFunc
}

// HealthFunc returns the HealthFunc of the DistSender.
func (ds *DistSender) HealthFunc() HealthFunc {
return *ds.healthFunc.Load()
}

// DisableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the range
// cache.
@@ -2068,7 +2101,7 @@ func (ds *DistSender) sendToReplicas(
// First order by latency, then move the leaseholder to the front of the
// list, if it is known.
if !ds.dontReorderReplicas {
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
}

idx := -1
@@ -2087,7 +2120,7 @@
case kvpb.RoutingPolicy_NEAREST:
// Order by latency.
log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), ds.latencyFunc, ds.locality)

default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
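
Because node liveness is constructed after the DistSender, the health check cannot be supplied to NewDistSender and is instead injected later through SetHealthFunc. The sketch below shows the shape of that wiring with simplified stand-in types; NodeID, distSender, and nodeLiveness here are illustrative, not the real CockroachDB types, and the real DistSender stores the function in an atomic.Pointer as shown above.

// Minimal wiring sketch, assuming a liveness-like component that knows which
// nodes are draining; all types here are stand-ins for illustration only.
package main

import "fmt"

// NodeID stands in for roachpb.NodeID in this self-contained sketch.
type NodeID int32

// HealthFunc mirrors the kvcoord.HealthFunc signature introduced above.
type HealthFunc func(NodeID) bool

// distSender is a stripped-down stand-in for kvcoord.DistSender.
type distSender struct {
	healthFunc HealthFunc
}

// newDistSender installs a permissive placeholder, as NewDistSender does above.
func newDistSender() *distSender {
	return &distSender{healthFunc: func(NodeID) bool { return true }}
}

// SetHealthFunc swaps in the real health check once node liveness exists.
func (ds *distSender) SetHealthFunc(fn HealthFunc) {
	ds.healthFunc = fn
}

// nodeLiveness is a hypothetical liveness tracker that knows which nodes are
// alive and not draining.
type nodeLiveness struct {
	draining map[NodeID]bool
}

func (nl *nodeLiveness) isAvailableNotDraining(id NodeID) bool {
	return !nl.draining[id]
}

func main() {
	// The DistSender is constructed before liveness, so the health check is
	// injected afterwards rather than passed to the constructor.
	ds := newDistSender()
	nl := &nodeLiveness{draining: map[NodeID]bool{3: true}}
	ds.SetHealthFunc(nl.isAvailableNotDraining)

	fmt.Println(ds.healthFunc(1)) // true: n1 is healthy
	fmt.Println(ds.healthFunc(3)) // false: n3 is draining
}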
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -634,7 +634,7 @@ func newTransportForRange(
if err != nil {
return nil, err
}
replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), latencyFn, ds.locality)
opts := SendOptions{class: connectionClass(&ds.st.SV)}
return ds.transportFactory(opts, ds.nodeDialer, replicas)
}
25 changes: 23 additions & 2 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
)
@@ -189,6 +190,10 @@ func localityMatch(a, b []roachpb.Tier) int {
// node and a bool indicating whether the latency is valid.
type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)

// HealthFunc returns true if the node should be considered alive. Unhealthy
// nodes are sorted behind healthy nodes.
type HealthFunc func(roachpb.NodeID) bool

// OptimizeReplicaOrder sorts the replicas in the order in which
// they're to be used for sending RPCs (meaning in the order in which
// they'll be probed for the lease). Lower latency and "closer"
@@ -205,7 +210,11 @@ type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
// leaseholder is known by the caller, the caller will move it to the
// front if appropriate.
func (rs ReplicaSlice) OptimizeReplicaOrder(
nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
st *cluster.Settings,
nodeID roachpb.NodeID,
healthFn HealthFunc,
latencyFn LatencyFunc,
locality roachpb.Locality,
) {
// If we don't know which node we're on or its locality, and we don't have
// latency information to other nodes, send the RPCs randomly.
@@ -216,10 +225,22 @@

// Sort replicas by latency and then attribute affinity.
sort.Slice(rs, func(i, j int) bool {
// Replicas on the same node have the same latency.
// Replicas on the same node have the same score.
if rs[i].NodeID == rs[j].NodeID {
return false // i == j
}

if !FollowerReadsUnhealthy.Get(&st.SV) {
// Sort healthy nodes before unhealthy nodes.
// NB: This is checked before the local-node check because if the local
// node is unhealthy, we prefer to route to a different follower.
healthI := healthFn(rs[i].NodeID)
healthJ := healthFn(rs[j].NodeID)
if healthI != healthJ {
return healthI
}
}

// Replicas on the local node sort first.
if rs[i].NodeID == nodeID {
return true // i < j
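
For concreteness, here is a self-contained sketch of the ordering rule the new comparator implements, using plain values instead of the real ReplicaSlice, cluster.Settings, and HealthFunc types; only the precedence (health first, then the local node, then latency) is taken from the change above, everything else is illustrative.

// Simplified sketch of the health-aware replica ordering; the replica type,
// map-based health lookup, and latency values are illustrative stand-ins.
package main

import (
	"fmt"
	"sort"
)

type replica struct {
	NodeID  int32
	latency int // milliseconds, stands in for latencyFn output
}

func main() {
	followerReadsUnhealthy := false // i.e. the new setting is disabled
	localNodeID := int32(3)
	healthy := map[int32]bool{1: true, 2: true, 3: false} // n3 is draining

	replicas := []replica{
		{NodeID: 3, latency: 1},
		{NodeID: 1, latency: 100},
		{NodeID: 2, latency: 100},
	}

	sort.Slice(replicas, func(i, j int) bool {
		a, b := replicas[i], replicas[j]
		if a.NodeID == b.NodeID {
			return false
		}
		// Health outranks everything else, including "local node first",
		// because a request kept on an unhealthy local node may hang.
		if !followerReadsUnhealthy && healthy[a.NodeID] != healthy[b.NodeID] {
			return healthy[a.NodeID]
		}
		if a.NodeID == localNodeID {
			return true
		}
		if b.NodeID == localNodeID {
			return false
		}
		return a.latency < b.latency
	})

	// n3 sorts last despite being local and lowest-latency.
	fmt.Println(replicas)
}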