From 535a228b9905b31dac27441aba0567c3f441df20 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Mon, 23 Oct 2023 18:59:24 -0400
Subject: [PATCH] kvclient: stop reads on followers

Stop follower reads on draining, decommissioning, or unhealthy nodes.

Epic: none
Fixes: #112351

Release note (performance improvement): This change prevents requests
from being issued to followers that are draining, decommissioning, or
unhealthy, which avoids latency spikes if those nodes later go offline.
---
 pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel  |   2 +
 .../kvfollowerreadsccl/followerreads_test.go  | 153 ++++++++++++++++++
 pkg/kv/kvclient/kvcoord/dist_sender.go        |  37 ++++-
 .../kvclient/kvcoord/dist_sender_rangefeed.go |   2 +-
 pkg/kv/kvclient/kvcoord/replica_slice.go      |  25 ++-
 pkg/kv/kvclient/kvcoord/replica_slice_test.go |  33 +++-
 pkg/server/server.go                          |  10 ++
 .../physicalplan/replicaoracle/BUILD.bazel    |   1 +
 pkg/sql/physicalplan/replicaoracle/oracle.go  |  13 +-
 .../physicalplan/replicaoracle/oracle_test.go |   9 +-
 pkg/sql/physicalplan/span_resolver.go         |   1 +
 11 files changed, 275 insertions(+), 11 deletions(-)
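Annotation: the heart of this patch is a two-level comparison in the replica
sort: health class first, then the pre-existing latency/locality criteria as a
tie-breaker within each class. The following standalone sketch (toy types and
data, not the patch's code) illustrates the comparator shape the diffs below
adopt:

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    // Toy stand-ins for the real types; the names are illustrative only.
    type nodeID int

    type replica struct {
        node    nodeID
        latency time.Duration
    }

    type healthFunc func(nodeID) bool

    // sortReplicas mirrors the shape of the patch's comparator: health is
    // compared first, so unhealthy nodes sink to the back; latency only
    // orders replicas within the same health class.
    func sortReplicas(rs []replica, healthy healthFunc) {
        sort.Slice(rs, func(i, j int) bool {
            hi, hj := healthy(rs[i].node), healthy(rs[j].node)
            if hi != hj {
                return hi // healthy (true) sorts before unhealthy (false)
            }
            return rs[i].latency < rs[j].latency
        })
    }

    func main() {
        rs := []replica{
            {node: 1, latency: 1 * time.Millisecond},
            {node: 2, latency: 5 * time.Millisecond},
            {node: 3, latency: 20 * time.Millisecond},
        }
        // Mark the lowest-latency node as unhealthy (e.g. draining).
        sortReplicas(rs, func(id nodeID) bool { return id != 1 })
        fmt.Println(rs) // [{2 5ms} {3 20ms} {1 1ms}]
    }

The draining node stays in the candidate list but is only tried after every
healthy replica, which is exactly the degradation the patch wants.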
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
index 693072776d62..8237b9e0dfc8 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
@@ -57,12 +57,14 @@ go_test(
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/concurrency/lock",
+        "//pkg/kv/kvserver/kvserverbase",
         "//pkg/roachpb",
         "//pkg/rpc",
         "//pkg/security/securityassets",
         "//pkg/security/securitytest",
         "//pkg/security/username",
         "//pkg/server",
+        "//pkg/server/serverpb",
         "//pkg/settings/cluster",
         "//pkg/sql",
         "//pkg/sql/physicalplan/replicaoracle",
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index 44c6adc9d3e3..62dc2f1d9935 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -30,9 +30,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
@@ -679,6 +682,7 @@ func TestOracle(t *testing.T) {
 				Settings:   st,
 				RPCContext: rpcContext,
 				Clock:      clock,
+				HealthFunc: func(roachpb.NodeID) bool { return true },
 			})
 			res, _, err := o.ChoosePreferredReplica(ctx, c.txn, desc, c.lh, c.ctPolicy, replicaoracle.QueryState{})
@@ -1076,3 +1080,152 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
 		}
 	})
 }
+
+// Test that draining a node stops any follower reads to that node. This is
+// important because a drained node is about to shut down, and a follower read
+// issued just prior to the shutdown may need to wait for a gRPC timeout.
+func TestDrainStopsFollowerReads(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	defer utilccl.TestingEnableEnterprise()()
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	sv := &settings.SV
+	// TODO(baptist): Remove this if we make this the default.
+	kvcoord.FollowerReadsUnhealthy.Override(context.Background(), sv, false)
+
+	// Turn down these durations to allow follower reads to happen faster.
+	closeTime := 10 * time.Millisecond
+	closedts.TargetDuration.Override(ctx, sv, closeTime)
+	closedts.SideTransportCloseInterval.Override(ctx, sv, closeTime)
+	ClosedTimestampPropagationSlack.Override(ctx, sv, closeTime)
+
+	// Configure localities so n3 and n4 are in the same locality.
+	// SQL runs on n4 (west).
+	// Drain n3 (west).
+	numNodes := 4
+	locality := func(region string) roachpb.Locality {
+		return roachpb.Locality{
+			Tiers: []roachpb.Tier{
+				{Key: "region", Value: region},
+			},
+		}
+	}
+	localities := []roachpb.Locality{
+		locality("us-east"),
+		locality("us-east"),
+		locality("us-west"),
+		locality("us-west"),
+	}
+	manualClock := hlc.NewHybridManualClock()
+
+	// Record which store processed the read request for our key.
+	var lastReader atomic.Int32
+	recordDestStore := func(args kvserverbase.FilterArgs) *kvpb.Error {
+		getArg, ok := args.Req.(*kvpb.GetRequest)
+		if !ok || !keys.ScratchRangeMin.Equal(getArg.Key) {
+			return nil
+		}
+		lastReader.Store(int32(args.Sid))
+		return nil
+	}
+
+	// Set up the nodes in different localities and use the LatencyFunc to
+	// simulate latency.
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numNodes; i++ {
+		i := i
+		serverArgs[i] = base.TestServerArgs{
+			Settings: settings,
+			Locality: localities[i],
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+						TestingEvalFilter: recordDestStore,
+					},
+				},
+				// Currently we use latency as the "primary" signal and use
+				// locality only if the latency is unavailable. Simulate
+				// latency based on whether the nodes are in the same locality.
+				// TODO(baptist): Remove this if we sort replicas by region (#112993).
+				KVClient: &kvcoord.ClientTestingKnobs{
+					LatencyFunc: func(id roachpb.NodeID) (time.Duration, bool) {
+						if localities[id-1].Equal(localities[i]) {
+							return time.Millisecond, true
+						}
+						return 100 * time.Millisecond, true
+					},
+				},
+			},
+		}
+	}
+
+	// Set ReplicationManual as we don't want any leases to move around and
+	// affect the results of this test.
+	tc := testcluster.StartTestCluster(t, numNodes,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{
+				DisableDefaultTestTenant: true,
+				Knobs: base.TestingKnobs{
+					Server: &server.TestingKnobs{
+						WallClock: manualClock,
+					},
+				},
+			},
+			ServerArgsPerNode: serverArgs,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	// Put the scratch range on nodes 1, 2, 3 and leave the lease on 1.
+	// We want all follower read requests to come from n4 and go to n3 due to
+	// the way latency and localities are set up.
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1, 2)...)
+	server := tc.Server(3)
+	db := server.DB()
+
+	// Use a fixed read time from the recent past. Once the readTime is
+	// closed, we expect all future reads to go to n3.
+	readTime := server.Clock().Now()
+
+	testutils.SucceedsSoon(t, func() error {
+		sendFollowerRead(t, db, scratchKey, readTime)
+		reader := lastReader.Load()
+		if reader != 3 {
+			return errors.Newf("expected read to n3 not n%d", reader)
+		}
+		return nil
+	})
+
+	client, err := tc.GetAdminClient(ctx, t, 0)
+	require.NoError(t, err)
+	// Send a drain request to n3 and wait until the drain is completed. Other
+	// nodes find out about the drain asynchronously through gossip.
+	req := serverpb.DrainRequest{Shutdown: false, DoDrain: true, NodeId: "3"}
+	drainStream, err := client.Drain(ctx, &req)
+	require.NoError(t, err)
+	// Once we receive a response, the drain is complete.
+	drainResp, err := drainStream.Recv()
+	require.NoError(t, err)
+	require.True(t, drainResp.IsDraining)
+
+	// Follower reads should stop going to n3 once other nodes notice it
+	// draining.
+	testutils.SucceedsSoon(t, func() error {
+		sendFollowerRead(t, db, scratchKey, readTime)
+		reader := lastReader.Load()
+		if reader == 3 {
+			return errors.New("expected to not read from n3")
+		}
+		return nil
+	})
+}
+
+func sendFollowerRead(t *testing.T, db *kv.DB, scratchKey roachpb.Key, readTime hlc.Timestamp) {
+	// Manually construct the BatchRequest to set the Timestamp.
+	b := &kv.Batch{}
+	b.Get(scratchKey)
+	b.Header.Timestamp = readTime
+	require.NoError(t, db.Run(context.Background(), b))
+}
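Annotation: the test above opts into a follower read by pinning
Batch.Header.Timestamp directly. For context, an application usually reaches
the same routing path with a stale SQL read. A minimal sketch, assuming a
reachable cluster and a hypothetical kv(k INT PRIMARY KEY, v STRING) table
(neither is part of this patch):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq" // any Postgres-wire driver works with CockroachDB
    )

    func main() {
        // The connection string is illustrative; substitute a real address.
        db, err := sql.Open("postgres",
            "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // follower_read_timestamp() picks a timestamp far enough in the past
        // to be closed on followers, making the read eligible for routing to
        // a nearby (healthy) follower instead of the leaseholder.
        row := db.QueryRow(
            "SELECT v FROM kv AS OF SYSTEM TIME follower_read_timestamp() WHERE k = $1", 1)
        var v string
        if err := row.Scan(&v); err != nil {
            log.Fatal(err)
        }
        fmt.Println(v)
    }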
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index ebbedac2ea21..f691b3ba751a 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -224,6 +224,16 @@ var senderConcurrencyLimit = settings.RegisterIntSetting(
 	settings.NonNegativeInt,
 )
 
+// FollowerReadsUnhealthy controls whether we will send follower reads to
+// nodes that are not considered healthy. When set to false, unhealthy
+// nodes are sorted behind healthy nodes.
+var FollowerReadsUnhealthy = settings.RegisterBoolSetting(
+	settings.TenantWritable,
+	"kv.dist_sender.follower_reads_unhealthy.enabled",
+	"send follower reads to unhealthy nodes",
+	true,
+)
+
 func max(a, b int64) int64 {
 	if a > b {
 		return a
@@ -371,6 +381,9 @@ type DistSender struct {
 	// LatencyFunc is used to estimate the latency to other nodes.
 	latencyFunc LatencyFunc
 
+	// HealthFunc returns true if the node is alive and not draining.
+	healthFunc atomic.Pointer[HealthFunc]
+
 	onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error
 
 	// locality is the description of the topography of the server on which the
@@ -530,14 +543,34 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
 		ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
 	}
 
+	// Placeholder function until we inject the real health function using
+	// SetHealthFunc.
+	// TODO(baptist): Restructure the code to allow injecting the correct
+	// HealthFunc at construction time.
+	healthFunc := HealthFunc(func(id roachpb.NodeID) bool {
+		return true
+	})
+	ds.healthFunc.Store(&healthFunc)
+
 	return ds
 }
 
+// SetHealthFunc is called after construction due to the circular dependency
+// between the DistSender and NodeLiveness.
+func (ds *DistSender) SetHealthFunc(healthFn HealthFunc) {
+	ds.healthFunc.Store(&healthFn)
+}
+
 // LatencyFunc returns the LatencyFunc of the DistSender.
 func (ds *DistSender) LatencyFunc() LatencyFunc {
 	return ds.latencyFunc
 }
 
+// HealthFunc returns the HealthFunc of the DistSender.
+func (ds *DistSender) HealthFunc() HealthFunc {
+	return *ds.healthFunc.Load()
+}
+
 // DisableFirstRangeUpdates disables updates of the first range via
 // gossip. Used by tests which want finer control of the contents of the range
 // cache.
@@ -2068,7 +2101,7 @@ func (ds *DistSender) sendToReplicas(
 	// First order by latency, then move the leaseholder to the front of the
 	// list, if it is known.
 	if !ds.dontReorderReplicas {
-		replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
+		replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
 	}
 
 	idx := -1
@@ -2087,7 +2120,7 @@ func (ds *DistSender) sendToReplicas(
 	case kvpb.RoutingPolicy_NEAREST:
 		// Order by latency.
 		log.VEvent(ctx, 2, "routing to nearest replica; leaseholder not required")
-		replicas.OptimizeReplicaOrder(ds.getNodeID(), ds.latencyFunc, ds.locality)
+		replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), ds.latencyFunc, ds.locality)
 
 	default:
 		log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
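Annotation: the DistSender cannot take the real health check at construction
time because NodeLiveness is built later and itself depends on the DistSender.
The patch breaks the cycle by seeding an atomic.Pointer with a permissive
placeholder and swapping in the real function afterwards. A self-contained
sketch of that pattern (toy names, not the patch's code):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type HealthFunc func(nodeID int) bool

    type sender struct {
        health atomic.Pointer[HealthFunc]
    }

    func newSender() *sender {
        s := &sender{}
        // Seed with a permissive placeholder so callers never observe a nil
        // function before the real dependency exists.
        placeholder := HealthFunc(func(int) bool { return true })
        s.health.Store(&placeholder)
        return s
    }

    // setHealthFunc swaps in the real check once the dependency is available.
    // atomic.Pointer makes the swap safe against concurrent readers.
    func (s *sender) setHealthFunc(fn HealthFunc) { s.health.Store(&fn) }

    func (s *sender) healthy(id int) bool { return (*s.health.Load())(id) }

    func main() {
        s := newSender()
        fmt.Println(s.healthy(3)) // true: placeholder treats everyone as healthy
        s.setHealthFunc(func(id int) bool { return id != 3 })
        fmt.Println(s.healthy(3)) // false: the real check is now wired in
    }

The permissive placeholder fails safe: until liveness is wired up, ordering
falls back to exactly the pre-patch behavior.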
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index a2ffe8c7f8a9..33b7cb625eff 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -634,7 +634,7 @@ func newTransportForRange(
 	if err != nil {
 		return nil, err
 	}
-	replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality)
+	replicas.OptimizeReplicaOrder(ds.st, ds.getNodeID(), ds.HealthFunc(), latencyFn, ds.locality)
 	opts := SendOptions{class: connectionClass(&ds.st.SV)}
 	return ds.transportFactory(opts, ds.nodeDialer, replicas)
 }
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice.go b/pkg/kv/kvclient/kvcoord/replica_slice.go
index 873a24ef81fd..ab40a6054339 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/shuffle"
 )
@@ -189,6 +190,10 @@ func localityMatch(a, b []roachpb.Tier) int {
 // node and a bool indicating whether the latency is valid.
 type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
 
+// HealthFunc returns true if the node should be considered alive. Unhealthy
+// nodes are sorted behind healthy nodes.
+type HealthFunc func(roachpb.NodeID) bool
+
 // OptimizeReplicaOrder sorts the replicas in the order in which
 // they're to be used for sending RPCs (meaning in the order in which
 // they'll be probed for the lease). Lower latency and "closer"
@@ -205,7 +210,11 @@ type LatencyFunc func(roachpb.NodeID) (time.Duration, bool)
 // leaseholder is known by the caller, the caller will move it to the
 // front if appropriate.
 func (rs ReplicaSlice) OptimizeReplicaOrder(
-	nodeID roachpb.NodeID, latencyFn LatencyFunc, locality roachpb.Locality,
+	st *cluster.Settings,
+	nodeID roachpb.NodeID,
+	healthFn HealthFunc,
+	latencyFn LatencyFunc,
+	locality roachpb.Locality,
 ) {
 	// If we don't know which node we're on or its locality, and we don't have
 	// latency information to other nodes, send the RPCs randomly.
@@ -216,10 +225,22 @@ func (rs ReplicaSlice) OptimizeReplicaOrder(
 
 	// Sort replicas by latency and then attribute affinity.
 	sort.Slice(rs, func(i, j int) bool {
-		// Replicas on the same node have the same latency.
+		// Replicas on the same node have the same score.
 		if rs[i].NodeID == rs[j].NodeID {
 			return false // i == j
 		}
+
+		if !FollowerReadsUnhealthy.Get(&st.SV) {
+			// Sort healthy nodes before unhealthy nodes.
+			// NB: This is checked before the local-node check below because if
+			// the local node is unhealthy, we prefer a different follower.
+			healthI := healthFn(rs[i].NodeID)
+			healthJ := healthFn(rs[j].NodeID)
+			if healthI != healthJ {
+				return healthI
+			}
+		}
+
 		// Replicas on the local node sort first.
 		if rs[i].NodeID == nodeID {
 			return true // i < j
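Annotation: note that the comparator sorts unhealthy replicas behind healthy
ones rather than filtering them out. A toy comparison of the two approaches
(a sketch, not the patch's code) shows why sorting is the safer choice:

    package main

    import (
        "fmt"
        "sort"
    )

    // With filtering, a range whose replicas are all unhealthy has no
    // candidates left; with sorting, the order degrades gracefully back to
    // the old behavior.
    func main() {
        nodes := []int{1, 2, 3}
        healthy := func(int) bool { return false } // everything is unhealthy

        // Filtering approach (not what the patch does): nothing survives.
        var filtered []int
        for _, n := range nodes {
            if healthy(n) {
                filtered = append(filtered, n)
            }
        }
        fmt.Println(len(filtered)) // 0 candidates: requests would fail outright

        // Sorting approach (what the patch does): all replicas stay eligible.
        sorted := append([]int(nil), nodes...)
        sort.SliceStable(sorted, func(i, j int) bool {
            hi, hj := healthy(sorted[i]), healthy(sorted[j])
            return hi && !hj // healthy first; ties keep the existing order
        })
        fmt.Println(sorted) // [1 2 3]: full candidate list preserved
    }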
diff --git a/pkg/kv/kvclient/kvcoord/replica_slice_test.go b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
index 7d693d41dbab..a77514130cbe 100644
--- a/pkg/kv/kvclient/kvcoord/replica_slice_test.go
+++ b/pkg/kv/kvclient/kvcoord/replica_slice_test.go
@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/errorutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -197,6 +198,8 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 		locality roachpb.Locality
 		// map from node address (see nodeDesc()) to latency to that node.
 		latencies map[roachpb.NodeID]time.Duration
+		// map of unhealthy nodes.
+		unhealthy map[roachpb.NodeID]struct{}
 		slice ReplicaSlice
 		// expOrder is the expected order in which the replicas sort. Replicas are
 		// only identified by their node. If multiple replicas are on different
@@ -217,6 +220,24 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 			},
 			expOrdered: []roachpb.NodeID{1, 2, 4, 3},
 		},
+		{
+			// Same test as above, but mark nodes 1 and 4 as unhealthy.
+			name:     "order by health",
+			nodeID:   1,
+			locality: locality(t, []string{"country=us", "region=west", "city=la"}),
+			slice: ReplicaSlice{
+				info(t, 1, 1, []string{"country=us", "region=west", "city=la"}),
+				info(t, 2, 2, []string{"country=us", "region=west", "city=sf"}),
+				info(t, 3, 3, []string{"country=uk", "city=london"}),
+				info(t, 3, 33, []string{"country=uk", "city=london"}),
+				info(t, 4, 4, []string{"country=us", "region=east", "city=ny"}),
+			},
+			unhealthy: map[roachpb.NodeID]struct{}{
+				1: {},
+				4: {},
+			},
+			expOrdered: []roachpb.NodeID{2, 3, 1, 4},
+		},
 		{
 			name:   "order by latency",
 			nodeID: 1,
@@ -259,6 +280,9 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 	}
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
+			st := cluster.MakeTestingClusterSettings()
+			// TODO(baptist): Remove this if we make this the default.
+			FollowerReadsUnhealthy.Override(context.Background(), &st.SV, false)
 			var latencyFn LatencyFunc
 			if test.latencies != nil {
 				latencyFn = func(id roachpb.NodeID) (time.Duration, bool) {
@@ -266,9 +290,16 @@ func TestReplicaSliceOptimizeReplicaOrder(t *testing.T) {
 					lat, ok := test.latencies[id]
 					return lat, ok
 				}
 			}
+			healthFn := func(id roachpb.NodeID) bool {
+				if test.unhealthy == nil {
+					return true
+				}
+				_, ok := test.unhealthy[id]
+				return !ok
+			}
 			// Randomize the input order, as it's not supposed to matter.
 			shuffle.Shuffle(test.slice)
-			test.slice.OptimizeReplicaOrder(test.nodeID, latencyFn, test.locality)
+			test.slice.OptimizeReplicaOrder(st, test.nodeID, healthFn, latencyFn, test.locality)
 			var sortedNodes []roachpb.NodeID
 			sortedNodes = append(sortedNodes, test.slice[0].NodeID)
 			for i := 1; i < len(test.slice); i++ {
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 1428cf320951..b516cd7ce9e8 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -511,6 +511,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	})
 	registry.AddMetricStruct(nodeLiveness.Metrics())
 
+	// TODO(baptist): Refactor this to change the dependency between liveness
+	// and the DistSender. Today, persisting liveness requires the DistSender
+	// to read and write the liveness records, but the cache only needs the
+	// gossip struct. We could construct the liveness cache separately from
+	// the rest of liveness and use it to compute this, rather than relying
+	// on the entire liveness struct.
+	distSender.SetHealthFunc(func(id roachpb.NodeID) bool {
+		return nodeLiveness.IsAvailableNotDraining(id)
+	})
+
 	nodeLivenessFn := storepool.MakeStorePoolNodeLivenessFunc(nodeLiveness)
 	if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok &&
 		nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
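Annotation: this is where the placeholder from NewDistSender is replaced with
a real check backed by NodeLiveness.IsAvailableNotDraining. The sketch below
is only a toy model of that predicate's intent as described by the commit
message (live, not draining, not decommissioning); the real liveness API and
record layout differ:

    package main

    import "fmt"

    // Toy liveness record, loosely modeling the conditions the health check
    // cares about (illustrative only).
    type liveness struct {
        alive           bool
        draining        bool
        decommissioning bool
    }

    // availableNotDraining mirrors the spirit of IsAvailableNotDraining as
    // the patch uses it: a node qualifies for follower reads only if it is
    // live and not in the process of leaving the cluster.
    func availableNotDraining(l liveness) bool {
        return l.alive && !l.draining && !l.decommissioning
    }

    func main() {
        fmt.Println(availableNotDraining(liveness{alive: true}))                 // true
        fmt.Println(availableNotDraining(liveness{alive: true, draining: true})) // false
        fmt.Println(availableNotDraining(liveness{alive: false}))                // false
    }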
diff --git a/pkg/sql/physicalplan/replicaoracle/BUILD.bazel b/pkg/sql/physicalplan/replicaoracle/BUILD.bazel
index 89c4dcf7b8cd..c1b0d6137cae 100644
--- a/pkg/sql/physicalplan/replicaoracle/BUILD.bazel
+++ b/pkg/sql/physicalplan/replicaoracle/BUILD.bazel
@@ -28,6 +28,7 @@ go_test(
         "//pkg/config/zonepb",
         "//pkg/gossip",
         "//pkg/roachpb",
+        "//pkg/settings/cluster",
        "//pkg/testutils",
         "//pkg/util",
         "//pkg/util/hlc",
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle.go b/pkg/sql/physicalplan/replicaoracle/oracle.go
index 1ce79c504288..0e7831f6faad 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle.go
@@ -52,6 +52,7 @@ type Config struct {
 	Clock       *hlc.Clock
 	RPCContext  *rpc.Context
 	LatencyFunc kvcoord.LatencyFunc
+	HealthFunc  kvcoord.HealthFunc
 }
 
 // Oracle is used to choose the lease holder for ranges. This
@@ -163,6 +164,7 @@ func (o *randomOracle) ChoosePreferredReplica(
 }
 
 type closestOracle struct {
+	st        *cluster.Settings
 	nodeDescs kvcoord.NodeDescStore
 	// nodeID and locality of the current node. Used to give preference to the
 	// current node and others "close" to it.
@@ -172,6 +174,7 @@ type closestOracle struct {
 	// inside the same process.
 	nodeID      roachpb.NodeID
 	locality    roachpb.Locality
+	healthFunc  kvcoord.HealthFunc
 	latencyFunc kvcoord.LatencyFunc
 }
 
@@ -181,10 +184,12 @@ func newClosestOracle(cfg Config) Oracle {
 		latencyFn = latencyFunc(cfg.RPCContext)
 	}
 	return &closestOracle{
+		st:          cfg.Settings,
 		nodeDescs:   cfg.NodeDescs,
 		nodeID:      cfg.NodeID,
 		locality:    cfg.Locality,
 		latencyFunc: latencyFn,
+		healthFunc:  cfg.HealthFunc,
 	}
 }
 
@@ -202,7 +207,7 @@ func (o *closestOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, false, err
 	}
-	replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+	replicas.OptimizeReplicaOrder(o.st, o.nodeID, o.healthFunc, o.latencyFunc, o.locality)
 	repl := replicas[0].ReplicaDescriptor
 	// There are no "misplanned" ranges if we know the leaseholder, and we're
 	// deliberately choosing non-leaseholder.
@@ -226,6 +231,7 @@ const maxPreferredRangesPerLeaseHolder = 10
 // node.
 // Finally, it tries not to overload any node.
 type binPackingOracle struct {
+	st                               *cluster.Settings
 	maxPreferredRangesPerLeaseHolder int
 	nodeDescs                        kvcoord.NodeDescStore
 	// nodeID and locality of the current node. Used to give preference to the
@@ -237,15 +243,18 @@ type binPackingOracle struct {
 	nodeID      roachpb.NodeID
 	locality    roachpb.Locality
 	latencyFunc kvcoord.LatencyFunc
+	healthFunc  kvcoord.HealthFunc
 }
 
 func newBinPackingOracle(cfg Config) Oracle {
 	return &binPackingOracle{
+		st:                               cfg.Settings,
 		maxPreferredRangesPerLeaseHolder: maxPreferredRangesPerLeaseHolder,
 		nodeDescs:                        cfg.NodeDescs,
 		nodeID:                           cfg.NodeID,
 		locality:                         cfg.Locality,
 		latencyFunc:                      latencyFunc(cfg.RPCContext),
+		healthFunc:                       cfg.HealthFunc,
 	}
 }
 
@@ -266,7 +275,7 @@ func (o *binPackingOracle) ChoosePreferredReplica(
 	if err != nil {
 		return roachpb.ReplicaDescriptor{}, false, err
 	}
-	replicas.OptimizeReplicaOrder(o.nodeID, o.latencyFunc, o.locality)
+	replicas.OptimizeReplicaOrder(o.st, o.nodeID, o.healthFunc, o.latencyFunc, o.locality)
 
 	// Look for a replica that has been assigned some ranges, but is not yet full.
 	minLoad := int(math.MaxInt32)
diff --git a/pkg/sql/physicalplan/replicaoracle/oracle_test.go b/pkg/sql/physicalplan/replicaoracle/oracle_test.go
index 0fe7b7ad79d3..66421591588b 100644
--- a/pkg/sql/physicalplan/replicaoracle/oracle_test.go
+++ b/pkg/sql/physicalplan/replicaoracle/oracle_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -44,9 +45,11 @@ func TestClosest(t *testing.T) {
 		nd2, err := g.GetNodeDescriptor(2)
 		require.NoError(t, err)
 		o := NewOracle(ClosestChoice, Config{
-			NodeDescs: g,
-			NodeID:    1,
-			Locality:  nd2.Locality, // pretend node 2 is closest.
+			NodeDescs:  g,
+			NodeID:     1,
+			Locality:   nd2.Locality, // pretend node 2 is closest.
+			Settings:   cluster.MakeTestingClusterSettings(),
+			HealthFunc: func(_ roachpb.NodeID) bool { return true },
 		})
 		o.(*closestOracle).latencyFunc = func(id roachpb.NodeID) (time.Duration, bool) {
 			if id == 2 {
diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go
index 4d1208372c84..dfecb4a74916 100644
--- a/pkg/sql/physicalplan/span_resolver.go
+++ b/pkg/sql/physicalplan/span_resolver.go
@@ -154,6 +154,7 @@ func NewSpanResolver(
 			Clock:       clock,
 			RPCContext:  rpcCtx,
 			LatencyFunc: distSender.LatencyFunc(),
+			HealthFunc:  distSender.HealthFunc(),
 		}),
 		distSender: distSender,
 	}
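Annotation: after this patch, any caller that builds a replica oracle needs to
thread through Settings and a HealthFunc, as span_resolver.go does above. A
minimal sketch of constructing a closest-choice oracle with an always-healthy
stub, mirroring the updated TestClosest (the wrapper function itself is
hypothetical):

    package physicalplan_test

    import (
        "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
        "github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
    )

    // newClosestOracle builds a ClosestChoice oracle. A HealthFunc that
    // treats every node as healthy preserves the pre-patch behavior, which
    // is how the tests in this patch stub it.
    func newClosestOracle(
        descs kvcoord.NodeDescStore, nodeID roachpb.NodeID, loc roachpb.Locality,
    ) replicaoracle.Oracle {
        return replicaoracle.NewOracle(replicaoracle.ClosestChoice, replicaoracle.Config{
            NodeDescs:  descs,
            NodeID:     nodeID,
            Locality:   loc,
            Settings:   cluster.MakeTestingClusterSettings(),
            HealthFunc: func(roachpb.NodeID) bool { return true },
        })
    }

In production the HealthFunc comes from the DistSender accessor instead of a
stub, so the oracle and the DistSender agree on which nodes to avoid.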