Merge #124288
124288: upgradecluster: retry until cluster stable with unavailable nodes r=rafiss a=kvoli

Nodes can be transiently unavailable (e.g. failing a heartbeat), in which case the upgrade manager errors out. Retry `UntilClusterStable` up to 10 times when nodes are unavailable before returning an error.

Resolves: #120521
Resolves: #121069
Resolves: #119696
Release note: None
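
The shape of the fix is easiest to see in isolation. Below is a minimal, self-contained sketch of the bounded retry described above; the names (`snapshot`, `untilStable`, `scan`) are illustrative stand-ins rather than the actual CockroachDB types, and the real change is the `UntilClusterStable` diff in `pkg/upgrade/upgradecluster/cluster.go` further down.

```go
// Illustrative sketch only: the real implementation is the UntilClusterStable
// change in pkg/upgrade/upgradecluster/cluster.go shown in the diff below.
package main

import (
	"fmt"
	"reflect"
)

// snapshot stands in for one scan of node liveness: the nodes that are part
// of the cluster, and the subset currently failing liveness checks.
type snapshot struct {
	live        []int
	unavailable []int
}

// untilStable re-runs fn until two consecutive snapshots report the same live
// set and no node is unavailable. Retries caused by unavailable nodes are
// capped, so a persistently down node eventually returns an error instead of
// looping forever.
func untilStable(scan func() snapshot, fn func() error) error {
	const maxUnavailableRetries = 10
	unavailableRetries := 0
	prev := scan()
	for {
		if err := fn(); err != nil {
			return err
		}
		cur := scan()
		if !reflect.DeepEqual(prev.live, cur.live) || len(cur.unavailable) > 0 {
			prev = cur
			if len(cur.unavailable) > 0 {
				unavailableRetries++
				if unavailableRetries > maxUnavailableRetries {
					return fmt.Errorf("nodes %v required, but unavailable", cur.unavailable)
				}
			}
			continue
		}
		return nil
	}
}

func main() {
	// Node 3 is unavailable for the first two scans and then recovers; the
	// loop retries once and then succeeds.
	calls := 0
	scan := func() snapshot {
		calls++
		if calls <= 2 {
			return snapshot{live: []int{1, 2, 3}, unavailable: []int{3}}
		}
		return snapshot{live: []int{1, 2, 3}}
	}
	fmt.Println(untilStable(scan, func() error { return nil })) // <nil>
}
```

The design point mirrors the commit: churn in the live node set still retries without a cap, as before, while retries caused purely by unavailable nodes are limited to 10 so a persistently down node still surfaces as an error.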

Co-authored-by: Austen McClernon <austen@cockroachlabs.com>
craig[bot] and kvoli committed Jun 4, 2024
2 parents f68825a + ab76750 commit 7efb667
Showing 3 changed files with 49 additions and 29 deletions.
pkg/upgrade/upgradecluster/cluster.go: 24 additions & 10 deletions
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
)
@@ -66,23 +67,33 @@ func New(cfg ClusterConfig) *Cluster {

// UntilClusterStable is part of the upgrade.Cluster interface.
func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error {
ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
live, _, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return err
}

// Allow for several retries in case of transient node unavailability.
const maxUnavailableRetries = 10
unavailableRetries := 0

for {
if err := fn(); err != nil {
return err
}
curNodes, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
curLive, curUnavailable, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return err
}

if ok, diffs := ns.Identical(curNodes); !ok {
log.Infof(ctx, "%s, retrying", diffs)
ns = curNodes
if ok, diffs := live.Identical(curLive); !ok || curUnavailable != nil {
log.Infof(ctx, "%s, retrying, unavailable %v", diffs, curUnavailable)
live = curLive
if curUnavailable != nil {
unavailableRetries++
if unavailableRetries > maxUnavailableRetries {
return errors.Newf("nodes %v required, but unavailable", curUnavailable)
}
}
continue
}
break
@@ -92,29 +103,32 @@ func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error

// NumNodesOrTenantPods is part of the upgrade.Cluster interface.
func (c *Cluster) NumNodesOrServers(ctx context.Context) (int, error) {
ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
live, unavailable, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return 0, err
}
return len(ns), nil
if len(unavailable) > 0 {
return 0, errors.Newf("unavailable node(s): %v", unavailable)
}
return len(live), nil
}

// ForEveryNodeOrTenantPod is part of the upgrade.Cluster interface.
func (c *Cluster) ForEveryNodeOrServer(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
) error {

ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
live, _, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness)
if err != nil {
return err
}

// We'll want to rate limit outgoing RPCs (limit pulled out of thin air).
qp := quotapool.NewIntPool("every-node", 25)
log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)
log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), live)
grp := ctxgroup.WithContext(ctx)

for _, node := range ns {
for _, node := range live {
id := node.ID // copy out of the loop variable
alloc, err := qp.Acquire(ctx, 1)
if err != nil {
pkg/upgrade/upgradecluster/helper_test.go: 13 additions & 7 deletions
@@ -117,7 +117,7 @@ func TestHelperEveryNode(t *testing.T) {
NodeLiveness: tc,
Dialer: NoopDialer{},
})
expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
expRe := fmt.Sprintf("nodes n\\{%d\\} required, but unavailable", downedNode)
opCount := 0
if err := h.UntilClusterStable(ctx, func() error {
return h.ForEveryNodeOrServer(ctx, "dummy-op", func(
@@ -157,11 +157,15 @@ func TestClusterNodes(t *testing.T) {

t.Run("retrieves-all", func(t *testing.T) {
nl := livenesspb.TestCreateNodeVitality(1, 2, 3)
ns, err := NodesFromNodeLiveness(ctx, nl)
ns, unavailable, err := NodesFromNodeLiveness(ctx, nl)
if err != nil {
t.Fatal(err)
}

if len(unavailable) > 0 {
t.Fatalf("expected no unavailable nodes, found %d", len(unavailable))
}

if got := len(ns); got != numNodes {
t.Fatalf("expected %d Nodes, got %d", numNodes, got)
}
@@ -182,7 +186,7 @@ func TestClusterNodes(t *testing.T) {
const decommissionedNode = 3
nl.Decommissioned(decommissionedNode, false)

ns, err := NodesFromNodeLiveness(ctx, nl)
ns, _, err := NodesFromNodeLiveness(ctx, nl)
if err != nil {
t.Fatal(err)
}
@@ -206,10 +210,12 @@ func TestClusterNodes(t *testing.T) {
const downedNode = 3
nl.DownNode(downedNode)

_, err := NodesFromNodeLiveness(ctx, nl)
expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
if !testutils.IsError(err, expRe) {
t.Fatalf("expected error %q, got %q", expRe, err)
_, unavailable, err := NodesFromNodeLiveness(ctx, nl)
if len(unavailable) != 1 {
t.Fatalf("expected 1 unavailable node, found %d", len(unavailable))
}
if err != nil {
t.Fatal(err)
}
})
}
pkg/upgrade/upgradecluster/nodes.go: 12 additions & 12 deletions
@@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

@@ -31,10 +30,10 @@ type Node struct {
type Nodes []Node

// NodesFromNodeLiveness returns the IDs and epochs for all nodes that are
// currently part of the cluster (i.e. they haven't been decommissioned away).
// Migrations have the pre-requisite that all nodes are up and running so that
// we're able to execute all relevant node-level operations on them. If any of
// the nodes are found to be unavailable, an error is returned.
// currently part of the cluster (i.e. they haven't been decommissioned away)
// and any nodes which are currently unavailable. Migrations have the
// pre-requisite that all nodes are up and running so that we're able to
// execute all relevant node-level operations on them.
//
// It's important to note that this makes no guarantees about new nodes
// being added to the cluster. It's entirely possible for that to happen
@@ -43,25 +42,26 @@ type Nodes []Node
// EveryNode.
func NodesFromNodeLiveness(
ctx context.Context, nl livenesspb.NodeVitalityInterface,
) (Nodes, error) {
var ns []Node
) (live, unavailable Nodes, _ error) {
ls, err := nl.ScanNodeVitalityFromKV(ctx)
if err != nil {
return nil, err
return nil, nil, err
}
for id, n := range ls {
if n.IsDecommissioned() {
continue
}
if !n.IsLive(livenesspb.Upgrade) {
return nil, errors.Newf("n%d required, but unavailable", id)
unavailable = append(unavailable, Node{ID: id, Epoch: n.GenLiveness().Epoch})
}
// TODO(baptist): Stop using Epoch, need to determine an alternative.
ns = append(ns, Node{ID: id, Epoch: n.GenLiveness().Epoch})
live = append(live, Node{ID: id, Epoch: n.GenLiveness().Epoch})

}
// Tests assume the nodes are sorted, so sort by node id first.
sort.Slice(ns, func(i, j int) bool { return ns[i].ID < ns[j].ID })
return ns, nil
sort.Slice(live, func(i, j int) bool { return live[i].ID < live[j].ID })
sort.Slice(unavailable, func(i, j int) bool { return unavailable[i].ID < unavailable[j].ID })
return live, unavailable, nil
}

// Identical returns whether or not two lists of Nodes are identical as sets,
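To make the new `NodesFromNodeLiveness` contract concrete: per the comment and code above, a node that is down but not decommissioned is still returned in the live slice and is additionally reported in the unavailable slice. The sketch below is not part of this commit; it is a hypothetical test modeled on the `helper_test.go` patterns earlier in the diff, assuming the same in-package test setup that file uses.

```go
// Hypothetical test, not part of this commit: it exercises the two return
// slices of NodesFromNodeLiveness using the fixtures already used in
// helper_test.go above. Package name assumes in-package tests.
package upgradecluster

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
)

func TestNodesFromNodeLivenessSketch(t *testing.T) {
	ctx := context.Background()
	nl := livenesspb.TestCreateNodeVitality(1, 2, 3)
	nl.DownNode(3) // n3 now fails the liveness check used by upgrades

	live, unavailable, err := NodesFromNodeLiveness(ctx, nl)
	if err != nil {
		t.Fatal(err)
	}
	// n3 is down but not decommissioned, so it remains in the
	// non-decommissioned (live) set while also being flagged unavailable.
	if got := len(live); got != 3 {
		t.Fatalf("expected 3 non-decommissioned nodes, got %d", got)
	}
	if len(unavailable) != 1 || unavailable[0].ID != 3 {
		t.Fatalf("expected n3 unavailable, got %v", unavailable)
	}
}
```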
