Skip to content

Commit

Permalink
CLI-965 Remove broker information in ccloud kafka topic describe (#960
Browse files Browse the repository at this point in the history
)

* Update command_topic.go

* Update command_topic.go

* Update command_topic.go

* Update command_topic.go

* remove unused getPartitionDisplay()

* update topic describe int tests

* remove unused partitionData

* Update command_topic.go

* Update command_topic.go

* remove header in topic describe output
  • Loading branch information
MuweiHe committed Aug 11, 2021
1 parent 7818abc commit cb0c272
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 157 deletions.
133 changes: 10 additions & 123 deletions internal/cmd/kafka/command_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,14 @@ type authenticatedTopicCommand struct {
completableChildren []*cobra.Command
}

type partitionDescribeDisplay struct {
Topic string `json:"topic" yaml:"topic"`
Partition uint32 `json:"partition" yaml:"partition"`
Leader uint32 `json:"leader" yaml:"leader"`
Replicas []uint32 `json:"replicas" yaml:"replicas"`
ISR []uint32 `json:"isr" yaml:"isr"`
}

type structuredDescribeDisplay struct {
TopicName string `json:"topic_name" yaml:"topic_name"`
PartitionCount int `json:"partition_count" yaml:"partition_count"`
ReplicationFactor int `json:"replication_factor" yaml:"replication_factor"`
Partitions []partitionDescribeDisplay `json:"partitions" yaml:"partitions"`
Config map[string]string `json:"config" yaml:"config"`
}

type partitionData struct {
TopicName string `json:"topic" yaml:"topic"`
PartitionId int32 `json:"partition" yaml:"partition"`
LeaderBrokerId int32 `json:"leader" yaml:"leader"`
ReplicaBrokerIds []int32 `json:"replicas" yaml:"replicas"`
InSyncReplicaBrokerIds []int32 `json:"isr" yaml:"isr"`
TopicName string `json:"topic_name" yaml:"topic_name"`
Config map[string]string `json:"config" yaml:"config"`
}

type topicData struct {
TopicName string `json:"topic_name" yaml:"topic_name"`
PartitionCount int `json:"partition_count" yaml:"partition_count"`
ReplicationFactor int `json:"replication_factor" yaml:"replication_factor"`
Partitions []partitionData `json:"partitions" yaml:"partitions"`
Configs map[string]string `json:"config" yaml:"config"`
TopicName string `json:"topic_name" yaml:"topic_name"`
Config map[string]string `json:"config" yaml:"config"`
}

// NewTopicCommand returns the Cobra command for Kafka topic.
Expand Down Expand Up @@ -459,7 +437,7 @@ func (a *authenticatedTopicCommand) describe(cmd *cobra.Command, args []string)
}
lkc := kafkaClusterConfig.ID

partitionsResp, httpResp, err := kafkaREST.Client.PartitionApi.ClustersClusterIdTopicsTopicNamePartitionsGet(kafkaREST.Context, lkc, topicName)
_, httpResp, err := kafkaREST.Client.PartitionApi.ClustersClusterIdTopicsTopicNamePartitionsGet(kafkaREST.Context, lkc, topicName)

if err != nil && httpResp != nil {
// Kafka REST is available, but there was an error
Expand All @@ -483,57 +461,16 @@ func (a *authenticatedTopicCommand) describe(cmd *cobra.Command, args []string)

topicData := &topicData{}
topicData.TopicName = topicName
topicData.PartitionCount = len(partitionsResp.Data)
topicData.Partitions = make([]partitionData, len(partitionsResp.Data))
replicaStatusDataList, httpResp, err := kafkaREST.Client.ReplicaStatusApi.ClustersClusterIdTopicsTopicNamePartitionsReplicaStatusGet(kafkaREST.Context, lkc, topicName)
if err != nil {
return kafkaRestError(kafkaREST.Client.GetConfig().BasePath, err, httpResp)
} else if replicaStatusDataList.Data == nil {
return errors.NewErrorWithSuggestions(errors.EmptyResponseMsg, errors.InternalServerErrorSuggestions)
}
partitionIdToData := make(map[int32]partitionData)
for _, replica := range replicaStatusDataList.Data {
if _, ok := partitionIdToData[replica.PartitionId]; !ok {
partitionIdToData[replica.PartitionId] = partitionData{
TopicName: replica.TopicName,
PartitionId: replica.PartitionId,
ReplicaBrokerIds: []int32{replica.BrokerId},
InSyncReplicaBrokerIds: []int32{},
}
} else {
tmp := partitionIdToData[replica.PartitionId]
tmp.ReplicaBrokerIds = append(partitionIdToData[replica.PartitionId].ReplicaBrokerIds, replica.BrokerId)
partitionIdToData[replica.PartitionId] = tmp
}
if replica.IsLeader {
tmp := partitionIdToData[replica.PartitionId]
tmp.LeaderBrokerId = replica.BrokerId
partitionIdToData[replica.PartitionId] = tmp
}
if replica.IsInIsr {
tmp := partitionIdToData[replica.PartitionId]
tmp.InSyncReplicaBrokerIds = append(partitionIdToData[replica.PartitionId].InSyncReplicaBrokerIds, replica.BrokerId)
partitionIdToData[replica.PartitionId] = tmp
}
}

for i, data := range partitionsResp.Data {
if i == 0 {
topicData.ReplicationFactor = len(partitionIdToData[data.PartitionId].ReplicaBrokerIds)
}
topicData.Partitions[i] = partitionIdToData[data.PartitionId]
}

// Get topic config
configsResp, httpResp, err := kafkaREST.Client.ConfigsApi.ClustersClusterIdTopicsTopicNameConfigsGet(kafkaREST.Context, lkc, topicName)
if err != nil {
return kafkaRestError(kafkaREST.Client.GetConfig().BasePath, err, httpResp)
} else if configsResp.Data == nil {
return errors.NewErrorWithSuggestions(errors.EmptyResponseMsg, errors.InternalServerErrorSuggestions)
}
topicData.Configs = make(map[string]string)
topicData.Config = make(map[string]string)
for _, config := range configsResp.Data {
topicData.Configs[config.Name] = *config.Value
topicData.Config[config.Name] = *config.Value
}

if outputOption == output.Human.String() {
Expand Down Expand Up @@ -1046,20 +983,10 @@ func validateTopic(topic string, cluster *v1.KafkaClusterConfig, clientID string
}

func printHumanDescribe(cmd *cobra.Command, topicData *topicData) error {
utils.Printf(cmd, "Topic: %s PartitionCount: %d ReplicationFactor: %d\n",
topicData.TopicName, topicData.PartitionCount, topicData.ReplicationFactor)
partitionsTableLabels := []string{"Topic", "Partition", "Leader", "Replicas", "ISR"}
partitionsTableEntries := make([][]string, topicData.PartitionCount)
for i, partition := range topicData.Partitions {
partitionsTableEntries[i] = printer.ToRow(&partition, []string{"TopicName", "PartitionId", "LeaderBrokerId", "ReplicaBrokerIds", "InSyncReplicaBrokerIds"})
}
printer.RenderCollectionTable(partitionsTableEntries, partitionsTableLabels)

utils.Print(cmd, "\nConfiguration\n\n")
configsTableLabels := []string{"Name", "Value"}
configsTableEntries := make([][]string, len(topicData.Configs))
configsTableEntries := make([][]string, len(topicData.Config))
i := 0
for name, value := range topicData.Configs {
for name, value := range topicData.Config {
configsTableEntries[i] = printer.ToRow(&struct {
name string
value string
Expand All @@ -1074,19 +1001,8 @@ func printHumanDescribe(cmd *cobra.Command, topicData *topicData) error {
}

func printHumanTopicDescription(cmd *cobra.Command, resp *schedv1.TopicDescription) error {
utils.Printf(cmd, "Topic: %s PartitionCount: %d ReplicationFactor: %d\n",
resp.Name, len(resp.Partitions), len(resp.Partitions[0].Replicas))

var partitions [][]string
titleRow := []string{"Topic", "Partition", "Leader", "Replicas", "ISR"}
for _, partition := range resp.Partitions {
partitions = append(partitions, printer.ToRow(getPartitionDisplay(partition, resp.Name), titleRow))
}

printer.RenderCollectionTable(partitions, titleRow)

var entries [][]string
titleRow = []string{"Name", "Value"}
titleRow := []string{"Name", "Value"}
for _, entry := range resp.Config {
record := &struct {
Name string
Expand All @@ -1100,49 +1016,20 @@ func printHumanTopicDescription(cmd *cobra.Command, resp *schedv1.TopicDescripti
sort.Slice(entries, func(i, j int) bool {
return entries[i][0] < entries[j][0]
})
utils.Print(cmd, "\nConfiguration\n\n")
printer.RenderCollectionTable(entries, titleRow)
return nil
}

func printStructuredTopicDescription(resp *schedv1.TopicDescription, format string) error {
structuredDisplay := &structuredDescribeDisplay{Config: make(map[string]string)}
structuredDisplay.TopicName = resp.Name
structuredDisplay.PartitionCount = len(resp.Partitions)
structuredDisplay.ReplicationFactor = len(resp.Partitions[0].Replicas)

var partitionList []partitionDescribeDisplay
for _, partition := range resp.Partitions {
partitionList = append(partitionList, *getPartitionDisplay(partition, resp.Name))
}
structuredDisplay.Partitions = partitionList

for _, entry := range resp.Config {
structuredDisplay.Config[entry.Name] = entry.Value
}
return output.StructuredOutput(format, structuredDisplay)
}

func getPartitionDisplay(partition *schedv1.TopicPartitionInfo, topicName string) *partitionDescribeDisplay {
var replicas []uint32
for _, replica := range partition.Replicas {
replicas = append(replicas, replica.Id)
}

var isr []uint32
for _, replica := range partition.Isr {
isr = append(isr, replica.Id)
}

return &partitionDescribeDisplay{
Topic: topicName,
Partition: partition.Partition,
Leader: partition.Leader.Id,
Replicas: replicas,
ISR: isr,
}
}

func (a *authenticatedTopicCommand) getTopics(cmd *cobra.Command) ([]*schedv1.TopicDescription, error) {
cluster, err := pcmd.KafkaCluster(cmd, a.Context)
if err != nil {
Expand Down
25 changes: 0 additions & 25 deletions test/fixtures/output/kafka/topic-describe-json-success.golden
Original file line number Diff line number Diff line change
@@ -1,30 +1,5 @@
{
"topic_name": "topic-exist",
"partition_count": 3,
"replication_factor": 3,
"partitions": [
{
"topic": "topic-exist",
"partition": 0,
"leader": 1001,
"replicas": [1001, 1002, 1003],
"isr": [1001, 1002, 1003]
},
{
"topic": "topic-exist",
"partition": 1,
"leader": 1002,
"replicas": [1001, 1002, 1003],
"isr": [1002, 1003]
},
{
"topic": "topic-exist",
"partition": 2,
"leader": 1003,
"replicas": [1001, 1002, 1003],
"isr": [1003]
}
],
"config": {
"cleanup.policy": "delete",
"compression.type": "producer",
Expand Down
9 changes: 0 additions & 9 deletions test/fixtures/output/kafka/topic-describe-success.golden
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
Topic: topic-exist PartitionCount: 3 ReplicationFactor: 3
Topic | Partition | Leader | Replicas | ISR
+-------------+-----------+--------+------------------+------------------+
topic-exist | 0 | 1001 | [1001 1002 1003] | [1001 1002 1003]
topic-exist | 1 | 1002 | [1001 1002 1003] | [1002 1003]
topic-exist | 2 | 1003 | [1001 1002 1003] | [1003]

Configuration

Name | Value
+------------------+-----------+
cleanup.policy | delete
Expand Down

0 comments on commit cb0c272

Please sign in to comment.