feat: Show zero lag consumer groups and include group state in lagCmd output

jack authored and jack committed Jul 9, 2024
1 parent aaae395 commit 0c17de1
Showing 1 changed file with 40 additions and 16 deletions.
56 changes: 40 additions & 16 deletions cmd/kaf/topic.go
@@ -335,6 +335,7 @@ var deleteTopicCmd = &cobra.Command{
}
},
}

var lagCmd = &cobra.Command{
Use: "lag",
Short: "Display the total lags for each consumer group",
@@ -344,49 +345,68 @@ var lagCmd = &cobra.Command{
admin := getClusterAdmin()
defer admin.Close()

topicDetails, err := admin.DescribeTopics([]string{args[0]})
if err != nil {
// Describe the topic
topicDetails, err := admin.DescribeTopics([]string{topic})
if err != nil || len(topicDetails) == 0 {
errorExit("Unable to describe topics: %v\n", err)
}

// Get the list of partitions for the topic
partitions := make([]int32, 0, len(topicDetails[0].Partitions))
for _, partition := range topicDetails[0].Partitions {
partitions = append(partitions, partition.ID)
}
highWatermarks := getHighWatermarks(args[0], partitions)
highWatermarks := getHighWatermarks(topic, partitions)

var groups []string
// List all consumer groups
consumerGroups, err := admin.ListConsumerGroups()
if err != nil {
errorExit("Unable to list consumer groups: %v\n", err)
}

var groups []string
for group := range consumerGroups {
groups = append(groups, group)
}

// Describe all consumer groups
groupsInfo, err := admin.DescribeConsumerGroups(groups)
if err != nil {
errorExit("Unable to describe consumer groups: %v\n", err)
}
var lagInfo = make(map[string]int64)
for _, v := range groupsInfo {

// Calculate lag for each group
lagInfo := make(map[string]int64)
groupStates := make(map[string]string) // To store the state of each group
for _, group := range groupsInfo {
var sum int64
for _, member := range v.Members {
show := false
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil || assignment == nil {
continue
}

metadata, err := member.GetMemberMetadata()
if err != nil || metadata == nil {
continue
}

if topicPartitions, exist := assignment.Topics[topic]; exist {
resp, err := admin.ListConsumerGroupOffsets(v.GroupId, map[string][]int32{topic: topicPartitions})
show = true
resp, err := admin.ListConsumerGroupOffsets(group.GroupId, map[string][]int32{topic: topicPartitions})
if err != nil {
fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", v.GroupId, err)
fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group.GroupId, err)
continue
}

if blocks, ok := resp.Blocks[topic]; ok {
for pid, block := range blocks {
if hwm, ok := highWatermarks[pid]; ok {
if block.Offset > hwm {
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, v.GroupId)
// You might choose to set the lag to 0 in this case
// sum += 0
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, group.GroupId)
} else if block.Offset < 0 {
// Skip partitions with negative offsets
} else {
sum += hwm - block.Offset
}
@@ -395,16 +415,20 @@ var lagCmd = &cobra.Command{
}
}
}
if sum > 0 {
lagInfo[v.GroupId] = sum

if show && sum >= 0 {
lagInfo[group.GroupId] = sum
groupStates[group.GroupId] = group.State // Store the state of the group
}
}

// Print the lag information along with group state
w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
if !noHeaderFlag {
fmt.Fprintf(w, "GROUP ID\tLAG\n")
fmt.Fprintf(w, "GROUP ID\tSTATE\tLAG\n")
}
for group, lag := range lagInfo {
fmt.Fprintf(w, "%v\t%v\n", group, lag)
fmt.Fprintf(w, "%v\t%v\t%v\n", group, groupStates[group], lag)
}
w.Flush()
},
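For a sense of the user-visible effect, here is an illustrative run, assuming the lag subcommand is registered under the topic command (kaf topic lag <topic>); the topic, group names, states, and numbers below are made up, not taken from the repository. Per-group lag is the sum of (high watermark - committed offset) over the partitions of the topic assigned to that group, and with this commit a group appears even when that sum is zero, alongside its state:

    $ kaf topic lag orders
    GROUP ID           STATE    LAG
    billing-service    Stable   1240
    audit-consumer     Stable   0

Before this change the output had only GROUP ID and LAG columns and omitted any group whose total lag was not strictly positive.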
