Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0
github.com/confluentinc/cmf-sdk-go v0.0.5
github.com/confluentinc/cmf-sdk-go v0.0.7
github.com/confluentinc/confluent-kafka-go/v2 v2.14.1
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0 h1:Wh3+AsUCncoxRPfs0zC
github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.5.0/go.mod h1:unZupel8OU3/o8MRcL9YiJo+56MalsCtHHCc/ZNi0BI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0 h1:rF9cKecDCowq+oDWjf8rSpXXZHAnVXowIsT/OXF4MOI=
github.com/confluentinc/ccloud-sdk-go-v2/usm v0.1.0/go.mod h1:umhEDvQp/5h0ALKBpYTQOmFwaWrvilnbE8Rkzh6oJ4Q=
github.com/confluentinc/cmf-sdk-go v0.0.5 h1:TS6S3ClVsM1kanB00mlcmqXczozDTO2t4Du5blDSYvE=
github.com/confluentinc/cmf-sdk-go v0.0.5/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/cmf-sdk-go v0.0.7 h1:+BS3A0v3K4VvzDjo1WwJMKWvlkmK7KK6hZSEzGaYRF4=
github.com/confluentinc/cmf-sdk-go v0.0.7/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg=
github.com/confluentinc/confluent-kafka-go/v2 v2.14.1 h1:DOm/3yIL7L8GOEa7TFDht6MiNa/FiOeb8kNjHr28S/4=
github.com/confluentinc/confluent-kafka-go/v2 v2.14.1/go.mod h1:aR1aciwbULyLhKkv9eq88JhS4XmGOusEnHZx1R93XZI=
github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU=
Expand Down
12 changes: 10 additions & 2 deletions internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package flink

import (
"fmt"

"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/flink"
)

type computePoolOut struct {
Expand Down Expand Up @@ -77,11 +80,16 @@ func convertSdkComputePoolToLocalComputePool(sdkComputePool cmfsdk.ComputePool)
},
}

if sdkComputePool.Status != nil {
if phase := extractComputePoolPhase(sdkComputePool); phase != "" {
localPool.Status = &LocalComputePoolStatus{
Phase: sdkComputePool.Status.Phase,
Phase: phase,
}
}

return localPool
}

func extractComputePoolPhase(pool cmfsdk.ComputePool) string {
Comment thread
channingdong marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we expecting similar *map[string]interface{} as a general data struct for future CMF development?

We have an ongoing initiative to automate the CP CMF CLI source code generation, and your team doesn't need to spend time on the standard CRUDL feature development anymore, so we'd like to have the custom functions as few as possible.

Can we explore ways to make this function more generic so that we can try to generate it whenever we have a need to extract certain key from a map?

Copy link
Copy Markdown
Author

@paras-negi-flink Paras Negi (paras-negi-flink) Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it's realistically the only practical way to surface controller-driven values, and patterns like these will pop up from time to time.

Generalized to pkg/flink.GetMapField so auto-gen has a single helper to target.

phase, _ := flink.GetMapField[string](pool.GetStatus(), "phase", fmt.Sprintf("compute pool %q", pool.GetMetadata().Name))
return phase
}
6 changes: 3 additions & 3 deletions internal/flink/command_compute_pool_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func (c *command) computePoolCreateOnPrem(cmd *cobra.Command, args []string) err
}
table.Add(&computePoolOutOnPrem{
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkOutputComputePool.GetStatus().Phase,
Name: sdkOutputComputePool.GetMetadata().Name,
Type: sdkOutputComputePool.GetSpec().Type,
Phase: extractComputePoolPhase(sdkOutputComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_describe_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *command) computePoolDescribeOnPrem(cmd *cobra.Command, args []string) e
CreationTime: creationTime,
Name: sdkComputePool.GetMetadata().Name,
Type: sdkComputePool.GetSpec().Type,
Phase: sdkComputePool.GetStatus().Phase,
Phase: extractComputePoolPhase(sdkComputePool),
})
return table.Print()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_compute_pool_list_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *command) computePoolListOnPrem(cmd *cobra.Command, _ []string) error {
CreationTime: creationTime,
Name: pool.GetMetadata().Name,
Type: pool.GetSpec().Type,
Phase: pool.GetStatus().Phase,
Phase: extractComputePoolPhase(pool),
})
}
return list.Print()
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *command) environmentCreate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.Name = &environmentName
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.KubernetesNamespace = &kubernetesNamespace
postEnvironment.StatementDefaults = &defaultsStatementParsed
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_environment_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *command) environmentUpdate(cmd *cobra.Command, args []string) error {
}

var postEnvironment cmfsdk.PostEnvironment
postEnvironment.Name = environmentName
postEnvironment.Name = &environmentName
postEnvironment.FlinkApplicationDefaults = &defaultsApplicationParsed
postEnvironment.StatementDefaults = &defaultsStatementParsed
postEnvironment.ComputePoolDefaults = &defaultsComputePoolParsed
Expand Down
35 changes: 35 additions & 0 deletions pkg/flink/cmf_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package flink

import "github.com/confluentinc/cli/v4/pkg/log"

// GetMapField extracts a typed value of type T from an untyped map.
// cmf-sdk-go represents K8s/FKO-style fields (status, spec, metadata, defaults)
// as map[string]any since their schema is controller-driven and not statically
// known. This is the single safe entry point for reading those fields.
//
// Returns (zero, false) if the key is absent, nil, or not of type T. A type
// mismatch is logged at debug level — that's a server/schema contract
// violation, not a normal absence.
//
// For nested maps, call repeatedly:
//
// jobStatus, ok := GetMapField[map[string]any](status, "jobStatus", label)
// if ok {
// state, _ := GetMapField[string](jobStatus, "state", label)
// }
//
// contextLabel should identify the resource for actionable debug logs
// (e.g. `compute pool "foo"`).
func GetMapField[T any](m map[string]any, key, contextLabel string) (T, bool) {
var zero T
raw, ok := m[key]
if !ok || raw == nil {
return zero, false
}
v, ok := raw.(T)
if !ok {
log.CliLogger.Debugf("%s: %s has unexpected type %T, expected %T", contextLabel, key, raw, zero)
return zero, false
}
return v, true
}
65 changes: 65 additions & 0 deletions pkg/flink/cmf_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package flink

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestGetMapField_String(t *testing.T) {

Check warning on line 9 in pkg/flink/cmf_map_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestGetMapField_String" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3333&issues=a75b8890-8be4-47ec-b0fd-49882650c7d7&open=a75b8890-8be4-47ec-b0fd-49882650c7d7
tests := []struct {
name string
m map[string]any
want string
wantOk bool
}{
{name: "nil map", m: nil, want: "", wantOk: false},
{name: "empty map", m: map[string]any{}, want: "", wantOk: false},
{name: "key missing", m: map[string]any{"other": "x"}, want: "", wantOk: false},
{name: "value nil", m: map[string]any{"phase": nil}, want: "", wantOk: false},
{name: "string value", m: map[string]any{"phase": "RUNNING"}, want: "RUNNING", wantOk: true},
{name: "empty string", m: map[string]any{"phase": ""}, want: "", wantOk: true},
{name: "wrong type int", m: map[string]any{"phase": 42}, want: "", wantOk: false},
{name: "wrong type bool", m: map[string]any{"phase": true}, want: "", wantOk: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, ok := GetMapField[string](tt.m, "phase", "test")
require.Equal(t, tt.wantOk, ok)
require.Equal(t, tt.want, got)
})
}
}

func TestGetMapField_Int64(t *testing.T) {

Check warning on line 34 in pkg/flink/cmf_map_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestGetMapField_Int64" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3333&issues=461f82ca-dc73-420d-a242-91a30073112d&open=461f82ca-dc73-420d-a242-91a30073112d
got, ok := GetMapField[int64](map[string]any{"observedGeneration": int64(7)}, "observedGeneration", "test")
require.True(t, ok)
require.Equal(t, int64(7), got)

// JSON-decoded numbers commonly arrive as float64 in untyped maps; document the
// (sharp-edged) implication: callers extracting numerics from JSON-sourced maps
// must request the type the decoder actually produced (float64), not int64.
_, ok = GetMapField[int64](map[string]any{"observedGeneration": float64(7)}, "observedGeneration", "test")
require.False(t, ok)
}

func TestGetMapField_NestedMap(t *testing.T) {

Check warning on line 46 in pkg/flink/cmf_map_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestGetMapField_NestedMap" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3333&issues=fac5c163-c8b9-43af-ac15-31f18c0bd98b&open=fac5c163-c8b9-43af-ac15-31f18c0bd98b
nested := map[string]any{"jobName": "foo", "state": "RUNNING"}
root := map[string]any{"jobStatus": nested}

got, ok := GetMapField[map[string]any](root, "jobStatus", "test")
require.True(t, ok)
require.Equal(t, nested, got)

// Compose for nested traversal.
state, ok := GetMapField[string](got, "state", "test")
require.True(t, ok)
require.Equal(t, "RUNNING", state)
}

func TestGetMapField_Slice(t *testing.T) {

Check warning on line 60 in pkg/flink/cmf_map_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Rename function "TestGetMapField_Slice" to match the regular expression ^(_|[a-zA-Z0-9]+)$

[S100] Function names should comply with a naming convention See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3333&issues=71edc099-1cba-4be6-bbb2-058297c644dd&open=71edc099-1cba-4be6-bbb2-058297c644dd
conditions := []any{"Ready", "Healthy"}
got, ok := GetMapField[[]any](map[string]any{"conditions": conditions}, "conditions", "test")
require.True(t, ok)
require.Equal(t, conditions, got)
}
6 changes: 3 additions & 3 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (cmfClient *CmfRestClient) UpdateApplication(ctx context.Context, environme
// CreateEnvironment Create an environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before creation.
func (cmfClient *CmfRestClient) CreateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
_, httpResponse, _ := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusOK {
Expand Down Expand Up @@ -281,10 +281,10 @@ func (cmfClient *CmfRestClient) ListEnvironments(ctx context.Context) ([]cmfsdk.
return environments, nil
}

// UpdateEnvironment Create an environment.
// UpdateEnvironment updates an existing environment.
// Internally, since the call for Create and Update is the same, we check if the environment exists before updation.
func (cmfClient *CmfRestClient) UpdateEnvironment(ctx context.Context, postEnvironment cmfsdk.PostEnvironment) (cmfsdk.Environment, error) {
environmentName := postEnvironment.Name
environmentName := postEnvironment.GetName()
_, httpResponse, err := cmfClient.EnvironmentsApi.GetEnvironment(ctx, environmentName).Execute()
// check if the environment exists by checking the status code
if httpResponse != nil && httpResponse.StatusCode == http.StatusNotFound {
Expand Down
13 changes: 7 additions & 6 deletions test/test-server/flink_onprem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@
func createComputePool(poolName, phase string) cmfsdk.ComputePool {
timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String()

status := cmfsdk.ComputePoolStatus{
Phase: phase,
status := map[string]interface{}{
"phase": phase,
}

return cmfsdk.ComputePool{
Expand Down Expand Up @@ -364,14 +364,15 @@
err = json.Unmarshal(reqBody, &environment)
require.NoError(t, err)

if strings.Contains(environment.Name, "failure") {
envName := environment.GetName()
if strings.Contains(envName, "failure") {
http.Error(w, "", http.StatusUnprocessableEntity)
return
}

// Already existing environment: update
if environment.Name == "default" || environment.Name == "test" {
outputEnvironment := createEnvironment(environment.Name, environment.Name+"-namespace")
if envName == "default" || envName == "test" {
outputEnvironment := createEnvironment(envName, envName+"-namespace")

Check failure on line 375 in test/test-server/flink_onprem_handler.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "-namespace" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3333&issues=a78a9e34-1a20-4108-bfe9-7fe34ffb7cda&open=a78a9e34-1a20-4108-bfe9-7fe34ffb7cda
// This is a dummy update - only the defaults can be updated anyway.
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
Expand All @@ -382,7 +383,7 @@
}

// New environment: create
outputEnvironment := createEnvironment(environment.Name, environment.GetKubernetesNamespace())
outputEnvironment := createEnvironment(envName, environment.GetKubernetesNamespace())
outputEnvironment.FlinkApplicationDefaults = environment.FlinkApplicationDefaults
outputEnvironment.ComputePoolDefaults = environment.ComputePoolDefaults
outputEnvironment.StatementDefaults = environment.StatementDefaults
Expand Down