Skip to content

Commit

Permalink
Merge pull request #279 from jackcipher/jack/20231006_lags-by-topic
Browse files Browse the repository at this point in the history
feature: show total lags of each consumer
  • Loading branch information
birdayz committed Oct 19, 2023
2 parents 5f9f639 + 589a048 commit a03979f
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions cmd/kaf/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func init() {
topicCmd.AddCommand(addConfigCmd)
topicCmd.AddCommand(topicSetConfig)
topicCmd.AddCommand(updateTopicCmd)
topicCmd.AddCommand(lagCmd)

createTopicCmd.Flags().Int32VarP(&partitionsFlag, "partitions", "p", int32(1), "Number of partitions")
createTopicCmd.Flags().Int16VarP(&replicasFlag, "replicas", "r", int16(1), "Number of replicas")
Expand Down Expand Up @@ -335,3 +336,60 @@ var deleteTopicCmd = &cobra.Command{
}
},
}
var lagCmd = &cobra.Command{
Use: "lag",
Short: "Display the total lags for each consumer group",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
topic := args[0]
admin := getClusterAdmin()
topicDetails, err := admin.DescribeTopics([]string{args[0]})
if err != nil {
errorExit("Unable to describe topics: %v\n", err)
}
partitions := make([]int32, 0, len(topicDetails[0].Partitions))
for _, partition := range topicDetails[0].Partitions {
partitions = append(partitions, partition.ID)
}
highWatermarks := getHighWatermarks(args[0], partitions)

var groups []string
rst, err := admin.ListConsumerGroups()
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
}
for group, _ := range rst {
groups = append(groups, group)
}
groupsInfo, err := admin.DescribeConsumerGroups(groups)
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
}
var lagInfo = make(map[string]int64)
for _, v := range groupsInfo {
for _, member := range v.Members {
assignment, _ := member.GetMemberAssignment()
if _, exist := assignment.Topics[topic]; exist {
var sum int64
resp, _ := admin.ListConsumerGroupOffsets(v.GroupId, assignment.Topics)
for _, v1 := range resp.Blocks {
for pid, v2 := range v1 {
sum += highWatermarks[pid] - v2.Offset
}
}
lagInfo[v.GroupId] = sum
}

}
}
w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
if !noHeaderFlag {
fmt.Fprintf(w, "GROUP ID\tLAG\n")
}
for group, lag := range lagInfo {
fmt.Fprintf(w, "%v\t%v\n", group, lag)
}
w.Flush()

},
}

0 comments on commit a03979f

Please sign in to comment.