Skip to content

Commit

Permalink
Updated Connect create cluster command to include offsets (#2754)
Browse files Browse the repository at this point in the history
  • Loading branch information
nmala01 authored May 9, 2024
1 parent a54dfad commit 5df247d
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cdx v0.0.5
github.com/confluentinc/ccloud-sdk-go-v2/cli v0.3.0
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.6.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.2
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.8.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.10.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0 h1:YQEcSvhX5ODllg0mhxLivckK
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.10.0/go.mod h1:357Zo3HvVAe5iQgUFxUbQPAKJasGm8vFMkOB+krVmR8=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.6.0 h1:KNd/u0jWJAGHZUQFt+7Kepp+uNkxa2kOuMSZsdqD+Co=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.6.0/go.mod h1:rnFq6HGTig8wXRXWQ23bYwtIvhy9uzRk/rjPswh4Mto=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqHeaA0SRJQujc8yP7qAZRL3Y=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.2 h1:NSiuOYjZOKxHTRGYI7X9wFvf57ZNzWpZaChmWv7/UQw=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.2/go.mod h1:AaF39Acy3LFnHSHExaUtqNmbs7kL5/AL54CXX61+Il8=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.8.0 h1:H88FsS/dovseu4lhUnQ7hIfhy2Jy7/9VzFbypm0Dtl0=
Expand Down
37 changes: 34 additions & 3 deletions internal/connect/command_cluster_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,36 @@ func (c *clusterCommand) newCreateCommand() *cobra.Command {
RunE: c.create,
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
Example: examples.BuildExampleString(
examples.Example{
Text: "Create a configuration file with connector configs and offsets.",
Code: `{
"name": "MyGcsLogsBucketConnector",
"config": {
"connector.class": "GcsSink",
"data.format": "BYTES",
"flush.size": "1000",
"gcs.bucket.name": "APILogsBucket",
"gcs.credentials.config": "****************",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"name": "MyGcsLogsBucketConnector",
"tasks.max": "2",
"time.interval": "DAILY",
"topics": "APILogsTopic"
},
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic_A"
},
"offset": {
"kafka_offset": 1000
}
}
]
}`,
},
examples.Example{
Text: "Create a connector in the current or specified Kafka cluster context.",
Code: "confluent connect cluster create --config-file config.json",
Expand Down Expand Up @@ -54,14 +84,15 @@ func (c *clusterCommand) create(cmd *cobra.Command, _ []string) error {
return err
}

userConfigs, err := getConfig(cmd)
userConfigs, offsets, err := getConfigAndOffsets(cmd, false)
if err != nil {
return err
}

connectConfig := connectv1.InlineObject{
Name: connectv1.PtrString((*userConfigs)["name"]),
Config: userConfigs,
Name: connectv1.PtrString((userConfigs)["name"]),
Config: &userConfigs,
Offsets: &offsets,
}

environmentId, err := c.Context.EnvironmentId()
Expand Down
8 changes: 4 additions & 4 deletions internal/connect/command_cluster_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
return err
}

var userConfigs *map[string]string
var userConfigs map[string]string
if cmd.Flags().Changed("config") {
configs, err := cmd.Flags().GetStringSlice("config")
if err != nil {
Expand All @@ -64,9 +64,9 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
for name, value := range configMap {
currentConfigs[name] = value
}
userConfigs = &currentConfigs
userConfigs = currentConfigs
} else if cmd.Flags().Changed("config-file") {
userConfigs, err = getConfig(cmd)
userConfigs, err = getConfig(cmd, true)
if err != nil {
return err
}
Expand All @@ -79,7 +79,7 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
return err
}

if _, err := c.V2Client.CreateOrUpdateConnectorConfig(connector.Info.GetName(), environmentId, kafkaCluster.ID, *userConfigs); err != nil {
if _, err := c.V2Client.CreateOrUpdateConnectorConfig(connector.Info.GetName(), environmentId, kafkaCluster.ID, userConfigs); err != nil {
return err
}

Expand Down
55 changes: 36 additions & 19 deletions internal/connect/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"github.com/confluentinc/cli/v3/pkg/errors"
)

func getConfig(cmd *cobra.Command) (*map[string]string, error) {
func getConfigAndOffsets(cmd *cobra.Command, isUpdate bool) (map[string]string, []map[string]any, error) {
configFile, err := cmd.Flags().GetString("config-file")
if err != nil {
return nil, err
return nil, nil, err
}

options, err := parseConfigFile(configFile)
options, offsets, err := parseConfigFile(configFile, isUpdate)
if err != nil {
return nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, configFile, err)
return nil, nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, configFile, err)
}

connectorType := options["confluent.connector.type"]
Expand All @@ -30,51 +30,68 @@ func getConfig(cmd *cobra.Command) (*map[string]string, error) {
_, classExists := options["connector.class"]

if connectorType != "CUSTOM" && (!nameExists || !classExists) {
return nil, fmt.Errorf(`required configs "name" and "connector.class" missing from connector config file "%s"`, configFile)
return nil, nil, fmt.Errorf(`required configs "name" and "connector.class" missing from connector config file "%s"`, configFile)
}

return &options, nil
return options, offsets, nil
}

func parseConfigFile(filename string) (map[string]string, error) {
func getConfig(cmd *cobra.Command, isUpdate bool) (map[string]string, error) {
options, _, err := getConfigAndOffsets(cmd, isUpdate)
return options, err
}

func parseConfigFile(filename string, isUpdate bool) (map[string]string, []map[string]any, error) {
jsonFile, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, filename, err)
return nil, nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, filename, err)
}
if len(jsonFile) == 0 {
return nil, fmt.Errorf(`connector config file "%s" is empty`, filename)
return nil, nil, fmt.Errorf(`connector config file "%s" is empty`, filename)
}

kvPairs := make(map[string]string)
var options map[string]any
var offsets []map[string]any

if err := json.Unmarshal(jsonFile, &options); err != nil {
return nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, filename, err)
return nil, nil, fmt.Errorf(errors.UnableToReadConfigurationFileErrorMsg, filename, err)
}

for key, val := range options {
if val2, ok := val.(string); ok {
kvPairs[key] = val2
} else {
// We support object-as-a-value only for "config" key.
if key != "config" {
return nil, fmt.Errorf(`only string values are permitted for the configuration "%s"`, key)
}

} else if key == "config" {
configMap, ok := val.(map[string]any)
if !ok {
return nil, fmt.Errorf(`value for the configuration "config" is malformed`)
return nil, nil, fmt.Errorf(`value for the configuration "config" is malformed`)
}

for configKey, configVal := range configMap {
value, isString := configVal.(string)
if !isString {
return nil, fmt.Errorf(`only string values are permitted for the configuration "%s"`, configKey)
return nil, nil, fmt.Errorf(`only string values are permitted for the configuration "%s"`, configKey)
}
kvPairs[configKey] = value
}
} else if key == "offsets" {
if isUpdate {
return nil, nil, fmt.Errorf("offsets are not allowed in configuration file for `confluent connect cluster update`")
}
var request *[]map[string]any
valBytes, err := json.Marshal(val)
if err != nil {
return nil, nil, fmt.Errorf(`error while marshalling offsets, value for the configuration "offsets" is malformed`)
}
if err := json.Unmarshal(valBytes, &request); err != nil {
return nil, nil, fmt.Errorf(`error while unmarshalling offsets, value for the configuration "offsets" is malformed`)
}

offsets = *request
} else {
return nil, nil, fmt.Errorf(`only string values are permitted for the configuration "%s"`, key)
}
}

return kvPairs, err
return kvPairs, offsets, err
}
2 changes: 1 addition & 1 deletion pkg/ccloudv2/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (c *Client) connectApiContext() context.Context {
return context.WithValue(context.Background(), connectv1.ContextAccessToken, c.cfg.Context().GetAuthToken())
}

func (c *Client) CreateConnector(environmentId, kafkaClusterId string, connect connectv1.InlineObject) (connectv1.ConnectV1Connector, error) {
func (c *Client) CreateConnector(environmentId, kafkaClusterId string, connect connectv1.InlineObject) (connectv1.ConnectV1ConnectorWithOffsets, error) {
resp, httpResp, err := c.ConnectClient.ConnectorsConnectV1Api.CreateConnectv1Connector(c.connectApiContext(), environmentId, kafkaClusterId).InlineObject(connect).Execute()
return resp, errors.CatchCCloudV2Error(err, httpResp)
}
Expand Down
5 changes: 3 additions & 2 deletions test/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ func (s *CLITestSuite) TestConnect() {
{args: "connect cluster list --cluster lkc-123 -o json", fixture: "connect/cluster/list-json.golden"},
{args: "connect cluster list --cluster lkc-123 -o yaml", fixture: "connect/cluster/list-yaml.golden"},
{args: "connect cluster list --cluster lkc-123", fixture: "connect/cluster/list.golden"},
{args: "connect cluster update lcc-123 --cluster lkc-123 --config-file test/fixtures/input/connect/config.yaml", fixture: "connect/cluster/update.golden"},
{args: "connect cluster update lcc-123 --cluster lkc-123 --config-file test/fixtures/input/connect/update-config.yaml", fixture: "connect/cluster/update.golden"},
{args: "connect event describe", fixture: "connect/event-describe.golden"},

// Tests based on new config
{args: "connect cluster create --cluster lkc-123 --config-file test/fixtures/input/connect/config-new-format.json -o json", fixture: "connect/cluster/create-new-config-json.golden"},
{args: "connect cluster create --cluster lkc-123 --config-file test/fixtures/input/connect/config-new-format.json -o yaml", fixture: "connect/cluster/create-yaml.golden"},
{args: "connect cluster create --cluster lkc-123 --config-file test/fixtures/input/connect/config-malformed-new.json", fixture: "connect/cluster/create-malformed-new.golden", exitCode: 1},
{args: "connect cluster create --cluster lkc-123 --config-file test/fixtures/input/connect/config-malformed-old.json", fixture: "connect/cluster/create-malformed-old.golden", exitCode: 1},
{args: "connect cluster update lcc-123 --cluster lkc-123 --config-file test/fixtures/input/connect/config-new-format.json", fixture: "connect/cluster/update.golden"},
{args: "connect cluster update lcc-123 --cluster lkc-123 --config-file test/fixtures/input/connect/update-config-new-format.json", fixture: "connect/cluster/update.golden"},
{args: "connect cluster update lcc-123 --cluster lkc-123 --config-file test/fixtures/input/connect/update-config-malformed.json", fixture: "connect/cluster/update-error.golden", exitCode: 1},
}

for _, test := range tests {
Expand Down
18 changes: 17 additions & 1 deletion test/fixtures/input/connect/config-new-format.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,21 @@
"time.interval": "HOURLY",
"topics": "apples",
"connector.class": "AzureBlobSink"
}
},
"offsets": [
{
"partition": {
"server": "dbzv2"
},
"offset": {
"event": 2,
"file": "mysql-bin.000600",
"pos": 2001,
"row": 1,
"server_id": 1,
"transaction_id": null,
"ts_sec": 1711788870
}
}
]
}
18 changes: 17 additions & 1 deletion test/fixtures/input/connect/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,21 @@
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "apples",
"connector.class": "AzureBlobSink"
"connector.class": "AzureBlobSink",
"offsets": [
{
"partition": {
"server": "dbzv2"
},
"offset": {
"event": 2,
"file": "mysql-bin.000600",
"pos": 2001,
"row": 1,
"server_id": 1,
"transaction_id": null,
"ts_sec": 1711788870
}
}
]
}
32 changes: 32 additions & 0 deletions test/fixtures/input/connect/update-config-malformed.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"name": "az-connector",
"config": {
"name": "az-connector",
"azblob.account.name": "azsink",
"azblob.account.key": "key",
"azblob.container.name": "azsink",
"data.format": "JSON",
"kafka.api.key": "key",
"kafka.api.secret": "key",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "apples",
"connector.class": "AzureBlobSink"
},
"offsets": [
{
"partition": {
"server": "dbzv2"
},
"offset": {
"event": 2,
"file": "mysql-bin.000600",
"pos": 2001,
"row": 1,
"server_id": 1,
"transaction_id": null,
"ts_sec": 1711788870
}
}
]
}
16 changes: 16 additions & 0 deletions test/fixtures/input/connect/update-config-new-format.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "az-connector",
"config": {
"name": "az-connector",
"azblob.account.name": "azsink",
"azblob.account.key": "key",
"azblob.container.name": "azsink",
"data.format": "JSON",
"kafka.api.key": "key",
"kafka.api.secret": "key",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "apples",
"connector.class": "AzureBlobSink"
}
}
13 changes: 13 additions & 0 deletions test/fixtures/input/connect/update-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"name": "az-connector",
"azblob.account.name": "azsink",
"azblob.account.key": "key",
"azblob.container.name": "azsink",
"data.format": "JSON",
"kafka.api.key": "key",
"kafka.api.secret": "key",
"tasks.max": "1",
"time.interval": "HOURLY",
"topics": "apples",
"connector.class": "AzureBlobSink"
}
30 changes: 30 additions & 0 deletions test/fixtures/output/connect/cluster/create-help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ Usage:
confluent connect cluster create [flags]

Examples:
Create a configuration file with connector configs and offsets.

{
"name": "MyGcsLogsBucketConnector",
"config": {
"connector.class": "GcsSink",
"data.format": "BYTES",
"flush.size": "1000",
"gcs.bucket.name": "APILogsBucket",
"gcs.credentials.config": "****************",
"kafka.api.key": "****************",
"kafka.api.secret": "****************",
"name": "MyGcsLogsBucketConnector",
"tasks.max": "2",
"time.interval": "DAILY",
"topics": "APILogsTopic"
},
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "topic_A"
},
"offset": {
"kafka_offset": 1000
}
}
]
}

Create a connector in the current or specified Kafka cluster context.

$ confluent connect cluster create --config-file config.json
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/output/connect/cluster/update-error.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: unable to read configuration file "test/fixtures/input/connect/update-config-malformed.json": offsets are not allowed in configuration file for `confluent connect cluster update`

0 comments on commit 5df247d

Please sign in to comment.