-
Notifications
You must be signed in to change notification settings - Fork 29
KSQL-14849: add 'confluent ksql cluster update --csu N' command #3368
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0ff2c6b
18d52f4
9b9eee2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| package ksql | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "sort" | ||
|
|
||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/errors" | ||
| "github.com/confluentinc/cli/v4/pkg/examples" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| // Valid CSU sizes that customers may target via self-serve cluster update. | ||
| // Mirrors the server-side authoritative list in cc-control-plane-ksql: | ||
| // internal/service/update_ksql_cluster_resize.go::validCSUSizes. | ||
| // Values 1, 2 are legacy and not user-selectable. Values above maxSelfServeCSU | ||
| // (the largest entry in this slice) still require a support ticket. | ||
| // | ||
| //nolint:gochecknoglobals | ||
| var validCsuSizes = []int32{4, 8, 12, 16, 20, 24, 28} | ||
|
|
||
| // maxSelfServeCSU is derived from validCsuSizes so the support-ticket | ||
| // threshold and the "Valid values" listing stay in lockstep — if the | ||
| // validCsuSizes slice is extended, the threshold moves with it. | ||
| // | ||
| //nolint:gochecknoglobals | ||
| var maxSelfServeCSU = func() int32 { | ||
| max := int32(0) | ||
| for _, v := range validCsuSizes { | ||
| if v > max { | ||
| max = v | ||
| } | ||
| } | ||
| return max | ||
| }() | ||
|
|
||
| func csuSupportTicketMessage() string { | ||
| return fmt.Sprintf( | ||
| "CSU values above %d require a support ticket. "+ | ||
| "Please contact Confluent Support to request a larger cluster size.", | ||
| maxSelfServeCSU) | ||
| } | ||
|
|
||
| func (c *ksqlCommand) newUpdateCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "update <id>", | ||
| Short: "Update a ksqlDB cluster.", | ||
| Long: buildUpdateLongDescription(), | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validArgs), | ||
| RunE: c.update, | ||
| // Hidden while the SDK call is shimmed (see Client.UpdateKsqlCluster | ||
| // in pkg/ccloudv2/ksql.go). Once the SDK is regenerated from cc-api | ||
| // PR #2507 and the shim is replaced with the real call, drop Hidden. | ||
| Hidden: true, | ||
| Example: examples.BuildExampleString( | ||
| examples.Example{ | ||
| Text: `Resize ksqlDB cluster "lksqlc-12345" to 8 CSUs.`, | ||
| Code: "confluent ksql cluster update lksqlc-12345 --csu 8", | ||
| }, | ||
| ), | ||
| } | ||
|
|
||
| cmd.Flags().Int32("csu", 0, fmt.Sprintf( | ||
| "Target number of CSUs for the cluster. Valid values: %s.", | ||
| formatCsuList(validCsuSizes))) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| cobra.CheckErr(cmd.MarkFlagRequired("csu")) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func buildUpdateLongDescription() string { | ||
| return fmt.Sprintf( | ||
| "Update an existing ksqlDB cluster. Currently only the CSU count may be modified, "+ | ||
| "and only to larger sizes (shrink is not supported).\n\n"+ | ||
| "Valid CSU values are %s. Larger sizes require a support ticket. "+ | ||
| "The cluster will undergo a rolling restart to apply the new size; "+ | ||
| "the command returns once the resize has been accepted by the control plane.", | ||
| formatCsuList(validCsuSizes)) | ||
|
Comment on lines
+78
to
+85
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in |
||
| } | ||
|
|
||
| func (c *ksqlCommand) update(cmd *cobra.Command, args []string) error { | ||
| csu, err := cmd.Flags().GetInt32("csu") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err := validateCsuForUpdate(csu); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| environmentId, err := c.Context.EnvironmentId() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| clusterId := args[0] | ||
|
|
||
| // Pre-check current CSU so we can short-circuit a no-op locally before | ||
| // issuing the PATCH. The server-side validator also rejects no-op resizes | ||
| // with 400 ("new CSU size is the same as old CSU size, no-op"), but a | ||
| // client-side check produces a clearer message and avoids a wasted API | ||
| // round trip. Note: shrink is not supported server-side either. | ||
| current, err := c.V2Client.DescribeKsqlCluster(clusterId, environmentId) | ||
| if err != nil { | ||
| return errors.CatchKSQLNotFoundError(err, clusterId) | ||
| } | ||
| currentCsu := current.Spec.GetCsu() | ||
| if currentCsu == csu { | ||
| return fmt.Errorf("ksqlDB cluster %q is already at %d CSUs; no change requested", | ||
| clusterId, csu) | ||
| } | ||
| if csu < currentCsu { | ||
| return fmt.Errorf("ksqlDB cluster %q is currently %d CSUs; shrinking is not supported "+ | ||
| "(target %d < current %d)", clusterId, currentCsu, csu, currentCsu) | ||
| } | ||
|
|
||
| cluster, err := c.V2Client.UpdateKsqlCluster(clusterId, environmentId, csu) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Print the rolling-restart notice only AFTER the PATCH was accepted — | ||
| // otherwise a failed call (e.g., a 4xx from the server) would leave the | ||
| // customer with a misleading "Resizing…" message even though no resize | ||
| // is happening. | ||
| output.ErrPrintf(c.Config.EnableColor, | ||
| "Resizing ksqlDB cluster %q from %d to %d CSUs. A rolling restart will be "+ | ||
| "performed asynchronously; the cluster will continue serving queries during the resize.\n", | ||
| clusterId, currentCsu, csu) | ||
|
|
||
| table := output.NewTable(cmd) | ||
| table.Add(c.formatClusterForDisplayAndList(&cluster)) | ||
| return table.Print() | ||
| } | ||
|
|
||
| // validateCsuForUpdate returns nil if csu is in validCsuSizes, and a | ||
| // customer-safe error otherwise. The server-side check in | ||
| // cc-control-plane-ksql is authoritative; this client-side validation exists | ||
| // to fail fast with a clearer message before issuing the API call. | ||
| func validateCsuForUpdate(csu int32) error { | ||
| if csu > maxSelfServeCSU { | ||
| return fmt.Errorf("%d CSUs: %s", csu, csuSupportTicketMessage()) | ||
| } | ||
|
Comment on lines
+146
to
+149
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in |
||
| for _, valid := range validCsuSizes { | ||
| if csu == valid { | ||
| return nil | ||
| } | ||
| } | ||
| return fmt.Errorf("%d is not a valid CSU size for cluster update. Valid sizes are %s", | ||
| csu, formatCsuList(validCsuSizes)) | ||
| } | ||
|
|
||
| func formatCsuList(sizes []int32) string { | ||
| sorted := append([]int32(nil), sizes...) | ||
| sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) | ||
| out := "" | ||
| for i, s := range sorted { | ||
| if i > 0 { | ||
| out += ", " | ||
| } | ||
| out += fmt.Sprintf("%d", s) | ||
| } | ||
| return out | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package ksql | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestValidateCsuForUpdate(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| csu int32 | ||
| expectErr bool | ||
| errContains string | ||
| }{ | ||
| {name: "valid 4", csu: 4}, | ||
| {name: "valid 8", csu: 8}, | ||
| {name: "valid 12", csu: 12}, | ||
| {name: "valid 16", csu: 16}, | ||
| {name: "valid 20", csu: 20}, | ||
| {name: "valid 24", csu: 24}, | ||
| {name: "valid 28", csu: 28}, | ||
| { | ||
| name: "legacy size 1 rejected", | ||
| csu: 1, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "legacy size 2 rejected", | ||
| csu: 2, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "in-range but non-canonical (5) rejected", | ||
| csu: 5, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "in-range but non-canonical (10) rejected", | ||
| csu: 10, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "above 28 routes to support-ticket message", | ||
| csu: 32, | ||
| expectErr: true, | ||
| errContains: "support ticket", | ||
| }, | ||
| { | ||
| name: "well above ceiling routes to support-ticket message", | ||
| csu: 128, | ||
| expectErr: true, | ||
| errContains: "support ticket", | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| err := validateCsuForUpdate(tc.csu) | ||
| if tc.expectErr { | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), tc.errContains) | ||
| } else { | ||
| require.NoError(t, err) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestFormatCsuList(t *testing.T) { | ||
| require.Equal(t, "4, 8, 12, 16, 20, 24, 28", formatCsuList(validCsuSizes)) | ||
| // Input order should not matter; output is sorted ascending. | ||
| require.Equal(t, "4, 8, 16", formatCsuList([]int32{16, 4, 8})) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in
9b9eee25: addedHidden: trueon the cobra command. The subcommand is still reachable for testing and review, but won't appear inksql cluster --helpor in customer-facing discovery. DropHiddenwhen the SDK is regenerated and the shim is replaced with the real PATCH call.