/
consumer_kafka.go
134 lines (120 loc) · 3.22 KB
/
consumer_kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package server
import (
"fmt"
"strings"
"sync"
"github.com/Shopify/sarama"
"github.com/coinsky/trade-server/core"
toml "github.com/pelletier/go-toml"
log "github.com/sirupsen/logrus"
)
// TradeConsumer consumes trade messages from Kafka and feeds them into the
// core Hub, optionally mirroring every message to a backup MsgWriter.
type TradeConsumer struct {
	sarama.Consumer // embedded Kafka consumer: partition discovery + consumption

	topic    string    // Kafka topic this consumer reads from
	stopChan chan byte // closed by Consume when all partition goroutines have exited
	quitChan chan byte // closed by Close to tell partition goroutines to stop
	hub      *core.Hub // sink for consumed messages; also persists/loads partition offsets
	writer   MsgWriter // optional backup writer; may be nil (then backup is skipped)
}
// NewKafkaConsumer builds a TradeConsumer for the given topic: it wires up the
// optional backup writer and the underlying sarama consumer from the server
// configuration, returning an error if either cannot be created.
func NewKafkaConsumer(svrConfig *toml.Tree, topic string, hub *core.Hub) (*TradeConsumer, error) {
	writer, err := initBackupWriter(svrConfig)
	if err != nil {
		return nil, err
	}
	consumer, err := newKafka(svrConfig)
	if err != nil {
		return nil, err
	}
	tc := &TradeConsumer{
		Consumer: consumer,
		topic:    topic,
		stopChan: make(chan byte, 1),
		quitChan: make(chan byte, 1),
		hub:      hub,
		writer:   writer,
	}
	return tc, nil
}
// newKafka creates a sarama consumer from the comma-separated broker list
// stored under the "kafka-addrs" configuration key. It also routes sarama's
// internal logging through the standard logrus logger.
func newKafka(svrConfig *toml.Tree) (sarama.Consumer, error) {
	addrs := svrConfig.GetDefault("kafka-addrs", "").(string)
	if addrs == "" {
		log.Error("kafka address is empty")
		return nil, fmt.Errorf("kafka address is empty")
	}
	sarama.Logger = log.StandardLogger()
	brokers := strings.Split(addrs, ",")
	consumer, err := sarama.NewConsumer(brokers, nil)
	if err != nil {
		log.WithError(err).Error("create consumer error")
		return nil, err
	}
	return consumer, nil
}
// Consume discovers all partitions of the configured topic and starts one
// goroutine per partition, delivering each message to the hub (and, when a
// backup writer is configured, to the writer). It blocks until Close is
// called and every partition goroutine has exited, then closes stopChan so
// Close can observe completion.
func (tc *TradeConsumer) Consume() {
	defer close(tc.stopChan)
	partitionList, err := tc.Partitions(tc.topic)
	if err != nil {
		// Without the partition list the consumer cannot run at all.
		panic(err)
	}
	log.WithField("size", len(partitionList)).Info("consumer partitions")
	wg := &sync.WaitGroup{}
	for _, partition := range partitionList {
		// Resume from the offset persisted in the hub; 0 means "never
		// consumed", so start from the oldest retained offset instead.
		offset := tc.hub.LoadOffset(partition)
		if offset == 0 {
			// start from the oldest offset
			offset = sarama.OffsetOldest
		} else {
			// LoadOffset returns the last processed offset; continue after it.
			offset++
		}
		pc, err := tc.ConsumePartition(tc.topic, partition, offset)
		if err != nil {
			log.WithError(err).Errorf("Failed to start consumer for partition %d", partition)
			continue
		}
		wg.Add(1)
		go func(pc sarama.PartitionConsumer, partition int32) {
			log.WithFields(log.Fields{"partition": partition, "offset": offset}).Info("PartitionConsumer start")
			defer func() {
				// Use Close rather than AsyncClose: AsyncClose requires the
				// caller to keep draining the Messages and Errors channels,
				// which this goroutine stops doing once it returns. Close
				// drains both channels internally and surfaces any error.
				if err := pc.Close(); err != nil {
					log.WithError(err).Errorf("close partition consumer %d failed", partition)
				}
				wg.Done()
				log.WithFields(log.Fields{"partition": partition, "offset": offset}).Info("PartitionConsumer close")
			}()
			for {
				select {
				case msg := <-pc.Messages():
					// update offset, and then commit to db
					tc.hub.UpdateOffset(msg.Partition, msg.Offset)
					tc.hub.ConsumeMessage(string(msg.Key), msg.Value)
					offset = msg.Offset
					if tc.writer != nil {
						// Best-effort backup: a write failure is logged but
						// must not stop consumption.
						if err := tc.writer.WriteKV(msg.Key, msg.Value); err != nil {
							log.WithError(err).Error("write file failed")
						}
					}
					log.WithFields(log.Fields{"key": string(msg.Key), "value": string(msg.Value), "offset": offset}).Debug("consume message")
				case <-tc.quitChan:
					return
				}
			}
		}(pc, partition)
	}
	wg.Wait()
}
// Close signals every partition goroutine to stop, waits for Consume to fully
// unwind, then shuts down the underlying sarama consumer and, when present,
// the backup writer. Shutdown errors are logged, not returned.
func (tc *TradeConsumer) Close() {
	close(tc.quitChan)
	// Block until Consume has closed stopChan, i.e. all goroutines are done.
	<-tc.stopChan
	if err := tc.Consumer.Close(); err != nil {
		log.WithError(err).Error("consumer close failed")
	}
	if w := tc.writer; w != nil {
		if err := w.Close(); err != nil {
			log.WithError(err).Error("file close failed")
		}
	}
	log.Info("Consumer close")
}
// String implements fmt.Stringer, identifying this consumer in logs.
func (tc *TradeConsumer) String() string {
	const name = "kafka-consumer"
	return name
}