-
Notifications
You must be signed in to change notification settings - Fork 2
/
consumer.go
43 lines (38 loc) · 995 Bytes
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package mq
import (
"lib/mq"
"lib/mq/kafka"
)
type MQConsumer interface {
RegisterByTopic(topic string, callback mq.IConsumerCallback) error
Register(callback mq.IConsumerCallback) error
Start() error
Close() error
}
type ConsumerTopicModel struct {
Group string
Topic string
}
func NewMQConsumer(kind string, cluster string, confPath string, logPath string, topicModels []*ConsumerTopicModel) MQConsumer {
//translate mq topics
mqTopics := translateToMqTopicModel(topicModels)
//choose
switch kind {
case "kafka":
return kafka.NewMQConsumer(confPath, mqTopics, logPath)
default:
return kafka.NewMQConsumer(confPath, mqTopics, logPath)
}
}
//translate
func translateToMqTopicModel(topicModels []*ConsumerTopicModel) []*mq.ConsumerTopicModel {
returns := make([]*mq.ConsumerTopicModel, 0, len(topicModels))
for _, o := range topicModels {
item := mq.ConsumerTopicModel{
Group: o.Group,
Topic: o.Topic,
}
returns = append(returns, &item)
}
return returns
}