From ab76750e7deb45e4d5a94c1e659a1d5e0f09afa7 Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Mon, 13 May 2024 15:10:04 -0400
Subject: [PATCH] upgradecluster: retry until cluster stable with unavailable
 nodes

Nodes can be transiently unavailable (e.g. failing a heartbeat), in which
case the upgrade manager will error out. Retry `UntilClusterStable` up to
10 times when there are unavailable nodes before returning an error.

Resolves: #120521
Resolves: #121069
Resolves: #119696

Release note: None
---
 pkg/upgrade/upgradecluster/cluster.go     | 34 ++++++++++++++++-------
 pkg/upgrade/upgradecluster/helper_test.go | 20 ++++++++-----
 pkg/upgrade/upgradecluster/nodes.go       | 24 ++++++++--------
 3 files changed, 49 insertions(+), 29 deletions(-)

diff --git a/pkg/upgrade/upgradecluster/cluster.go b/pkg/upgrade/upgradecluster/cluster.go
index ddfeba333340..41670a8c8799 100644
--- a/pkg/upgrade/upgradecluster/cluster.go
+++ b/pkg/upgrade/upgradecluster/cluster.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 	"google.golang.org/grpc"
 )
@@ -66,23 +67,33 @@ func New(cfg ClusterConfig) *Cluster {
 
 // UntilClusterStable is part of the upgrade.Cluster interface.
 func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error {
-	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	live, _, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
 	if err != nil {
 		return err
 	}
+	// Allow for several retries in case of transient node unavailability.
+	const maxUnavailableRetries = 10
+	unavailableRetries := 0
+
 	for {
 		if err := fn(); err != nil {
 			return err
 		}
-		curNodes, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+		curLive, curUnavailable, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
 		if err != nil {
 			return err
 		}
-		if ok, diffs := ns.Identical(curNodes); !ok {
-			log.Infof(ctx, "%s, retrying", diffs)
-			ns = curNodes
+		if ok, diffs := live.Identical(curLive); !ok || curUnavailable != nil {
+			log.Infof(ctx, "%s, retrying, unavailable %v", diffs, curUnavailable)
+			live = curLive
+			if curUnavailable != nil {
+				unavailableRetries++
+				if unavailableRetries > maxUnavailableRetries {
+					return errors.Newf("nodes %v required, but unavailable", curUnavailable)
+				}
+			}
 			continue
 		}
 		break
@@ -92,11 +103,14 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error
 
 // NumNodesOrTenantPods is part of the upgrade.Cluster interface.
 func (c *Cluster) NumNodesOrServers(ctx context.Context) (int, error) {
-	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	live, unavailable, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
 	if err != nil {
 		return 0, err
 	}
-	return len(ns), nil
+	if len(unavailable) > 0 {
+		return 0, errors.Newf("unavailable node(s): %v", unavailable)
+	}
+	return len(live), nil
 }
 
 // ForEveryNodeOrTenantPod is part of the upgrade.Cluster interface.
@@ -104,17 +118,17 @@ func (c *Cluster) ForEveryNodeOrServer(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
 ) error {
-	ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
+	live, _, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
 	if err != nil {
 		return err
 	}
 
 	// We'll want to rate limit outgoing RPCs (limit pulled out of thin air).
 	qp := quotapool.NewIntPool("every-node", 25)
-	log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)
+	log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), live)
 	grp := ctxgroup.WithContext(ctx)
-	for _, node := range ns {
+	for _, node := range live {
 		id := node.ID // copy out of the loop variable
 		alloc, err := qp.Acquire(ctx, 1)
 		if err != nil {
diff --git a/pkg/upgrade/upgradecluster/helper_test.go b/pkg/upgrade/upgradecluster/helper_test.go
index add3cf066eec..aff792d55397 100644
--- a/pkg/upgrade/upgradecluster/helper_test.go
+++ b/pkg/upgrade/upgradecluster/helper_test.go
@@ -117,7 +117,7 @@ func TestHelperEveryNode(t *testing.T) {
 			NodeLiveness: tc,
 			Dialer:       NoopDialer{},
 		})
-		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
+		expRe := fmt.Sprintf("nodes n\\{%d\\} required, but unavailable", downedNode)
 		opCount := 0
 		if err := h.UntilClusterStable(ctx, func() error {
 			return h.ForEveryNodeOrServer(ctx, "dummy-op", func(
@@ -157,11 +157,15 @@ func TestClusterNodes(t *testing.T) {
 
 	t.Run("retrieves-all", func(t *testing.T) {
 		nl := livenesspb.TestCreateNodeVitality(1, 2, 3)
-		ns, err := NodesFromNodeLiveness(ctx, nl)
+		ns, unavailable, err := NodesFromNodeLiveness(ctx, nl)
 		if err != nil {
 			t.Fatal(err)
 		}
 
+		if len(unavailable) > 0 {
+			t.Fatalf("expected no unavailable nodes, found %d", len(unavailable))
+		}
+
 		if got := len(ns); got != numNodes {
 			t.Fatalf("expected %d Nodes, got %d", numNodes, got)
 		}
@@ -182,7 +186,7 @@ func TestClusterNodes(t *testing.T) {
 		const decommissionedNode = 3
 		nl.Decommissioned(decommissionedNode, false)
 
-		ns, err := NodesFromNodeLiveness(ctx, nl)
+		ns, _, err := NodesFromNodeLiveness(ctx, nl)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -206,10 +210,12 @@ func TestClusterNodes(t *testing.T) {
 		const downedNode = 3
 		nl.DownNode(downedNode)
 
-		_, err := NodesFromNodeLiveness(ctx, nl)
-		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
-		if !testutils.IsError(err, expRe) {
-			t.Fatalf("expected error %q, got %q", expRe, err)
+		_, unavailable, err := NodesFromNodeLiveness(ctx, nl)
+		if len(unavailable) != 1 {
+			t.Fatalf("expected 1 unavailable node, found %d", len(unavailable))
+		}
+		if err != nil {
+			t.Fatal(err)
 		}
 	})
 }
diff --git a/pkg/upgrade/upgradecluster/nodes.go b/pkg/upgrade/upgradecluster/nodes.go
index 58644500e160..741582c15cd0 100644
--- a/pkg/upgrade/upgradecluster/nodes.go
+++ b/pkg/upgrade/upgradecluster/nodes.go
@@ -16,7 +16,6 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 )
@@ -31,10 +30,10 @@ type Node struct {
 type Nodes []Node
 
 // NodesFromNodeLiveness returns the IDs and epochs for all nodes that are
-// currently part of the cluster (i.e. they haven't been decommissioned away).
-// Migrations have the pre-requisite that all nodes are up and running so that
-// we're able to execute all relevant node-level operations on them. If any of
-// the nodes are found to be unavailable, an error is returned.
+// currently part of the cluster (i.e. they haven't been decommissioned away)
+// and any nodes which are currently unavailable. Migrations have the
+// pre-requisite that all nodes are up and running so that we're able to
+// execute all relevant node-level operations on them.
 //
 // It's important to note that this makes no guarantees about new nodes
 // being added to the cluster. It's entirely possible for that to happen
@@ -43,25 +42,26 @@ type Nodes []Node
 // EveryNode.
 func NodesFromNodeLiveness(
 	ctx context.Context, nl livenesspb.NodeVitalityInterface,
-) (Nodes, error) {
-	var ns []Node
+) (live, unavailable Nodes, _ error) {
 	ls, err := nl.ScanNodeVitalityFromKV(ctx)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	for id, n := range ls {
 		if n.IsDecommissioned() {
 			continue
 		}
 		if !n.IsLive(livenesspb.Upgrade) {
-			return nil, errors.Newf("n%d required, but unavailable", id)
+			unavailable = append(unavailable, Node{ID: id, Epoch: n.GenLiveness().Epoch})
 		}
 		// TODO(baptist): Stop using Epoch, need to determine an alternative.
-		ns = append(ns, Node{ID: id, Epoch: n.GenLiveness().Epoch})
+		live = append(live, Node{ID: id, Epoch: n.GenLiveness().Epoch})
 	}
 	// Tests assume the nodes are sorted, so sort by node id first.
-	sort.Slice(ns, func(i, j int) bool { return ns[i].ID < ns[j].ID })
-	return ns, nil
+	sort.Slice(live, func(i, j int) bool { return live[i].ID < live[j].ID })
+	sort.Slice(unavailable, func(i, j int) bool { return unavailable[i].ID < unavailable[j].ID })
+	return live, unavailable, nil
 }
 
 // Identical returns whether or not two lists of Nodes are identical as sets,
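
For readers who want the retry shape at a glance, below is a minimal, standalone Go sketch of the loop that the patched UntilClusterStable implements. It is not CockroachDB code: nodeSnapshot, scan, and untilClusterStable are hypothetical stand-ins for the (live, unavailable) pair returned by NodesFromNodeLiveness and for the patched method, and the budget of 10 mirrors maxUnavailableRetries above.

package main

import (
	"fmt"
	"reflect"
	"sort"
)

// nodeSnapshot is a hypothetical stand-in for the (live, unavailable) pair
// that the patched NodesFromNodeLiveness returns.
type nodeSnapshot struct {
	live        []int
	unavailable []int
}

// untilClusterStable mirrors the retry shape introduced above: run fn,
// re-scan liveness, and retry while membership changed or nodes are
// unavailable, giving up after more than maxUnavailableRetries scans
// report unavailable nodes.
func untilClusterStable(scan func() nodeSnapshot, fn func() error) error {
	const maxUnavailableRetries = 10
	unavailableRetries := 0

	prev := scan()
	for {
		if err := fn(); err != nil {
			return err
		}
		cur := scan()
		sort.Ints(prev.live)
		sort.Ints(cur.live)
		if reflect.DeepEqual(prev.live, cur.live) && len(cur.unavailable) == 0 {
			return nil // membership stable and every node reachable
		}
		if len(cur.unavailable) > 0 {
			unavailableRetries++
			if unavailableRetries > maxUnavailableRetries {
				return fmt.Errorf("nodes %v required, but unavailable", cur.unavailable)
			}
		}
		prev = cur
	}
}

func main() {
	// Simulated liveness: node 3 misses heartbeats for two scans, then recovers.
	scans := 0
	scan := func() nodeSnapshot {
		scans++
		if scans <= 2 {
			return nodeSnapshot{live: []int{1, 2, 3}, unavailable: []int{3}}
		}
		return nodeSnapshot{live: []int{1, 2, 3}}
	}
	err := untilClusterStable(scan, func() error { return nil })
	fmt.Println("err:", err, "scans:", scans)
}

Running the sketch prints "err: <nil> scans: 3": the transiently unavailable node delays convergence for two scans but no longer fails the upgrade outright.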