/
microservice_consumer.go
51 lines (42 loc) · 1.21 KB
/
microservice_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Created and maintained by Chaiyapong Lapliengtrakul (chaiyapong@3dsinteractive.com), All rights reserved (2021 - Present)
package main
import (
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// consumeSingle runs a blocking consume loop for a single Kafka topic and
// hands the value of every message it reads to h, wrapped in a consumer
// context built by NewConsumerContext.
//
// servers and groupID are forwarded to ms.newKafkaConsumer; topic is the topic
// the consumer subscribes to. A readTimeout <= 0 is normalised to -1, which is
// treated as "no timeout". With a positive readTimeout, a read that times out
// is handled like any other read error: it is logged, ms.Stop is called and
// the loop ends.
//
// Failures while creating the consumer or subscribing to the topic are logged
// and end the function without calling ms.Stop.
func (ms *Microservice) consumeSingle(servers string, topic string, groupID string, readTimeout time.Duration, h ServiceHandleFunc) {
	c, err := ms.newKafkaConsumer(servers, groupID)
	if err != nil {
		// Without a log entry this failure would be invisible, because the
		// caller starts this function on its own goroutine.
		ms.Log("Consumer", err.Error())
		return
	}
	defer c.Close()

	// A failed subscription leaves nothing to read from, so do not enter the
	// read loop at all.
	if err := c.Subscribe(topic, nil); err != nil {
		ms.Log("Consumer", err.Error())
		return
	}

	// Normalise once: any non-positive timeout means "no timeout" (-1).
	if readTimeout <= 0 {
		readTimeout = -1
	}

	for {
		msg, err := c.ReadMessage(readTimeout)
		if err != nil {
			// With no timeout configured, a timed-out read is not fatal:
			// simply try to read again.
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut && readTimeout == -1 {
				continue
			}

			// Any other error (including a timeout when a positive
			// readTimeout was requested) stops the service.
			ms.Log("Consumer", err.Error())
			ms.Stop()
			return
		}

		// Execute the handler with the message payload as a string.
		h(NewConsumerContext(ms, string(msg.Value)))
	}
}
// Consume registers h as the handler for messages read from topic.
// The consume loop (consumeSingle) is launched on its own goroutine, so this
// call does not wait for it; the returned error is always nil.
func (ms *Microservice) Consume(servers string, topic string, groupID string, readTimeout time.Duration, h ServiceHandleFunc) error {
	go func() {
		ms.consumeSingle(servers, topic, groupID, readTimeout, h)
	}()
	return nil
}