Skip to content

Commit

Permalink
Fix flink artifact list (#2686)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianstrauch committed Mar 14, 2024
1 parent 2b19b8a commit 0f32a57
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 55 deletions.
18 changes: 6 additions & 12 deletions internal/flink/command_artifact_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,19 @@ func (c *command) newListCommand() *cobra.Command {
}

func (c *command) list(cmd *cobra.Command, _ []string) error {
cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
}

plugins, err := c.V2Client.ListCustomPlugins(cloud)
plugins, err := c.V2Client.ListCustomPlugins("")
if err != nil {
return err
}

list := output.NewList(cmd)
for _, plugin := range plugins {
if plugin.GetConnectorType() != "flink-udf" {
continue
if plugin.GetConnectorType() == "flink-udf" {
list.Add(&customPluginOutList{
Name: plugin.GetDisplayName(),
Id: plugin.GetId(),
})
}
list.Add(&customPluginOutList{
Name: plugin.GetDisplayName(),
Id: plugin.GetId(),
})
}
return list.Print()
}
5 changes: 5 additions & 0 deletions test/fixtures/output/connect/custom-plugin/list-json.golden
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@
"id": "ccp-789012",
"name": "CliPluginTest2",
"cloud": "aws"
},
{
"id": "ccp-789013",
"name": "CliPluginTest3",
"cloud": "aws"
}
]
3 changes: 3 additions & 0 deletions test/fixtures/output/connect/custom-plugin/list-yaml.golden
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
- id: ccp-789012
name: CliPluginTest2
cloud: aws
- id: ccp-789013
name: CliPluginTest3
cloud: aws
1 change: 1 addition & 0 deletions test/fixtures/output/connect/custom-plugin/list.golden
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
-------------+----------------+--------
ccp-123456 | CliPluginTest1 | aws
ccp-789012 | CliPluginTest2 | aws
ccp-789013 | CliPluginTest3 | aws
3 changes: 3 additions & 0 deletions test/fixtures/output/flink/artifact/list.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ID | Name
-------------+-----------------
ccp-789013 | CliPluginTest3
11 changes: 11 additions & 0 deletions test/flink_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package test

func (s *CLITestSuite) TestFlinkArtifactList() {
tests := []CLITest{
{args: "flink artifact list", fixture: "flink/artifact/list.golden"},
}

for _, test := range tests {
test.login = "cloud"
s.runIntegrationTest(test)
}
}

func (s *CLITestSuite) TestFlinkComputePool() {
tests := []CLITest{
{args: "flink compute-pool create my-compute-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create.golden"},
Expand Down
4 changes: 2 additions & 2 deletions test/test-server/ccloudv2_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ var ccloudV2Routes = []route{
{"/connect/v1/environments/{env}/clusters/{clusters}/connectors/{connector}/config", handleConnectorConfig},
{"/connect/v1/environments/{env}/clusters/{clusters}/connectors/{connector}/pause", handleConnectorPause},
{"/connect/v1/environments/{env}/clusters/{clusters}/connectors/{connector}/resume", handleConnectorResume},
{"/connect/v1/custom-connector-plugins", handleCustomPlugin},
{"/connect/v1/custom-connector-plugins/{id}", handleCustomPluginWithId},
{"/connect/v1/custom-connector-plugins", handleCustomConnectorPlugins},
{"/connect/v1/custom-connector-plugins/{id}", handleCustomConnectorPluginsId},
{"/connect/v1/presigned-upload-url", handleCustomPluginUploadUrl},
{"/connect/v1/dummy-presigned-url", handleCustomPluginUploadFile},
{"/fcpm/v2/compute-pools", handleFcpmComputePools},
Expand Down
73 changes: 39 additions & 34 deletions test/test-server/connect_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,73 +224,78 @@ func handlePluginValidate(t *testing.T) http.HandlerFunc {
}

// Handler for: "/connect/v1/custom-connector-plugins"
func handleCustomPlugin(t *testing.T) http.HandlerFunc {
func handleCustomConnectorPlugins(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
switch r.Method {
case http.MethodPost:
plugin := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-123456"),
DisplayName: PtrString("my-custom-plugin"),
Cloud: PtrString("aws"),
Id: connectcustompluginv1.PtrString("ccp-123456"),
DisplayName: connectcustompluginv1.PtrString("my-custom-plugin"),
Cloud: connectcustompluginv1.PtrString("aws"),
}
err := json.NewEncoder(w).Encode(plugin)
require.NoError(t, err)
}
if r.Method == http.MethodGet {
case http.MethodGet:
plugin1 := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-123456"),
DisplayName: PtrString("CliPluginTest1"),
Cloud: PtrString("aws"),
Id: connectcustompluginv1.PtrString("ccp-123456"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTest1"),
Cloud: connectcustompluginv1.PtrString("aws"),
}
plugin2 := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-789012"),
DisplayName: PtrString("CliPluginTest2"),
Cloud: PtrString("aws"),
Id: connectcustompluginv1.PtrString("ccp-789012"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTest2"),
Cloud: connectcustompluginv1.PtrString("aws"),
}
plugin3 := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: connectcustompluginv1.PtrString("ccp-789013"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTest3"),
ConnectorType: connectcustompluginv1.PtrString("flink-udf"),
Cloud: connectcustompluginv1.PtrString("aws"),
}
err := json.NewEncoder(w).Encode(connectcustompluginv1.ConnectV1CustomConnectorPluginList{Data: []connectcustompluginv1.ConnectV1CustomConnectorPlugin{plugin1, plugin2}})
err := json.NewEncoder(w).Encode(connectcustompluginv1.ConnectV1CustomConnectorPluginList{Data: []connectcustompluginv1.ConnectV1CustomConnectorPlugin{plugin1, plugin2, plugin3}})
require.NoError(t, err)
}
}
}

// Handler for: "/connect/v1/custom-connector-plugins/{id}"
func handleCustomPluginWithId(t *testing.T) http.HandlerFunc {
func handleCustomConnectorPluginsId(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
switch r.Method {
case http.MethodGet:
vars := mux.Vars(r)
id := vars["id"]
var plugin connectcustompluginv1.ConnectV1CustomConnectorPlugin
if id == "ccp-123456" {
plugin = connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-123456"),
DisplayName: PtrString("CliPluginTest"),
ConnectorType: PtrString("source"),
ConnectorClass: PtrString("io.confluent.kafka.connect.test"),
Cloud: PtrString("aws"),
Id: connectcustompluginv1.PtrString("ccp-123456"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTest"),
ConnectorType: connectcustompluginv1.PtrString("source"),
ConnectorClass: connectcustompluginv1.PtrString("io.confluent.kafka.connect.test"),
Cloud: connectcustompluginv1.PtrString("aws"),
}
} else {
sensitiveProperties := []string{"aws.key", "aws.secret"}
plugin = connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-123456"),
DisplayName: PtrString("CliPluginTest"),
Description: PtrString("Source datagen plugin"),
ConnectorType: PtrString("source"),
ConnectorClass: PtrString("io.confluent.kafka.connect.test"),
Cloud: PtrString("aws"),
Id: connectcustompluginv1.PtrString("ccp-123456"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTest"),
Description: connectcustompluginv1.PtrString("Source datagen plugin"),
ConnectorType: connectcustompluginv1.PtrString("source"),
ConnectorClass: connectcustompluginv1.PtrString("io.confluent.kafka.connect.test"),
Cloud: connectcustompluginv1.PtrString("aws"),
SensitiveConfigProperties: &sensitiveProperties,
}
}
err := json.NewEncoder(w).Encode(plugin)
require.NoError(t, err)
}
if r.Method == http.MethodPatch {
case http.MethodPatch:
plugin := connectcustompluginv1.ConnectV1CustomConnectorPlugin{
Id: PtrString("ccp-123456"),
DisplayName: PtrString("CliPluginTestUpdate"),
Id: connectcustompluginv1.PtrString("ccp-123456"),
DisplayName: connectcustompluginv1.PtrString("CliPluginTestUpdate"),
}
err := json.NewEncoder(w).Encode(plugin)
require.NoError(t, err)
}
if r.Method == http.MethodDelete {
case http.MethodDelete:
err := json.NewEncoder(w).Encode(connectcustompluginv1.ConnectV1CustomConnectorPlugin{})
require.NoError(t, err)
}
Expand All @@ -317,7 +322,7 @@ func handleCustomPluginUploadUrl(t *testing.T) http.HandlerFunc {
func handleCustomPluginUploadFile(t *testing.T) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
err := json.NewEncoder(w).Encode(PtrString("Success"))
err := json.NewEncoder(w).Encode(connectcustompluginv1.PtrString("Success"))
require.NoError(t, err)
}
}
Expand Down
14 changes: 7 additions & 7 deletions test/test-server/kafka_rest_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func handleKafkaRestTopics(t *testing.T) http.HandlerFunc {
}
}

func PtrString(v string) *string { return &v }
func ptrString(v string) *string { return &v }

// Handler for: "/kafka/v3/clusters/{cluster}/topics/{topic}/configs"
func handleKafkaRestTopicConfigs(t *testing.T) http.HandlerFunc {
Expand All @@ -259,16 +259,16 @@ func handleKafkaRestTopicConfigs(t *testing.T) http.HandlerFunc {
Data: []cpkafkarestv3.TopicConfigData{
{
Name: "cleanup.policy",
Value: PtrString("delete"),
Value: ptrString("delete"),
},
{
Name: "compression.type",
Value: PtrString("producer"),
Value: ptrString("producer"),
IsReadOnly: true,
},
{
Name: "retention.ms",
Value: PtrString("604800000"),
Value: ptrString("604800000"),
},
},
}
Expand All @@ -281,12 +281,12 @@ func handleKafkaRestTopicConfigs(t *testing.T) http.HandlerFunc {
Data: []cpkafkarestv3.TopicConfigData{
{
Name: "compression.type",
Value: PtrString("producer"),
Value: ptrString("producer"),
IsReadOnly: true,
},
{
Name: "retention.ms",
Value: PtrString("1"),
Value: ptrString("1"),
},
},
}
Expand Down Expand Up @@ -669,7 +669,7 @@ func handleKafkaRestLink(t *testing.T) http.HandlerFunc {
err := json.NewEncoder(w).Encode(cpkafkarestv3.ListLinksResponseData{
Kind: "",
Metadata: cpkafkarestv3.ResourceMetadata{},
DestinationClusterId: PtrString("cluster-2"),
DestinationClusterId: ptrString("cluster-2"),
LinkName: link,
ClusterLinkId: "LINKID1",
TopicNames: []string{"link-1-topic-1", "link-1-topic-2"},
Expand Down

0 comments on commit 0f32a57

Please sign in to comment.