-
Notifications
You must be signed in to change notification settings - Fork 21
/
kafka.go
85 lines (69 loc) · 1.78 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package command
import (
"flag"
"fmt"
"log"
"os"
"strings"
"github.com/Shopify/sarama"
"github.com/funkygao/gocli"
"github.com/funkygao/golib/color"
)
type Kafka struct {
Ui cli.Ui
Cmd string
}
func (this *Kafka) Run(args []string) (exitCode int) {
cmdFlags := flag.NewFlagSet("kafka", flag.ContinueOnError)
cmdFlags.Usage = func() { this.Ui.Output(this.Help()) }
if err := cmdFlags.Parse(args); err != nil {
return 2
}
if len(args) == 0 {
this.Ui.Error("missing <host:port>")
return 2
}
sarama.Logger = log.New(os.Stderr, color.Magenta("[sarama]"), log.LstdFlags|log.Lshortfile)
broker := args[len(args)-1]
kfk, err := sarama.NewClient([]string{broker}, saramaConfig())
if err != nil {
this.Ui.Error(err.Error())
return
}
defer kfk.Close()
this.Ui.Info("get topics")
topics, err := kfk.Topics()
swallow(err)
if len(topics) == 0 {
return
}
for _, topic := range topics {
this.Ui.Infof(" get writable partitions of topic[%s]", topic)
alivePartitions, err := kfk.WritablePartitions(topic)
swallow(err)
this.Ui.Infof(" get partitions of topic[%s]", topic)
partions, err := kfk.Partitions(topic)
swallow(err)
if len(alivePartitions) != len(partions) {
this.Ui.Errorf(" topic[%s] has %d readonly partitions", topic, len(partions)-len(alivePartitions))
}
for _, partitionID := range alivePartitions {
this.Ui.Infof(" get replicas of %s#%d", topic, partitionID)
_, err := kfk.Replicas(topic, partitionID)
if err != nil {
this.Ui.Errorf(" <-%s", err.Error())
}
}
}
return
}
func (*Kafka) Synopsis() string {
return "Debug a kafka broker with kafka protocol"
}
func (this *Kafka) Help() string {
help := fmt.Sprintf(`
Usage: %s kafka <host:port>
%s
`, this.Cmd, this.Synopsis())
return strings.TrimSpace(help)
}