kvserver,cli,roachtest,sql: introduce a fully decommissioned bit #50329

Merged
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -72,6 +72,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>20.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>20.1-11</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
12 changes: 7 additions & 5 deletions pkg/cli/cliflags/flags.go
@@ -939,14 +939,16 @@ drain all active client connections and migrate away range leases.`,
Wait = FlagInfo{
Name: "wait",
Description: `
Specifies when to return after having marked the targets as decommissioning.
Specifies when to return during the decommissioning process.
Takes any of the following values:
<PRE>

- all: waits until all target nodes' replica counts have dropped to zero.
This is the default.
- none: marks the targets as decommissioning, but does not wait for the
process to complete. Use when polling manually from an external system.
- all waits until all target nodes' replica counts have dropped to zero and
marks the nodes as fully decommissioned. This is the default.
- none marks the targets as decommissioning, but does not wait for the
replica counts to drop to zero before returning. If the replica counts
are found to be zero, nodes are marked as fully decommissioned. Use
when polling manually from an external system.
</PRE>`,
}

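The `--wait` semantics described in the flag text above lend themselves to external orchestration. Purely as a hypothetical sketch (not part of this change): an external system could poll `cockroach node decommission --wait=none --format=csv` and watch the `replicas` column until it reaches zero. The CSV column order follows `decommissionNodesColumnHeaders` further down in `pkg/cli/node.go`; the binary name, connection flags, and parsing details below are assumptions.

```go
// Illustrative external poller built around `--wait=none`; not part of this
// change. The CSV column order (id, is_live, replicas, is_decommissioning,
// membership, is_draining) follows decommissionNodesColumnHeaders in
// pkg/cli/node.go; the binary name and connection flags are assumptions.
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
	"os"
	"os/exec"
	"strconv"
	"time"
)

// waitForZeroReplicas re-issues `node decommission --wait=none` until the
// target node's gossiped replica count reaches zero.
func waitForZeroReplicas(nodeID int) error {
	id := strconv.Itoa(nodeID)
	for {
		out, err := exec.Command(
			"cockroach", "node", "decommission", id,
			"--wait=none", "--format=csv", "--insecure",
		).Output()
		if err != nil {
			return err
		}
		r := csv.NewReader(bytes.NewReader(out))
		r.FieldsPerRecord = -1 // tolerate any trailing, non-tabular output
		rows, err := r.ReadAll()
		if err != nil {
			return err
		}
		for _, row := range rows {
			// Find the row for our node and read its "replicas" column.
			if len(row) >= 3 && row[0] == id {
				if replicas, convErr := strconv.Atoi(row[2]); convErr == nil && replicas == 0 {
					return nil
				}
			}
		}
		time.Sleep(5 * time.Second)
	}
}

func main() {
	if err := waitForZeroReplicas(4); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("node 4 reports zero replicas")
}
```

Per the flag text, once the replica counts are found to be zero a subsequent `--wait=none` invocation marks the targets as fully decommissioned.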
55 changes: 51 additions & 4 deletions pkg/cli/demo_cluster.go
@@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
@@ -361,17 +362,17 @@ func (c *transientCluster) DrainAndShutdown(nodeID roachpb.NodeID) error {
return nil
}

// CallDecommission calls the Decommission RPC on a node.
func (c *transientCluster) CallDecommission(nodeID roachpb.NodeID, decommissioning bool) error {
// Recommission recommissions a given node.
func (c *transientCluster) Recommission(nodeID roachpb.NodeID) error {
nodeIndex := int(nodeID - 1)

if nodeIndex < 0 || nodeIndex >= len(c.servers) {
return errors.Errorf("node %d does not exist", nodeID)
}

req := &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{nodeID},
Decommissioning: decommissioning,
NodeIDs: []roachpb.NodeID{nodeID},
TargetMembership: kvserverpb.MembershipStatus_ACTIVE,
}

ctx, cancel := context.WithCancel(context.Background())
@@ -387,6 +388,52 @@ func (c *transientCluster) CallDecommission(nodeID roachpb.NodeID, decommissioni
if err != nil {
return errors.Wrap(err, "while trying to mark as decommissioning")
}

return nil
}

// Decommission decommissions a given node.
func (c *transientCluster) Decommission(nodeID roachpb.NodeID) error {
nodeIndex := int(nodeID - 1)

if nodeIndex < 0 || nodeIndex >= len(c.servers) {
return errors.Errorf("node %d does not exist", nodeID)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

adminClient, finish, err := getAdminClient(ctx, *(c.s.Cfg))
if err != nil {
return err
}
defer finish()

// This (cumbersome) two-step process is due to the allowed state
// transitions for membership status. To mark a node as fully
// decommissioned, it has to be marked as decommissioning first.
{
req := &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{nodeID},
TargetMembership: kvserverpb.MembershipStatus_DECOMMISSIONING,
}
_, err = adminClient.Decommission(ctx, req)
if err != nil {
return errors.Wrap(err, "while trying to mark as decommissioning")
}
}

{
req := &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{nodeID},
TargetMembership: kvserverpb.MembershipStatus_DECOMMISSIONED,
}
_, err = adminClient.Decommission(ctx, req)
if err != nil {
return errors.Wrap(err, "while trying to mark as decommissioned")
}
}

return nil
}

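The two-step dance above falls out of the allowed membership transitions, which the server validates (see the reference to `ValidateLivenessTransition` in `pkg/cli/node.go` further down). Purely as an illustrative model — the real types and validation live in `kvserverpb`, and only the first error message is taken from the tests in this PR — the state machine looks roughly like this:

```go
// Illustrative only: a minimal model of the membership state machine this PR
// introduces. The real types and validation live in pkg/kv/kvserver/kvserverpb;
// the names and the second error message below are simplified stand-ins.
package membership

import "errors"

// Status mirrors the three membership states: active, decommissioning,
// and (terminally) decommissioned.
type Status int

const (
	Active Status = iota
	Decommissioning
	Decommissioned
)

// ValidateTransition captures the allowed transitions: active and
// decommissioning can move back and forth, decommissioning can advance to
// decommissioned, and decommissioned is a terminal state.
func ValidateTransition(old, target Status) error {
	if old == Decommissioned && target != Decommissioned {
		// Matches the message the interactive test below expects.
		return errors.New("can only recommission a decommissioning node")
	}
	if old == Active && target == Decommissioned {
		return errors.New("node must be marked as decommissioning first")
	}
	return nil
}
```

This is why `Decommission` above issues two RPCs, and why the tests further down expect recommissioning a fully decommissioned node to fail.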
8 changes: 8 additions & 0 deletions pkg/cli/error.go
@@ -268,6 +268,14 @@ func MaybeDecorateGRPCError(
if pgcode.MakeCode(string(wErr.Code)) == pgcode.ProtocolViolation {
return connSecurityHint()
}

// Are we running a v20.2 binary against a v20.1 server?
if strings.Contains(wErr.Message, "column \"membership\" does not exist") {
// The v20.2 binary makes use of columns not present in v20.1,
// so this is a disallowed operation. Surface a better error
// code here.
return fmt.Errorf("cannot use a v20.2 cli against servers running v20.1")
Review comment: cc @knz just to make sure this is ok - we want the 20.2 binary to use this new column name (the old one stays around for 20.1 usage). Or is there a simple version check we can use to determine the column to use? Either way, if you feel like we don't need to support it, I'd prefer to keep this simple.

}
// Otherwise, there was a regular SQL error. Just report
// that.
return err
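The review comment above asks whether a version check could pick the right column instead of failing outright. As a hypothetical sketch only (this is not what the PR does), the CLI could read the `version` cluster setting and fall back to the old `decommissioning` column on pre-20.2 servers; the package and function names here are invented.

```go
// Hypothetical alternative to the hard error above: pick the liveness column
// based on the server's reported cluster version. This is NOT what the PR
// does; package and function names here are invented for illustration.
package clicompat

import (
	"database/sql"
	"strings"
)

// activeNodesPredicate returns a WHERE-clause fragment for listing live nodes:
// v20.2+ servers expose the 'membership' column, while older servers only
// have the boolean 'decommissioning' column.
func activeNodesPredicate(db *sql.DB) (string, error) {
	var version string
	if err := db.QueryRow("SHOW CLUSTER SETTING version").Scan(&version); err != nil {
		return "", err
	}
	// Cluster versions look like "20.1-11"; anything before 20.2 predates the
	// membership column. Real code would parse and compare versions rather
	// than prefix-match strings.
	if strings.HasPrefix(version, "19.") || strings.HasPrefix(version, "20.1") {
		return "decommissioning = false", nil
	}
	return "membership = 'active'", nil
}
```

The returned fragment would then be spliced into the `crdb_internal.gossip_liveness` queries that appear later in `pkg/cli/node.go`.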
45 changes: 19 additions & 26 deletions pkg/cli/interactive_tests/test_demo_node_cmds.tcl
@@ -35,12 +35,12 @@ eexpect "node 2 is already running"
send "\\demo shutdown 3\r"
eexpect "node 3 has been shutdown"

send "select node_id, draining, decommissioning from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false"
eexpect "2 | false | false"
eexpect "3 | true | false"
eexpect "4 | false | false"
eexpect "5 | false | false"
send "select node_id, draining, decommissioning, membership from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false | active"
eexpect "2 | false | false | active"
eexpect "3 | true | false | active"
eexpect "4 | false | false | active"
eexpect "5 | false | false | active"

# Cannot shut it down again.
send "\\demo shutdown 3\r"
@@ -55,33 +55,26 @@ eexpect "movr>"
send "\\demo restart 3\r"
eexpect "node 3 has been restarted"

send "select node_id, draining, decommissioning from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false"
eexpect "2 | false | false"
eexpect "3 | false | false"
eexpect "4 | false | false"
eexpect "5 | false | false"
send "select node_id, draining, decommissioning, membership from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false | active"
eexpect "2 | false | false | active"
eexpect "3 | false | false | active"
eexpect "4 | false | false | active"
eexpect "5 | false | false | active"

# Try commissioning commands
send "\\demo decommission 4\r"
eexpect "node 4 has been decommissioned"

send "select node_id, draining, decommissioning from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false"
eexpect "2 | false | false"
eexpect "3 | false | false"
eexpect "4 | false | true"
eexpect "5 | false | false"
send "select node_id, draining, decommissioning, membership from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false | active"
eexpect "2 | false | false | active"
eexpect "3 | false | false | active"
eexpect "4 | false | true | decommissioned"
eexpect "5 | false | false | active"

send "\\demo recommission 4\r"
eexpect "node 4 has been recommissioned"

send "select node_id, draining, decommissioning from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false"
eexpect "2 | false | false"
eexpect "3 | false | false"
eexpect "4 | false | false"
eexpect "5 | false | false"
eexpect "can only recommission a decommissioning node"

interrupt
eexpect eof
11 changes: 4 additions & 7 deletions pkg/cli/interactive_tests/test_multiple_nodes.tcl
@@ -20,6 +20,10 @@ eexpect eof

end_test

start_test "Check that a recommissioning an active node prints out a warning"
spawn $argv node recommission 2
eexpect "warning: node 2 is not decommissioned"
eexpect eof

start_test "Check that a double decommission prints out a warning"
spawn $argv node decommission 2 --wait none
@@ -30,13 +34,6 @@ eexpect "warning: node 2 is already decommissioning or decommissioned"
eexpect eof
end_test

start_test "Check that a double recommission prints out a warning"
spawn $argv node recommission 2
eexpect eof

spawn $argv node recommission 2
eexpect "warning: node 2 is not decommissioned"
eexpect eof
end_test


72 changes: 58 additions & 14 deletions pkg/cli/node.go
@@ -27,6 +27,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
@@ -64,9 +66,10 @@ func runLsNodes(cmd *cobra.Command, args []string) error {
_, rows, err := runQuery(
conn,
makeQuery(`SELECT node_id FROM crdb_internal.gossip_liveness
WHERE decommissioning = false OR split_part(expiration,',',1)::decimal > now()::decimal`),
WHERE membership = 'active' OR split_part(expiration,',',1)::decimal > now()::decimal`),
false,
)

if err != nil {
return err
}
@@ -105,6 +108,7 @@ var statusNodesColumnHeadersForStats = []string{
var statusNodesColumnHeadersForDecommission = []string{
"gossiped_replicas",
"is_decommissioning",
"membership",
"is_draining",
}

@@ -143,7 +147,7 @@ func runStatusNodeInner(showDecommissioned bool, args []string) ([]string, [][]s

maybeAddActiveNodesFilter := func(query string) string {
if !showDecommissioned {
query += " WHERE decommissioning = false OR split_part(expiration,',',1)::decimal > now()::decimal"
query += " WHERE membership = 'active' OR split_part(expiration,',',1)::decimal > now()::decimal"
}
return query
}
@@ -184,10 +188,12 @@ SELECT node_id AS id,
FROM crdb_internal.kv_store_status
GROUP BY node_id`

// TODO(irfansharif): Remove the `is_decommissioning` column in v20.2.
const decommissionQuery = `
SELECT node_id AS id,
ranges AS gossiped_replicas,
decommissioning AS is_decommissioning,
membership != 'active' AS is_decommissioning,
membership AS membership,
draining AS is_draining
FROM crdb_internal.gossip_liveness LEFT JOIN crdb_internal.gossip_nodes USING (node_id)`

@@ -274,6 +280,7 @@ var decommissionNodesColumnHeaders = []string{
"is_live",
"replicas",
"is_decommissioning",
"membership",
"is_draining",
}

@@ -330,7 +337,6 @@ func runDecommissionNode(cmd *cobra.Command, args []string) error {
}

c := serverpb.NewAdminClient(conn)

return runDecommissionNodeImpl(ctx, c, nodeCtx.nodeDecommissionWait, nodeIDs)
}

@@ -409,11 +415,17 @@ func runDecommissionNodeImpl(
MaxBackoff: 20 * time.Second,
}

// Marking a node as fully decommissioned is a two-step process. We start
// off by marking each node as 'decommissioning'. In doing so, replicas are
// slowly moved off of these nodes. Only once we observe that the replica
// counts have all hit zero, and that every node has been successfully
// marked as 'decommissioning', do we go on to mark each node as
// 'decommissioned'.
prevResponse := serverpb.DecommissionStatusResponse{}
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
req := &serverpb.DecommissionRequest{
NodeIDs: nodeIDs,
Decommissioning: true,
NodeIDs: nodeIDs,
TargetMembership: kvserverpb.MembershipStatus_DECOMMISSIONING,
}
resp, err := c.Decommission(ctx, req)
if err != nil {
@@ -430,18 +442,43 @@
} else {
fmt.Fprintf(stderr, ".")
}

anyActive := false
var replicaCount int64
allDecommissioning := true
for _, status := range resp.Status {
anyActive = anyActive || status.Membership.Active()
replicaCount += status.ReplicaCount
allDecommissioning = allDecommissioning && status.Decommissioning
}
if replicaCount == 0 && allDecommissioning {

if !anyActive && replicaCount == 0 {
// We now mark the nodes as fully decommissioned.
req := &serverpb.DecommissionRequest{
NodeIDs: nodeIDs,
TargetMembership: kvserverpb.MembershipStatus_DECOMMISSIONED,
}
resp, err := c.Decommission(ctx, req)
if err != nil {
fmt.Fprintln(stderr)
return errors.Wrap(err, "while trying to mark as decommissioned")
}
if !reflect.DeepEqual(&prevResponse, resp) {
fmt.Fprintln(stderr)
if err := printDecommissionStatus(*resp); err != nil {
return err
}
prevResponse = *resp
}

fmt.Fprintln(os.Stdout, "\nNo more data reported on target nodes. "+
"Please verify cluster health before removing the nodes.")
return nil
}

if wait == nodeDecommissionWaitNone {
// The intent behind --wait=none is for it to be used when polling
// manually from an external system. We'll only mark nodes as
// fully decommissioned once the replica count hits zero and they're
// all marked as decommissioning.
return nil
}
if replicaCount < minReplicaCount {
@@ -453,7 +490,7 @@ }
}

func decommissionResponseAlignment() string {
return "rcrcc"
return "rcrccc"
}

// decommissionResponseValueToRows converts DecommissionStatusResponse_Status to
Expand All @@ -468,7 +505,8 @@ func decommissionResponseValueToRows(
strconv.FormatInt(int64(node.NodeID), 10),
strconv.FormatBool(node.IsLive),
strconv.FormatInt(node.ReplicaCount, 10),
strconv.FormatBool(node.Decommissioning),
strconv.FormatBool(!node.Membership.Active()),
node.Membership.String(),
strconv.FormatBool(node.Draining),
})
}
Expand Down Expand Up @@ -522,13 +560,19 @@ func runRecommissionNode(cmd *cobra.Command, args []string) error {
}

c := serverpb.NewAdminClient(conn)

req := &serverpb.DecommissionRequest{
NodeIDs: nodeIDs,
Decommissioning: false,
NodeIDs: nodeIDs,
TargetMembership: kvserverpb.MembershipStatus_ACTIVE,
}
resp, err := c.Decommission(ctx, req)
if err != nil {
// If it's a specific illegal membership transition error, we try to
// surface a more readable message to the user. See
// ValidateLivenessTransition in kvserverpb/liveness.go for where this
// error is generated.
if s, ok := status.FromError(err); ok && s.Code() == codes.FailedPrecondition {
return errors.Newf("%s", s.Message())
}
return err
}
return printDecommissionStatus(*resp)