Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions internal/cmd/kafka/command_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/spf13/cobra"

"github.com/confluentinc/cli/internal/pkg/ccloudv2"
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
dynamicconfig "github.com/confluentinc/cli/internal/pkg/dynamic-config"
"github.com/confluentinc/cli/internal/pkg/ccloudv2"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/log"
Expand All @@ -24,10 +24,7 @@ const (
unknownTopicOrPartitionErrorCode = 40403
)

const (
defaultReplicationFactor = 3
partitionCount = "num.partitions"
)
const partitionCount = "num.partitions"

type hasAPIKeyTopicCommand struct {
*pcmd.HasAPIKeyCLICommand
Expand Down
31 changes: 18 additions & 13 deletions internal/cmd/kafka/command_topic_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ func (c *authenticatedTopicCommand) newCreateCommand() *cobra.Command {
),
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
}
cmd.Flags().Int32("partitions", 6, "Number of topic partitions.")

cmd.Flags().Uint32("partitions", 0, "Number of topic partitions.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of configuration overrides ("key=value") for the topic being created.`)
cmd.Flags().Bool("dry-run", false, "Run the command without committing changes to Kafka.")
cmd.Flags().Bool("if-not-exists", false, "Exit gracefully if topic already exists.")
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)

return cmd
}

func (c *authenticatedTopicCommand) create(cmd *cobra.Command, args []string) error {
topicName := args[0]

numPartitions, err := cmd.Flags().GetInt32("partitions")
partitions, err := cmd.Flags().GetUint32("partitions")
if err != nil {
return err
}
Expand Down Expand Up @@ -75,8 +77,8 @@ func (c *authenticatedTopicCommand) create(cmd *cobra.Command, args []string) er
if err != nil {
return err
}
err = c.provisioningClusterCheck(kafkaClusterConfig.ID)
if err != nil {

if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil {
return err
}

Expand All @@ -93,14 +95,15 @@ func (c *authenticatedTopicCommand) create(cmd *cobra.Command, args []string) er
}

data := kafkarestv3.CreateTopicRequestData{
TopicName: topicName,
PartitionsCount: &numPartitions,
ReplicationFactor: utils.Int32Ptr(defaultReplicationFactor),
Configs: &topicConfigs,
TopicName: topicName,
Configs: &topicConfigs,
}

_, httpResp, err := kafkaREST.CloudClient.CreateKafkaTopic(kafkaClusterConfig.ID, data)
if cmd.Flags().Changed("partitions") {
data.PartitionsCount = utils.Int32Ptr(int32(partitions))
}

_, httpResp, err := kafkaREST.CloudClient.CreateKafkaTopic(kafkaClusterConfig.ID, data)
if err != nil && httpResp != nil {
// Kafka REST is available, but there was an error
restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err)
Expand Down Expand Up @@ -146,14 +149,16 @@ func (c *authenticatedTopicCommand) create(cmd *cobra.Command, args []string) er

topic := &schedv1.Topic{
Spec: &schedv1.TopicSpecification{
Name: topicName,
NumPartitions: numPartitions,
ReplicationFactor: defaultReplicationFactor,
Configs: configMap,
Name: topicName,
Configs: configMap,
},
Validate: dryRun,
}

if cmd.Flags().Changed("partitions") {
topic.Spec.NumPartitions = int32(partitions)
}

if err := c.Client.Kafka.CreateTopic(context.Background(), cluster, topic); err != nil {
err = errors.CatchTopicExistsError(err, cluster.Id, topic.Spec.Name, ifNotExists)
err = errors.CatchClusterNotReadyError(err, cluster.Id)
Expand Down
53 changes: 34 additions & 19 deletions internal/cmd/kafka/command_topic_create_onprem.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"

"github.com/antihax/optional"
"github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"
"github.com/spf13/cobra"

"github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/examples"
Expand All @@ -21,7 +22,7 @@ func (c *authenticatedTopicCommand) newCreateCommandOnPrem() *cobra.Command {
cmd := &cobra.Command{
Use: "create <topic>",
Short: "Create a Kafka topic.",
Args: cobra.ExactArgs(1), // <topic>
Args: cobra.ExactArgs(1),
RunE: c.onPremCreate,
Example: examples.BuildExampleString(
examples.Example{
Expand All @@ -31,37 +32,42 @@ func (c *authenticatedTopicCommand) newCreateCommandOnPrem() *cobra.Command {
examples.Example{
Text: "Create a topic named `my_topic_2` with specified configuration parameters.",
Code: "confluent kafka topic create my_topic_2 --url http://localhost:8082 --config cleanup.policy=compact,compression.type=gzip",
}),
},
),
}
cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet()) //includes url, ca-cert-path, client-cert-path, client-key-path, and no-auth flags
cmd.Flags().Int32("partitions", 6, "Number of topic partitions.")
cmd.Flags().Int32("replication-factor", 3, "Number of replicas.")

cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet())
cmd.Flags().Uint32("partitions", 0, "Number of topic partitions.")
cmd.Flags().Uint32("replication-factor", 0, "Number of replicas.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of topic configuration ("key=value") overrides for the topic being created.`)
cmd.Flags().Bool("if-not-exists", false, "Exit gracefully if topic already exists.")

return cmd
}

func (c *authenticatedTopicCommand) onPremCreate(cmd *cobra.Command, args []string) error {
// Parse arguments
topicName := args[0]

restClient, restContext, err := initKafkaRest(c.AuthenticatedCLICommand, cmd)
if err != nil {
return err
}

clusterId, err := getClusterIdForRestRequests(restClient, restContext)
if err != nil {
return err
}
// Parse remaining arguments
numPartitions, err := cmd.Flags().GetInt32("partitions")

partitions, err := cmd.Flags().GetUint32("partitions")
if err != nil {
return err
}
replicationFactor, err := cmd.Flags().GetInt32("replication-factor")

replicationFactor, err := cmd.Flags().GetUint32("replication-factor")
if err != nil {
return err
}

ifNotExists, err := cmd.Flags().GetBool("if-not-exists")
if err != nil {
return err
Expand All @@ -75,6 +81,7 @@ func (c *authenticatedTopicCommand) onPremCreate(cmd *cobra.Command, args []stri
if err != nil {
return err
}

topicConfigs := make([]kafkarestv3.CreateTopicRequestDataConfigs, len(configMap))
i := 0
for k, v := range configMap {
Expand All @@ -85,16 +92,24 @@ func (c *authenticatedTopicCommand) onPremCreate(cmd *cobra.Command, args []stri
}
i++
}

data := kafkarestv3.CreateTopicRequestData{
TopicName: topicName,
Configs: topicConfigs,
}

if cmd.Flags().Changed("partitions") {
data.PartitionsCount = int32(partitions)
}

if cmd.Flags().Changed("replication-factor") {
data.ReplicationFactor = int32(replicationFactor)
}

opts := &kafkarestv3.CreateKafkaTopicOpts{CreateTopicRequestData: optional.NewInterface(data)}

// Create new topic
_, resp, err := restClient.TopicV3Api.CreateKafkaTopic(restContext, clusterId, &kafkarestv3.CreateKafkaTopicOpts{
CreateTopicRequestData: optional.NewInterface(kafkarestv3.CreateTopicRequestData{
TopicName: topicName,
PartitionsCount: numPartitions,
ReplicationFactor: replicationFactor,
Configs: topicConfigs,
}),
})
if err != nil {
if _, resp, err := restClient.TopicV3Api.CreateKafkaTopic(restContext, clusterId, opts); err != nil {
// catch topic exists error
if openAPIError, ok := err.(kafkarestv3.GenericOpenAPIError); ok {
var decodedError kafkarest.V3Error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Error: invalid argument "-2" for "--partitions" flag: strconv.ParseUint: parsing "-2": invalid syntax
Usage:
confluent kafka topic create <topic> [flags]

Examples:
Create a topic named `my_topic` with default options at specified cluster (providing Kafka REST Proxy endpoint).

$ confluent kafka topic create my_topic --url http://localhost:8082

Create a topic named `my_topic_2` with specified configuration parameters.

$ confluent kafka topic create my_topic_2 --url http://localhost:8082 --config cleanup.policy=compact,compression.type=gzip

Flags:
--url string Base URL of REST Proxy Endpoint of Kafka Cluster (include /kafka for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL.
--ca-cert-path string Path to a PEM-encoded CA to verify the Confluent REST Proxy.
--client-cert-path string Path to client cert to be verified by Confluent REST Proxy, include for mTLS authentication.
--client-key-path string Path to client private key, include for mTLS authentication.
--no-auth Include if requests should be made without authentication headers, and user will not be prompted for credentials.
--prompt Bypass use of available login credentials and prompt for Kafka Rest credentials.
--partitions uint32 Number of topic partitions.
--replication-factor uint32 Number of replicas.
--config strings A comma-separated list of topic configuration ("key=value") overrides for the topic being created.
--if-not-exists Exit gracefully if topic already exists.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which may contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Error: invalid argument "-2" for "--replication-factor" flag: strconv.ParseUint: parsing "-2": invalid syntax
Usage:
confluent kafka topic create <topic> [flags]

Examples:
Create a topic named `my_topic` with default options at specified cluster (providing Kafka REST Proxy endpoint).

$ confluent kafka topic create my_topic --url http://localhost:8082

Create a topic named `my_topic_2` with specified configuration parameters.

$ confluent kafka topic create my_topic_2 --url http://localhost:8082 --config cleanup.policy=compact,compression.type=gzip

Flags:
--url string Base URL of REST Proxy Endpoint of Kafka Cluster (include /kafka for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL.
--ca-cert-path string Path to a PEM-encoded CA to verify the Confluent REST Proxy.
--client-cert-path string Path to client cert to be verified by Confluent REST Proxy, include for mTLS authentication.
--client-key-path string Path to client private key, include for mTLS authentication.
--no-auth Include if requests should be made without authentication headers, and user will not be prompted for credentials.
--prompt Bypass use of available login credentials and prompt for Kafka Rest credentials.
--partitions uint32 Number of topic partitions.
--replication-factor uint32 Number of replicas.
--config strings A comma-separated list of topic configuration ("key=value") overrides for the topic being created.
--if-not-exists Exit gracefully if topic already exists.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which may contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

2 changes: 1 addition & 1 deletion test/fixtures/output/kafka/topic/create.golden
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Create a topic named "my_topic" with default options.
$ confluent kafka topic create my_topic

Flags:
--partitions int32 Number of topic partitions. (default 6)
--partitions uint32 Number of topic partitions.
--config strings A comma-separated list of configuration overrides ("key=value") for the topic being created.
--dry-run Run the command without committing changes to Kafka.
--if-not-exists Exit gracefully if topic already exists.
Expand Down
4 changes: 2 additions & 2 deletions test/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,10 @@ func (s *CLITestSuite) TestKafkaTopicCreate() {
{args: fmt.Sprintf("kafka topic create --url %s --no-auth", kafkaRestURL), contains: "Error: accepts 1 arg(s), received 0", wantErrCode: 1, name: "missing topic-name should return error"},
{args: fmt.Sprintf("kafka topic create topic-exist --url %s --no-auth", kafkaRestURL), contains: "Error: topic \"topic-exist\" already exists for the Kafka cluster\n\nSuggestions:\n To list topics for the cluster, use `confluent kafka topic list --url <url>`.", wantErrCode: 1, name: "creating topic with existing topic name should fail"},
// --partitions errors
{args: fmt.Sprintf("kafka topic create topic-X --url %s --partitions -2 --no-auth", kafkaRestURL), contains: "Error: REST request failed: Number of partitions must be larger than 0. (40002)\n", wantErrCode: 1, name: "creating topic with negative partitions name should fail"},
{args: fmt.Sprintf("kafka topic create topic-X --url %s --partitions -2 --no-auth", kafkaRestURL), wantErrCode: 1, name: "creating topic with negative partitions name should fail", fixture: "kafka/topic/create-negative-partitions.golden"},
// --replication-factor errors
{args: fmt.Sprintf("kafka topic create topic-X --url %s --replication-factor 4 --no-auth", kafkaRestURL), contains: "Error: REST request failed: Replication factor: 4 larger than available brokers: 3. (40002)\n", wantErrCode: 1, name: "creating topic with larger replication factor than num. brokers should fail"},
{args: fmt.Sprintf("kafka topic create topic-X --url %s --replication-factor -2 --no-auth", kafkaRestURL), contains: "Error: REST request failed: Replication factor must be larger than 0. (40002)\n", wantErrCode: 1, name: "creating topic with negative replication factor should fail"},
{args: fmt.Sprintf("kafka topic create topic-X --url %s --replication-factor -2 --no-auth", kafkaRestURL), wantErrCode: 1, name: "creating topic with negative replication factor should fail", fixture: "kafka/topic/create-negative-replication-factor.golden"},
// --config errors
{args: fmt.Sprintf("kafka topic create topic-X --url %s --config asdf=1 --no-auth", kafkaRestURL), contains: "Error: REST request failed: Unknown topic config name: asdf (40002)\n", wantErrCode: 1, name: "creating topic with incorrect config name should fail"},
{args: fmt.Sprintf("kafka topic create topic-X --url %s --config retention.ms=as --no-auth", kafkaRestURL), contains: "Error: REST request failed: Invalid value as for configuration retention.ms: Not a number of type LONG (40002)\n", wantErrCode: 1, name: "creating topic with correct key incorrect config value should fail"},
Expand Down
6 changes: 0 additions & 6 deletions test/test-server/kafka_rest_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ func (r KafkaRestProxyRouter) HandleKafkaRPTopics(t *testing.T) http.HandlerFunc
} else if requestData.TopicName == "topic-exceed-limit" {
require.NoError(t, writeErrorResponse(w, http.StatusBadRequest, 40002, "Adding the requested number of partitions will exceed 9000 total partitions."))
return
} else if requestData.PartitionsCount < -1 || requestData.PartitionsCount == 0 { // check partition
require.NoError(t, writeErrorResponse(w, http.StatusBadRequest, 40002, "Number of partitions must be larger than 0."))
return
} else if requestData.ReplicationFactor < -1 || requestData.ReplicationFactor == 0 { // check replication factor
require.NoError(t, writeErrorResponse(w, http.StatusBadRequest, 40002, "Replication factor must be larger than 0."))
return
} else if requestData.ReplicationFactor > 3 {
require.NoError(t, writeErrorResponse(w, http.StatusBadRequest, 40002, "Replication factor: 4 larger than available brokers: 3."))
return
Expand Down