/
mq.go
105 lines (88 loc) · 2.59 KB
/
mq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Created and maintained by Chaiyapong Lapliengtrakul (chaiyapong@3dsinteractive.com), All rights reserved (2021 - Present)
package main
import (
"context"
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// IMQ describes the Kafka topic-management operations; MQ in this file
// implements it.
type IMQ interface {
	// CreateTopic creates the named topic with the given partition count and
	// replication factor.
	CreateTopic(topic string, partitions, replications int) error

	// CreateTopicR is CreateTopic with an additional retention period for the
	// topic.
	CreateTopicR(topic string, partitions, replications int, retentionPeriod time.Duration) error
}
// MQ is the message queue helper used to manage Kafka topics.
type MQ struct {
// ms is the microservice supplied to NewMQ; its Log method is used to report topic results.
ms *Microservice
// servers is the value passed as Kafka's "bootstrap.servers" setting.
servers string
}
// NewMQ builds an MQ that talks to the given Kafka bootstrap servers and
// reports through the given microservice.
func NewMQ(servers string, ms *Microservice) *MQ {
	q := new(MQ)
	q.servers = servers
	q.ms = ms
	return q
}
// getAdminClient opens a new Kafka admin client against q.servers.
// The caller is responsible for closing the returned client.
func (q *MQ) getAdminClient() (*kafka.AdminClient, error) {
	cfg := kafka.ConfigMap{"bootstrap.servers": q.servers}

	client, err := kafka.NewAdminClient(&cfg)
	if err != nil {
		return nil, err
	}
	return client, nil
}
// CreateTopicR creates the topic with an explicit retention period.
// It forwards all arguments to createTopic, which falls back to a 7-day
// retention when retentionPeriod is zero or negative.
func (q *MQ) CreateTopicR(topic string, partitions int, replications int, retentionPeriod time.Duration) error {
return q.createTopic(topic, partitions, replications, retentionPeriod)
}
// CreateTopic creates the topic without an explicit retention period, so
// createTopic applies its 7-day default.
func (q *MQ) CreateTopic(topic string, partitions int, replications int) error {
	// A zero retention period asks createTopic to use its default.
	const useDefaultRetention = 0
	return q.createTopic(topic, partitions, replications, useDefaultRetention)
}
// createTopic creates a single Kafka topic through a short-lived admin client.
//
// topic is the topic name; partitions and replications are passed through as
// the partition count and replication factor; retentionPeriod is sent as the
// topic's "retention.ms" setting. A retentionPeriod of zero or less falls back
// to 7 days.
//
// It returns the error from opening the admin client or from the CreateTopics
// request itself. Every per-topic result is written to the microservice log
// under the "MQ" tag.
//
// NOTE(review): per-topic results are only logged, never converted into a
// returned error. Confirm against the confluent-kafka-go documentation whether
// a failed creation (for example, a topic that already exists) should be
// surfaced to callers.
func (q *MQ) createTopic(topic string, partitions int, replications int, retentionPeriod time.Duration) error {
	const (
		// defaultRetention is used when the caller gives no positive retention.
		defaultRetention = 7 * 24 * time.Hour
		// operationTimeout is the admin operation timeout for the create request.
		operationTimeout = 5 * time.Minute
	)

	if retentionPeriod <= 0 {
		retentionPeriod = defaultRetention
	}

	admin, err := q.getAdminClient()
	if err != nil {
		return err
	}
	defer admin.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	spec := kafka.TopicSpecification{
		Topic:             topic,
		NumPartitions:     partitions,
		ReplicationFactor: replications,
		Config: map[string]string{
			// The retention is sent as a whole number of milliseconds.
			"retention.ms": fmt.Sprintf("%d", int64(retentionPeriod/time.Millisecond)),
		},
	}

	results, err := admin.CreateTopics(
		ctx,
		[]kafka.TopicSpecification{spec},
		kafka.SetAdminOperationTimeout(operationTimeout))
	if err != nil {
		return err
	}

	for _, result := range results {
		q.ms.Log("MQ", result.String())
	}
	return nil
}