diff --git a/pkg/cli/haproxy_test.go b/pkg/cli/haproxy_test.go index 7c067c85e068..d6de258f2a1e 100644 --- a/pkg/cli/haproxy_test.go +++ b/pkg/cli/haproxy_test.go @@ -124,12 +124,12 @@ func TestNodeStatusToNodeInfoConversion(t *testing.T) { {Desc: roachpb.NodeDescriptor{NodeID: 6}}, }, LivenessByNodeID: map[roachpb.NodeID]kvserverpb.NodeLivenessStatus{ - 1: kvserverpb.NodeLivenessStatus_DEAD, - 2: kvserverpb.NodeLivenessStatus_DECOMMISSIONING, - 3: kvserverpb.NodeLivenessStatus_UNKNOWN, - 4: kvserverpb.NodeLivenessStatus_UNAVAILABLE, - 5: kvserverpb.NodeLivenessStatus_LIVE, - 6: kvserverpb.NodeLivenessStatus_DECOMMISSIONED, + 1: kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD, + 2: kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONING, + 3: kvserverpb.NodeLivenessStatus_DEPRECATED_UNKNOWN, + 4: kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE, + 5: kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE, + 6: kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONED, }, }, []haProxyNodeInfo{ diff --git a/pkg/cli/node.go b/pkg/cli/node.go index 6fa79b90d47b..9683c5f28362 100644 --- a/pkg/cli/node.go +++ b/pkg/cli/node.go @@ -359,8 +359,6 @@ func runDecommissionNodeImpl( replicaCount += status.ReplicaCount allDecommissioning = allDecommissioning && status.CommissionStatus.Decommissioning() || status.CommissionStatus.Decommissioned() - // XXX: Write tests for recommissioning only canceling out extant - // decommissioning attempts, and no more. } if replicaCount == 0 { // We now mark the node as fully decommissioned. @@ -373,11 +371,12 @@ func runDecommissionNodeImpl( fmt.Fprintln(stderr) return errors.Wrap(err, "while trying to mark as decommissioned") } - // We print out the final commission_status stating - // `decommissioned`. This is checked in tests. - fmt.Fprintln(stderr) - if err := printDecommissionStatus(*resp); err != nil { - return err + if !reflect.DeepEqual(&prevResponse, resp) { + fmt.Fprintln(stderr) + if err := printDecommissionStatus(*resp); err != nil { + return err + } + prevResponse = *resp + } } if replicaCount == 0 && allDecommissioning { diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 029d98ec3aa0..c973c7a5ebc6 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -260,6 +260,26 @@ func execCLI( func runDecommissionAcceptance(ctx context.Context, t *test, c *cluster) { t.Skip("TODO(irfansharif)", "Rewrite this test to incorporate new recommission semantics") + // XXX: Write tests for recommissioning only canceling out extant + // decommissioning attempts, and no more. + + // XXX: Test all the error conditions for setCommissionStatusInternal. + + // XXX: Add a test for the following scenario. + // - Decommission n2 from n1, where n2 has no liveness record. n1 is + // running v20.2. + // - n2 is running v20.1, tries to update its liveness record. Finds + // its own liveness record written by n1 (but the new proto, + // which it doesn't know how to read fully). Will it parse + // properly? It's important for v20.2 nodes to be able to read v20.1 and + // v20.2 representations. It's important for v20.2 to write a representation + // understood by both v20.1 and v20.2. + + // XXX: What's the guarantee that v21.1 nodes will never see the v20.1 + // representation? A: Heartbeats are only ever sent out by live nodes, and + // operators aren't allowed to deploy clusters such that there's a 20.1 + // node heartbeating a 21.1 cluster. Still, can we foolproof this somehow?
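To make the compatibility requirement described in the XXX comments above concrete (it is the same requirement the EnsureCompatible comment in pkg/kv/kvserver/kvserverpb/liveness.go spells out later in this patch), here is a minimal, self-contained sketch of the kind of reconciliation involved. The type, constant, and function names below are illustrative stand-ins rather than the actual kvserverpb definitions, and the choice of zero value for the enum in particular is an assumption.

package main

import "fmt"

// CommissionStatus and Liveness are minimal stand-ins for the kvserverpb
// types touched by this patch; names and zero values are assumed.
type CommissionStatus int

const (
	commissionStatusUnknown CommissionStatus = iota // zero value: enum never written, i.e. a v20.1 writer
	commissionStatusCommissioned
	commissionStatusDecommissioning
	commissionStatusDecommissioned
)

type Liveness struct {
	DeprecatedDecommissioning bool             // v20.1 boolean representation
	CommissionStatus          CommissionStatus // v20.2 enum representation
}

// ensureCompatible sketches the reconciliation the comments describe: if the
// enum is unset the record came from a v20.1 node and the deprecated boolean
// is authoritative; otherwise the enum is authoritative and the boolean is
// re-derived from it so that v20.1 readers still see a sensible value.
func ensureCompatible(l *Liveness) {
	if l.CommissionStatus == commissionStatusUnknown {
		if l.DeprecatedDecommissioning {
			l.CommissionStatus = commissionStatusDecommissioning
		} else {
			l.CommissionStatus = commissionStatusCommissioned
		}
		return
	}
	l.DeprecatedDecommissioning = l.CommissionStatus == commissionStatusDecommissioning ||
		l.CommissionStatus == commissionStatusDecommissioned
}

func main() {
	// A record written by a v20.1 node: only the boolean is populated.
	fromOld := Liveness{DeprecatedDecommissioning: true}
	ensureCompatible(&fromOld)
	fmt.Println(fromOld.CommissionStatus == commissionStatusDecommissioning) // true

	// A record written by a v20.2 node: the enum is authoritative and the
	// boolean is kept in sync for v20.1 readers.
	fromNew := Liveness{CommissionStatus: commissionStatusDecommissioned}
	ensureCompatible(&fromNew)
	fmt.Println(fromNew.DeprecatedDecommissioning) // true
}

The point of the round trip is that a record written by either version ends up carrying values both versions can interpret, which is what the mixed-version scenario above would need to assert.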
+ args := startArgs("--env=COCKROACH_SCAN_MAX_IDLE_TIME=5ms") c.Put(ctx, cockroach, "./cockroach") c.Start(ctx, t, args) @@ -376,6 +396,8 @@ func runDecommissionAcceptance(ctx context.Context, t *test, c *cluster) { return res } + // XXX: Write tests for what gets inserted into event log, and what doesn't. + // XXX: In this test we're able to issue decommission commands through nodes // that have been fully decommissioned. diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index d60763d8c621..0d20ce4144e7 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -309,7 +309,7 @@ func createTestAllocator( stopper, g, manual, storePool, _ := createTestStorePool( TestTimeUntilStoreDeadOff, deterministic, func() int { return numNodes }, - kvserverpb.NodeLivenessStatus_LIVE) + kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) a := MakeAllocator(storePool, func(string) (time.Duration, bool) { return 0, true }) @@ -332,7 +332,7 @@ func mockStorePool( liveNodeSet := map[roachpb.NodeID]kvserverpb.NodeLivenessStatus{} storePool.detailsMu.storeDetails = map[roachpb.StoreID]*storeDetail{} for _, storeID := range aliveStoreIDs { - liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_LIVE + liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE detail := storePool.getStoreDetailLocked(storeID) detail.desc = &roachpb.StoreDescriptor{ StoreID: storeID, @@ -340,7 +340,7 @@ func mockStorePool( } } for _, storeID := range unavailableStoreIDs { - liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_UNAVAILABLE + liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE detail := storePool.getStoreDetailLocked(storeID) detail.desc = &roachpb.StoreDescriptor{ StoreID: storeID, @@ -348,7 +348,7 @@ func mockStorePool( } } for _, storeID := range deadStoreIDs { - liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEAD + liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD detail := storePool.getStoreDetailLocked(storeID) detail.desc = &roachpb.StoreDescriptor{ StoreID: storeID, @@ -356,7 +356,7 @@ func mockStorePool( } } for _, storeID := range decommissioningStoreIDs { - liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DECOMMISSIONING + liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONING detail := storePool.getStoreDetailLocked(storeID) detail.desc = &roachpb.StoreDescriptor{ StoreID: storeID, @@ -364,7 +364,7 @@ func mockStorePool( } } for _, storeID := range decommissionedStoreIDs { - liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DECOMMISSIONED + liveNodeSet[roachpb.NodeID(storeID)] = kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONED detail := storePool.getStoreDetailLocked(storeID) detail.desc = &roachpb.StoreDescriptor{ StoreID: storeID, @@ -378,7 +378,7 @@ func mockStorePool( if status, ok := liveNodeSet[nodeID]; ok { return status } - return kvserverpb.NodeLivenessStatus_UNAVAILABLE + return kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE } } @@ -1230,7 +1230,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { stopper, g, _, storePool, nl := createTestStorePool( TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_LIVE) + kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) a := MakeAllocator(storePool, func(string) 
(time.Duration, bool) { return 0, true }) @@ -1250,7 +1250,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) { sg.GossipStores(stores, t) // UNAVAILABLE is the node liveness status used for a node that's draining. - nl.setNodeStatus(1, kvserverpb.NodeLivenessStatus_UNAVAILABLE) + nl.setNodeStatus(1, kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE) existing := []roachpb.ReplicaDescriptor{ {StoreID: 1}, @@ -1625,7 +1625,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { stopper, g, _, storePool, nl := createTestStorePool( TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_LIVE) + kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) a := MakeAllocator(storePool, func(string) (time.Duration, bool) { return 0, true }) @@ -1645,7 +1645,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) { sg.GossipStores(stores, t) // UNAVAILABLE is the node liveness status used for a node that's draining. - nl.setNodeStatus(1, kvserverpb.NodeLivenessStatus_UNAVAILABLE) + nl.setNodeStatus(1, kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE) testCases := []struct { leaseholder roachpb.StoreID @@ -3608,7 +3608,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) { stopper, g, _, storePool, _ := createTestStorePool( TestTimeUntilStoreDeadOff, true, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_LIVE) + kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) defer stopper.Stop(context.Background()) // 3 stores where the lease count for each store is equal to 10x the store ID. @@ -4971,7 +4971,7 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { stopper, _, _, sp, _ := createTestStorePool( TestTimeUntilStoreDeadOff, false, /* deterministic */ func() int { return numNodes }, - kvserverpb.NodeLivenessStatus_LIVE) + kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) a := MakeAllocator(sp, func(string) (time.Duration, bool) { return 0, true }) @@ -5586,7 +5586,7 @@ func TestAllocatorFullDisks(t *testing.T) { const capacity = (1 << 30) + 1 const rangeSize = 16 << 20 - mockNodeLiveness := newMockNodeLiveness(kvserverpb.NodeLivenessStatus_LIVE) + mockNodeLiveness := newMockNodeLiveness(kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) sp := NewStorePool( log.AmbientContext{Tracer: st.Tracer}, st, @@ -5628,7 +5628,7 @@ func TestAllocatorFullDisks(t *testing.T) { for i := 0; i < generations; i++ { // First loop through test stores and randomly add data. for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) == kvserverpb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) == kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD { continue } ts := &testStores[j] @@ -5643,7 +5643,7 @@ func TestAllocatorFullDisks(t *testing.T) { if ts.Capacity.Available <= 0 { t.Errorf("testStore %d ran out of space during generation %d (rangesAdded=%d/%d): %+v", j, i, rangesAdded, rangesToAdd, ts.Capacity) - mockNodeLiveness.setNodeStatus(roachpb.NodeID(j), kvserverpb.NodeLivenessStatus_DEAD) + mockNodeLiveness.setNodeStatus(roachpb.NodeID(j), kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) } wg.Add(1) if err := g.AddInfoProto(gossip.MakeStoreKey(roachpb.StoreID(j)), &ts.StoreDescriptor, 0); err != nil { @@ -5655,7 +5655,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Loop through each store a number of times and maybe rebalance. 
for j := 0; j < 10; j++ { for k := 0; k < len(testStores); k++ { - if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(k), time.Time{}, 0) == kvserverpb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(k), time.Time{}, 0) == kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD { continue } ts := &testStores[k] @@ -5689,11 +5689,11 @@ func TestAllocatorFullDisks(t *testing.T) { // Simulate rocksdb compactions freeing up disk space. for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) != kvserverpb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.nodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) != kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD { ts := &testStores[j] if ts.Capacity.Available <= 0 { t.Errorf("testStore %d ran out of space during generation %d: %+v", j, i, ts.Capacity) - mockNodeLiveness.setNodeStatus(roachpb.NodeID(j), kvserverpb.NodeLivenessStatus_DEAD) + mockNodeLiveness.setNodeStatus(roachpb.NodeID(j), kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) } else { ts.compact() } @@ -5737,7 +5737,7 @@ func Example_rebalancing() { func() int { return nodes }, - newMockNodeLiveness(kvserverpb.NodeLivenessStatus_LIVE).nodeLivenessFunc, + newMockNodeLiveness(kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE).nodeLivenessFunc, /* deterministic */ true, ) alloc := MakeAllocator(sp, func(string) (time.Duration, bool) { diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 740c46d312e6..14407eb9d5f2 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -239,7 +239,7 @@ func NewTestStorePool(cfg StoreConfig) *StorePool { return 1 }, func(roachpb.NodeID, time.Time, time.Duration) kvserverpb.NodeLivenessStatus { - return kvserverpb.NodeLivenessStatus_LIVE + return kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE }, /* deterministic */ false, ) diff --git a/pkg/kv/kvserver/kvserverpb/liveness.go b/pkg/kv/kvserver/kvserverpb/liveness.go index fb8a401c7ffa..44468b641bca 100644 --- a/pkg/kv/kvserver/kvserverpb/liveness.go +++ b/pkg/kv/kvserver/kvserverpb/liveness.go @@ -58,12 +58,14 @@ func (l *Liveness) String() string { // v20.2 we introduced a dedicated enum to be able to disambiguate between the // two. That being said, v20.2 nodes need to be able to operate in mixed // clusters with v20.1 nodes, that only know to interpret the boolean -// representation. EnsureCompatible is able to reconcile across both -// representations by mutating the receiver such that it's understood by both -// v20.1 and v20.2 nodes (See AssertValid for what this entails). -// If the receiver object is clearly one generated from a v20.1 node, we -// consider the deprecated boolean representation as the authoritative one. We -// consider the enum state authoritative if not. +// representation. +// +// EnsureCompatible is able to reconcile across both representations by mutating +// the receiver such that it's understood by both v20.1 and v20.2 nodes (See +// AssertValid for what this entails). If the receiver object is clearly one +// generated from a v20.1 node, we consider the deprecated boolean +// representation as the authoritative one. We consider the enum state +// authoritative if not. // // TODO(irfansharif): Remove this once v20.2 is cut. 
func (l *Liveness) EnsureCompatible() { diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 026de689ee34..d0b8d79b6b38 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -48,6 +48,11 @@ var ( // when encountering this error. errChangeCommissionStatusFailed = errors.New("failed to change the commissioning status") + // ErrIllegalCommissionTransition is returned when we're not allowed + // to set a target commission status because of the existing commission + // status. It's not a retryable error. + ErrIllegalCommissionTransition = errors.New("illegal commission status transition") + // ErrEpochIncremented is returned when a heartbeat request fails because // the underlying liveness record has had its epoch incremented. ErrEpochIncremented = errors.New("heartbeat failed on epoch increment") @@ -248,11 +253,11 @@ func (nl *NodeLiveness) sem(nodeID roachpb.NodeID) chan struct{} { func (nl *NodeLiveness) SetDraining(ctx context.Context, drain bool, reporter func(int, string)) { ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { - livenessRec, err := nl.SelfEx() + oldLivenessRec, err := nl.SelfEx() if err != nil && !errors.Is(err, ErrNoLivenessRecord) { log.Errorf(ctx, "unexpected error getting liveness: %+v", err) } - err = nl.setDrainingInternal(ctx, livenessRec, drain, reporter) + err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter) if err != nil { if log.V(1) { log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err) @@ -338,10 +343,13 @@ func (nl *NodeLiveness) SetCommissionStatus( // liveness, the previous view is correct. This, too, is required to // de-flake TestNodeLivenessDecommissionAbsent. // - // We work off a copy of the liveness record. When - // XXX: I want to maybe update with the reconciled version, but - // commissionStatusInternal expects empty liveness, if none existed. - // Need to work off a copy. + // We work off a copy of the liveness record as the version we + // update with needs to be compatible with the v20.1 representation. + // `setCommissionStatusInternal` checks for the case where the + // liveness record is an empty one (i.e. no record existed + // previously). Since making the liveness record backwards + // compatible entails setting a non-zero value for the + // CommissionStatus, we have to use a copy here. oldLivenessRecCopy := oldLivenessRec oldLivenessRecCopy.Liveness.EnsureCompatible() nl.maybeUpdate(oldLivenessRecCopy) @@ -376,30 +384,34 @@ func (nl *NodeLiveness) setDrainingInternal( <-sem }() - update := livenessUpdate{ - old: oldLivenessRec.Liveness, - oldRaw: oldLivenessRec.raw, - ignoreCache: true, - } + // Let's compute what our new liveness record should be. + var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { // Liveness record didn't previously exist, so we create one. - update.new = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, + newLiveness = kvserverpb.Liveness{ + NodeID: nodeID, + Epoch: 1, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, } } else { - update.new = oldLivenessRec.Liveness + newLiveness = oldLivenessRec.Liveness + // We may have read a liveness record written by a v20.1 node, so we + // reconcile. + newLiveness.EnsureCompatible() } - if reporter != nil && drain && !update.new.Draining { + if reporter != nil && drain && !newLiveness.Draining { // Report progress to the Drain RPC.
reporter(1, "liveness record") } - update.new.Draining = drain - // We may have read a liveness record written by a v20.1 node, so we - // reconcile. - update.new.EnsureCompatible() + newLiveness.Draining = drain + update := livenessUpdate{ + old: oldLivenessRec.Liveness, + new: newLiveness, + oldRaw: oldLivenessRec.raw, + ignoreCache: true, + } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // We may have read a liveness record written by a v20.1 node, so we // reconcile. @@ -478,11 +490,10 @@ func (nl *NodeLiveness) setCommissionStatusInternal( if oldLivenessRec.CommissionStatus == newLiveness.CommissionStatus { // No-op. Return early. - // XXX: Write tests for what gets inserted into event log, and what doesn't. return false, nil } else if oldLivenessRec.CommissionStatus.Decommissioned() && newLiveness.CommissionStatus.Decommissioning() { - // Also check for old == decomm'd and new == decomm'ing. + // Marking a decommissioned node for decommissioning is a no-op. return false, nil } @@ -493,22 +504,31 @@ func (nl *NodeLiveness) setCommissionStatusInternal( // - Decommissioning => Decommissioned // // See diagram above CommissionStatus type for more details. - // - // XXX: How to best surface these errors all the way up to the CLI output? - if newLiveness.CommissionStatus.Commissioned() && - !oldLivenessRec.Liveness.CommissionStatus.Decommissioning() { - return false, errors.Newf("can only recommission a decommissioning node, found %s", - oldLivenessRec.Liveness.CommissionStatus.String()) - } - if newLiveness.CommissionStatus.Decommissioning() && - !oldLivenessRec.Liveness.CommissionStatus.Commissioned() { - return false, errors.Newf("can only decommission a commissioned node, found %s", - oldLivenessRec.Liveness.CommissionStatus.String()) - } - if newLiveness.CommissionStatus.Decommissioned() && - !oldLivenessRec.Liveness.CommissionStatus.Decommissioning() { - return false, errors.Newf("can only fully decommission a decommissioning node, found %s", - oldLivenessRec.Liveness.CommissionStatus.String()) + + // ASK(reviewers): How to best surface these errors all the way up to the + // CLI output? I can't figure out what pieces I need from the errors package + // to make it work. + if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) { + if newLiveness.CommissionStatus.Commissioned() && + !oldLivenessRec.Liveness.CommissionStatus.Decommissioning() { + err := errors.Newf("can only recommission a decommissioning node, found %s", + oldLivenessRec.Liveness.CommissionStatus.String()) + return false, errors.Mark(err, ErrIllegalCommissionTransition) + } + if newLiveness.CommissionStatus.Decommissioning() && + !oldLivenessRec.Liveness.CommissionStatus.Commissioned() { + // NB: This code-path is actually inaccessible, given the no-op + // conditions above. We keep it for clarity. 
+ err := errors.Newf("can only decommission a commissioned node, found %s", + oldLivenessRec.Liveness.CommissionStatus.String()) + return false, errors.Mark(err, ErrIllegalCommissionTransition) + } + if newLiveness.CommissionStatus.Decommissioned() && + !oldLivenessRec.Liveness.CommissionStatus.Decommissioning() { + err := errors.Newf("can only fully decommission a decommissioning node, found %s", + oldLivenessRec.Liveness.CommissionStatus.String()) + return false, errors.Mark(err, ErrIllegalCommissionTransition) + } } update := livenessUpdate{ @@ -542,16 +562,6 @@ func (nl *NodeLiveness) setCommissionStatusInternal( return statusChanged, nil } -// XXX: Add a test for the following scenario. -// - Decommission n2 from n1, where n2 has no liveness record. n1 -// running v20.2 -// - n2 is running v20.1, tries to update it's liveness record. Finds -// it's own liveness record written by n1 (but the new proto, -// which it doesn't know how to read fully). Will it parse -// properly? It's important for v20.2 nodes to be able to read v20.1 and -// v20.2 representations. It's important for v20.2 to write a representation -// understood by both v20.1 and v20.2. - // GetLivenessThreshold returns the maximum duration between heartbeats // before a node is considered not-live. func (nl *NodeLiveness) GetLivenessThreshold() time.Duration { @@ -699,7 +709,7 @@ func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness kvserverpb.Liven } func (nl *NodeLiveness) heartbeatInternal( - ctx context.Context, liveness kvserverpb.Liveness, incrementEpoch bool, + ctx context.Context, oldLiveness kvserverpb.Liveness, incrementEpoch bool, ) error { ctx, sp := tracing.EnsureChildSpan(ctx, nl.ambientCtx.Tracer, "liveness heartbeat") defer sp.Finish() @@ -723,48 +733,44 @@ func (nl *NodeLiveness) heartbeatInternal( <-sem }() - update := livenessUpdate{ - old: liveness, - } - if liveness == (kvserverpb.Liveness{}) { + // Lets compute what our new liveness record should be. + var newLiveness kvserverpb.Liveness + if oldLiveness == (kvserverpb.Liveness{}) { // Liveness record didn't previously exist, so we create one. - // XXX: Expects empty liveness. - update.new = kvserverpb.Liveness{ + newLiveness = kvserverpb.Liveness{ NodeID: nodeID, Epoch: 1, CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, } } else { - update.new = liveness + newLiveness = oldLiveness if incrementEpoch { - update.new.Epoch++ - // Clear draining field. - update.new.Draining = false + newLiveness.Epoch++ + newLiveness.Draining = false // Clear draining field. } + // We may have read a liveness record written by a v20.1 node, so we + // reconcile. + newLiveness.EnsureCompatible() } - // We may have read a liveness record written by a v20.1 node, so we - // reconcile. - // - // XXX: What's the guarantee that v21.1 nodes will never see v20.1 - // representation? A: Heartbeats are only ever sent out by live nodes, and - // operators aren't allowed to deploy clusters such that there's a 20.1 - // node heartbeating a 21.1 cluster. Still, can we foolproof this somehow? - update.new.EnsureCompatible() - // We need to add the maximum clock offset to the expiration because it's // used when determining liveness for a node. { - update.new.Expiration = hlc.LegacyTimestamp( + newLiveness.Expiration = hlc.LegacyTimestamp( nl.clock.Now().Add((nl.livenessThreshold).Nanoseconds(), 0)) // This guards against the system clock moving backwards. 
As long // as the cockroach process is running, checks inside hlc.Clock // will ensure that the clock never moves backwards, but these // checks don't work across process restarts. - if update.new.Expiration.Less(liveness.Expiration) { + if newLiveness.Expiration.Less(oldLiveness.Expiration) { return errors.Errorf("proposed liveness update expires earlier than previous record") } } + + update := livenessUpdate{ + old: oldLiveness, + new: newLiveness, + } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // Update liveness to actual value on mismatch. @@ -1155,7 +1161,6 @@ func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) { // shouldReplaceLiveness checks to see if the new liveness, is infact, newer // than the old liveness. func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { - // XXX: Expects empty liveness. if (old == kvserverpb.Liveness{}) { return true } diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 6ff5fe8d29b9..a36abcd475e8 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -882,10 +882,10 @@ func TestNodeLivenessStatusMap(t *testing.T) { // Below we're going to check that all statuses converge and stabilize // to a known situation. testData := []testCase{ - {liveNodeID, kvserverpb.NodeLivenessStatus_LIVE}, - {deadNodeID, kvserverpb.NodeLivenessStatus_DEAD}, - {decommissioningNodeID, kvserverpb.NodeLivenessStatus_DECOMMISSIONING}, - {removedNodeID, kvserverpb.NodeLivenessStatus_DECOMMISSIONED}, + {liveNodeID, kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE}, + {deadNodeID, kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD}, + {decommissioningNodeID, kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONING}, + {removedNodeID, kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONED}, } for _, test := range testData { @@ -999,9 +999,10 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { // Pretend the node was once there but isn't gossiped anywhere. if err := mtc.dbs[0].CPut(ctx, keys.NodeLivenessKey(goneNodeID), &kvserverpb.Liveness{ - NodeID: goneNodeID, - Epoch: 1, - Expiration: hlc.LegacyTimestamp(mtc.clock().Now()), + NodeID: goneNodeID, + Epoch: 1, + Expiration: hlc.LegacyTimestamp(mtc.clock().Now()), + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, }, nil); err != nil { t.Fatal(err) } @@ -1042,71 +1043,3 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { t.Fatal("no change committed") } } - -// TestMixedVersionNodeLiveness tests how liveness records are interpreted in -// mixed version clusters running v20.1 and v20.2 nodes. This is of interest -// given the proto representation for liveness records were changed between the -// two major versions. -// -// TODO(irfansharif): Remove this in v20.2. -func TestMixedVersionNodeLiveness(t *testing.T) { - defer leaktest.AfterTest(t)() - // XXX: Fill this in. - defer leaktest.AfterTest(t)() - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) - g := mtc.gossips[0] - - pauseNodeLivenessHeartbeats(mtc, true) - - // Verify liveness is properly initialized. This needs to be wrapped in a - // SucceedsSoon because node liveness gets initialized via an async gossip - // callback. 
- var liveness kvserver.LivenessRecord - testutils.SucceedsSoon(t, func() error { - var err error - liveness, err = mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) - return err - }) - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { - t.Fatal(err) - } - - // Gossip random nonsense for liveness and verify that asking for - // the node's own node ID returns the "correct" value. - key := gossip.MakeNodeLivenessKey(mtc.gossips[1].NodeID.Get()) - var count int32 - g.RegisterCallback(key, func(_ string, val roachpb.Value) { - atomic.AddInt32(&count, 1) - }) - testutils.SucceedsSoon(t, func() error { - // XXX: Use v20.1 liveness here. - newLiveness := liveness - newLiveness.Epoch = 42 // XXX: Do we need this? - - if err := g.AddInfoProto(key, &newLiveness, 0); err != nil { - t.Fatal(err) - } - if atomic.LoadInt32(&count) < 2 { // XXX: Why 2? - return errors.New("expected gossip callback to get triggered") - } - return nil - }) - - l := mtc.nodeLivenesses[0] - lGetRec, err := l.GetLiveness(mtc.gossips[1].NodeID.Get()) - require.NoError(t, err) - lGet := lGetRec.Liveness - lSelf, err := l.Self() - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(lGet, lSelf) { - t.Errorf("expected GetLiveness() to return same value as Self(): %+v != %+v", lGet, lSelf) - } - if lGet.Epoch == 2 { - t.Errorf("expected GetLiveness() not to return artificially gossiped liveness: %+v", lGet) - } - t.Logf("GetLiveness() returned liveness: %+v", lGet) -} diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index e351dd69081d..8c5f8cb734b0 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -126,7 +126,7 @@ func TestStorePoolGossipUpdate(t *testing.T) { stopper, g, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 0 }, /* NodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) @@ -193,7 +193,7 @@ func TestStorePoolGetStoreList(t *testing.T) { stopper, g, _, sp, mnl := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) constraints := []zonepb.ConstraintsConjunction{ @@ -259,11 +259,11 @@ func TestStorePoolGetStoreList(t *testing.T) { // absentStore is purposefully not gossiped. }, t) for i := 1; i <= 7; i++ { - mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_LIVE) + mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) } // Set deadStore as dead. - mnl.setNodeStatus(deadStore.Node.NodeID, kvserverpb.NodeLivenessStatus_DEAD) + mnl.setNodeStatus(deadStore.Node.NodeID, kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) sp.detailsMu.Lock() // Set declinedStore as throttled. 
sp.detailsMu.storeDetails[declinedStore.StoreID].throttledUntil = sp.clock.Now().GoTime().Add(time.Hour) @@ -444,7 +444,7 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { stopper, g, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) stores := []*roachpb.StoreDescriptor{ @@ -565,7 +565,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { stopper, _, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(ctx) // Create store. @@ -615,7 +615,7 @@ func TestStorePoolGetStoreDetails(t *testing.T) { stopper, g, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(uniqueStore, t) @@ -635,7 +635,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { stopper, g, _, sp, mnl := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) @@ -692,7 +692,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { sg.GossipStores(stores, t) for i := 1; i <= 5; i++ { - mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_LIVE) + mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) } liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas) @@ -703,8 +703,8 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas) } // Mark nodes 4 & 5 as dead. - mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DEAD) - mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEAD) + mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) + mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas) if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) { @@ -715,7 +715,7 @@ func TestStorePoolFindDeadReplicas(t *testing.T) { } // Mark node 4 as merely unavailable. 
- mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_UNAVAILABLE) + mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE) liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas) if a, e := liveReplicas, replicas[:3]; !reflect.DeepEqual(a, e) { @@ -738,7 +738,7 @@ func TestStorePoolDefaultState(t *testing.T) { stopper, _, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) liveReplicas, deadReplicas := sp.liveAndDeadReplicas([]roachpb.ReplicaDescriptor{{StoreID: 1}}) @@ -763,7 +763,7 @@ func TestStorePoolThrottle(t *testing.T) { stopper, g, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) @@ -801,7 +801,7 @@ func TestGetLocalities(t *testing.T) { stopper, g, _, sp, _ := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) @@ -872,7 +872,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { stopper, g, _, sp, mnl := createTestStorePool( TestTimeUntilStoreDead, false, /* deterministic */ func() int { return 10 }, /* nodeCount */ - kvserverpb.NodeLivenessStatus_DEAD) + kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) defer stopper.Stop(context.Background()) sg := gossiputil.NewStoreGossiper(g) @@ -929,7 +929,7 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { sg.GossipStores(stores, t) for i := 1; i <= 5; i++ { - mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_LIVE) + mnl.setNodeStatus(roachpb.NodeID(i), kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE) } liveReplicas, deadReplicas := sp.liveAndDeadReplicas(replicas) @@ -940,9 +940,9 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { t.Fatalf("expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas) } // Mark node 4 as decommissioning. - mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DECOMMISSIONING) + mnl.setNodeStatus(4, kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONING) // Mark node 5 as dead. - mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEAD) + mnl.setNodeStatus(5, kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD) liveReplicas, deadReplicas = sp.liveAndDeadReplicas(replicas) // Decommissioning replicas are considered live. @@ -977,9 +977,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(5 * time.Minute).UnixNano(), }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_LIVE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE, }, { liveness: kvserverpb.Liveness{ @@ -990,9 +991,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.UnixNano() + 1, }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_LIVE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE, }, // Expired status. 
{ @@ -1004,9 +1006,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.UnixNano(), }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_UNAVAILABLE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE, }, // Expired status. { @@ -1017,9 +1020,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.UnixNano(), }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_UNAVAILABLE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE, }, // Max bound of expired. { @@ -1030,9 +1034,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(-threshold).UnixNano() + 1, }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_UNAVAILABLE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE, }, // Dead status. { @@ -1043,9 +1048,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(-threshold).UnixNano(), }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_DEAD, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_DEAD, }, // Decommissioning. { @@ -1056,9 +1062,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(time.Second).UnixNano(), }, DeprecatedDecommissioning: true, + CommissionStatus: kvserverpb.CommissionStatus_DECOMMISSIONING, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_DECOMMISSIONING, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONING, }, // Decommissioned. { @@ -1069,9 +1076,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(-threshold).UnixNano(), }, DeprecatedDecommissioning: true, + CommissionStatus: kvserverpb.CommissionStatus_DECOMMISSIONED, Draining: false, }, - expected: kvserverpb.NodeLivenessStatus_DECOMMISSIONED, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_DECOMMISSIONED, }, // Draining (reports as unavailable). { @@ -1082,9 +1090,10 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { WallTime: now.Add(5 * time.Minute).UnixNano(), }, DeprecatedDecommissioning: false, + CommissionStatus: kvserverpb.CommissionStatus_COMMISSIONED, Draining: true, }, - expected: kvserverpb.NodeLivenessStatus_UNAVAILABLE, + expected: kvserverpb.NodeLivenessStatus_DEPRECATED_UNAVAILABLE, }, } { t.Run("", func(t *testing.T) { diff --git a/pkg/server/admin_cluster_test.go b/pkg/server/admin_cluster_test.go index 037bf7653154..c3348d8e399d 100644 --- a/pkg/server/admin_cluster_test.go +++ b/pkg/server/admin_cluster_test.go @@ -178,7 +178,7 @@ func TestLivenessAPI(t *testing.T) { if !ok { return errors.Errorf("found no liveness status for node %d", s.NodeID()) } - if a, e := status, kvserverpb.NodeLivenessStatus_LIVE; a != e { + if a, e := status, kvserverpb.NodeLivenessStatus_DEPRECATED_LIVE; a != e { return errors.Errorf( "liveness status for node %s was %s, wanted %s", s.NodeID(), a, e, )
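One further illustration, tied to the setCommissionStatusInternal changes earlier in this patch: the error checks added there amount to a small state machine over commission statuses. The sketch below restates only those checks, with stand-in names; the real code additionally treats an unchanged status (and decommissioned => decommissioning) as a no-op rather than an error, and skips validation entirely when no liveness record existed beforehand.

package main

import "fmt"

// CommissionStatus is a stand-in for the kvserverpb enum; names are assumed.
type CommissionStatus int

const (
	commissioned CommissionStatus = iota + 1
	decommissioning
	decommissioned
)

// legalTransition mirrors the checks in setCommissionStatusInternal:
// recommissioning is only allowed out of a decommissioning state,
// decommissioning only out of a commissioned state, and a node must pass
// through decommissioning before it can be marked fully decommissioned.
func legalTransition(old, new CommissionStatus) bool {
	switch new {
	case commissioned:
		return old == decommissioning
	case decommissioning:
		return old == commissioned
	case decommissioned:
		return old == decommissioning
	default:
		return false
	}
}

func main() {
	fmt.Println(legalTransition(commissioned, decommissioning)) // true
	fmt.Println(legalTransition(decommissioning, commissioned)) // true: recommission cancels the attempt
	fmt.Println(legalTransition(decommissioned, commissioned))  // false: decommissioned is terminal
	fmt.Println(legalTransition(commissioned, decommissioned))  // false: must pass through decommissioning
}

This is also roughly the shape of the table-driven coverage the "Test all the error conditions for setCommissionStatusInternal" note above is asking for.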