/
topic.go
65 lines (52 loc) · 1.45 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package kafka
import (
"fmt"
"os"
"github.com/electric-saw/kafta/pkg/cmd/util"
"github.com/Shopify/sarama"
)
// ListAllTopics asks the connection's cluster admin for its topics and
// returns the resulting map of topic details as provided by ListTopics.
//
// Any error from the lookup is handed to util.CheckErr rather than being
// returned to the caller.
// NOTE(review): util.CheckErr presumably aborts the process on a non-nil
// error — confirm against its definition.
func ListAllTopics(conn *KafkaConnection) map[string]sarama.TopicDetail {
	details, listErr := conn.Admin.ListTopics()
	util.CheckErr(listErr)

	return details
}
// DescribeTopics requests metadata for the named topics from the
// connection's cluster admin and returns one *sarama.TopicMetadata slice as
// produced by the admin's DescribeTopics call.
//
// Any error from the request is handed to util.CheckErr rather than being
// returned to the caller.
// NOTE(review): util.CheckErr presumably aborts the process on a non-nil
// error — confirm against its definition.
func DescribeTopics(conn *KafkaConnection, topics []string) []*sarama.TopicMetadata {
	metadata, describeErr := conn.Admin.DescribeTopics(topics)
	util.CheckErr(describeErr)

	return metadata
}
// CreateTopic creates a topic named topic through the connection's cluster
// admin, using the given partition count, replication factor and config
// entries. The request is sent with validateOnly set to false.
//
// An empty topic name is treated as a usage error: the message is written to
// standard error and the process exits with status 1. On success
// "Topic created" is printed to standard output and nil is returned;
// otherwise the error from the admin client is returned unchanged.
func CreateTopic(conn *KafkaConnection, topic string, numPartitions int32, replicationFactor int16, configs map[string]*string) error {
	if topic == "" {
		// A missing name is a failure, so report it on stderr and exit
		// non-zero; exiting with 0 would tell the calling shell it succeeded.
		fmt.Fprintln(os.Stderr, "Topic name is required")
		os.Exit(1)
	}

	detail := &sarama.TopicDetail{
		NumPartitions:     numPartitions,
		ReplicationFactor: replicationFactor,
		ConfigEntries:     configs,
	}
	if err := conn.Admin.CreateTopic(topic, detail, false); err != nil {
		return err
	}

	fmt.Println("Topic created")
	return nil
}
// DeleteTopic asks the connection's cluster admin to delete the named topic.
//
// On success it prints "Topic deleted" to standard output and returns nil.
// If the admin call fails, its error is returned unchanged and nothing is
// printed.
func DeleteTopic(conn *KafkaConnection, topic string) error {
	deleteErr := conn.Admin.DeleteTopic(topic)
	if deleteErr != nil {
		return deleteErr
	}

	fmt.Println("Topic deleted")
	return nil
}
// GetTopicOffsets returns a map from partition ID to offset for the given
// topic. The partition list comes from the connection's client, and each
// partition's offset is obtained by querying with sarama.OffsetNewest.
//
// Errors from the partition lookup and from each offset query are handed to
// util.CheckErr rather than being returned to the caller.
// NOTE(review): util.CheckErr presumably aborts the process on a non-nil
// error — confirm against its definition.
func GetTopicOffsets(conn *KafkaConnection, topic string) map[int32]int64 {
	offsets := make(map[int32]int64)

	ids, partitionsErr := conn.Client.Partitions(topic)
	util.CheckErr(partitionsErr)

	for i := range ids {
		id := ids[i]
		newest, offsetErr := conn.Client.GetOffset(topic, id, sarama.OffsetNewest)
		util.CheckErr(offsetErr)
		offsets[id] = newest
	}

	return offsets
}