-
Notifications
You must be signed in to change notification settings - Fork 31
/
main.go
116 lines (104 loc) · 3.11 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"flag"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/tools/tls"
)
var (
brokerList = flag.String("brokers", defaultPeers(), "The comma separated list of brokers in the Kafka cluster")
topic = flag.String("topic", "input-topic", "The topic to")
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
tlsEnabled = flag.Bool("tls-enabled", false, "Whether to enable TLS")
tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
tlsClientKey = flag.String("tls-client-key", "", "Client key for client authentication (use with tls-enabled and -tls-client-cert)")
logger = log.New(os.Stdout, "", log.LstdFlags)
)
func defaultPeers() string {
if env, ok := os.LookupEnv("KAFKA_PEERS"); ok {
return env
}
return "kafka-0.broker:9092"
}
func main() {
flag.Parse()
if *brokerList == "" {
panic(fmt.Errorf("you have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable"))
}
if *topic == "" {
panic(fmt.Errorf("-topic is required"))
}
if *verbose {
sarama.Logger = logger
}
config := sarama.NewConfig()
if *tlsEnabled {
tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
if err != nil {
panic(err)
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
}
addrs := strings.Split(*brokerList, ",")
admin, err := sarama.NewClusterAdmin(addrs, config)
if err != nil {
panic(err)
}
defer admin.Close()
producer, err := sarama.NewAsyncProducer(addrs, config)
if err != nil {
panic(err)
}
defer producer.Close()
err = func() error {
cmd := flag.Args()[0]
switch cmd {
case "create-topic":
return createTopicCmd(admin)
case "pump-topic":
return pumpTopicCmd(producer)
default:
return fmt.Errorf("unknown comand %q", cmd)
}
}()
if err != nil {
panic(err)
}
}
var sleep = flag.String("sleep", "1s", "how long to sleep")
func pumpTopicCmd(producer sarama.AsyncProducer) error {
duration, err := time.ParseDuration(*sleep)
if err != nil {
return err
}
_, _ = fmt.Fprintf(os.Stdout, "topic %q sleep %v \n", *topic, duration)
start := time.Now()
for i := 0; ; i++ {
x := fmt.Sprintf("%s-%d", FunnyAnimal(), i)
producer.Input() <- &sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder(x),
}
_, _ = fmt.Fprintf(os.Stdout, "sent %q (%.0f/s)\n", x, (1+float64(i))/time.Since(start).Seconds())
time.Sleep(duration)
}
}
func createTopicCmd(admin sarama.ClusterAdmin) error {
if err := admin.CreateTopic(*topic, &sarama.TopicDetail{NumPartitions: 2, ReplicationFactor: 1}, false); err != nil {
if terr, ok := err.(*sarama.TopicError); ok && terr.Err == sarama.ErrTopicAlreadyExists {
_, _ = fmt.Fprintf(os.Stdout, "topic %q already exists\n", *topic)
return nil
} else {
return err
}
}
_, _ = fmt.Fprintf(os.Stdout, "topic %q created\n", *topic)
return nil
}