-
Notifications
You must be signed in to change notification settings - Fork 0
/
msgsender_kafka.go
196 lines (171 loc) · 5.65 KB
/
msgsender_kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package msgsender
import (
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/burdiyan/kafkautil"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/furyanprotocol/v4-chain/protocol/indexer"
"github.com/furyanprotocol/v4-chain/protocol/indexer/msgsender/types"
"github.com/furyanprotocol/v4-chain/protocol/lib/metrics"
)
// Ensure the `IndexerMessageSender` interface is implemented at compile time.
var _ IndexerMessageSender = (*IndexerMessageSenderKafka)(nil)

// Implementation of the IndexerMessageSender interface that sends data to Kafka.
// Will be used when the V4 application is connected to an Indexer.
// NOTE: This struct is go-routine safe. Messages are sent by writing to a single-channel, and a
// mutex and boolean variable is used to ensure `Close` only closes the underlying Kafka producer
// once.
type IndexerMessageSenderKafka struct {
	// mutex guards `closed` and serializes sends against `Close`.
	mutex sync.Mutex
	// closed is true once `Close` has shut down the producer; later sends are rejected.
	closed bool
	// inputsDone is signaled by the success- and error-draining goroutines when the
	// producer closes its channels; `Close` waits on it before returning.
	inputsDone sync.WaitGroup
	producer   sarama.AsyncProducer
	logger     log.Logger
	// successes / errors count delivered and failed messages.
	// NOTE(review): these are incremented from the handler goroutines without holding
	// `mutex` — reading them from another goroutine would race; confirm intended usage.
	successes int
	errors    int
}
// NewIndexerMessageSenderKafka creates an `IndexerMessageSenderKafka` backed by a new
// sarama `AsyncProducer` connected to the brokers in `indexerFlags.KafkaAddrs`.
// If `config` is nil a default sarama config is used; in either case the config is
// mutated to return successes/errors (required by the draining goroutines), apply the
// flag-configured retry count, and raise the max message size.
// Returns an error if the producer cannot be constructed.
func NewIndexerMessageSenderKafka(
	indexerFlags indexer.IndexerFlags,
	config *sarama.Config,
	logger log.Logger,
) (*IndexerMessageSenderKafka, error) {
	if config == nil {
		config = sarama.NewConfig()
	}
	// Successes and errors must be returned so handleSuccesses/handleErrors can drain them.
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Retry.Max = indexerFlags.MaxRetries
	config.Producer.MaxMessageBytes = 4194304 // 4MB
	// Use the JVM compatible partitioner to match `kafkajs` which is used in the indexer services.
	config.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner
	producer, err := sarama.NewAsyncProducer(indexerFlags.KafkaAddrs, config)
	if err != nil {
		return nil, err
	}
	sender := NewIndexerMessageSenderKafkaWithProducer(producer, logger)
	return sender, nil
}
// NewIndexerMessageSenderKafkaWithProducer wraps an existing `AsyncProducer` in an
// `IndexerMessageSenderKafka` and spawns the goroutines that drain the producer's
// success and error channels.
func NewIndexerMessageSenderKafkaWithProducer(
	producer sarama.AsyncProducer,
	logger log.Logger,
) *IndexerMessageSenderKafka {
	// Zero values cover `closed` (false), the counters (0), and the WaitGroup.
	sender := &IndexerMessageSenderKafka{
		producer: producer,
		logger:   logger,
	}
	// Two drain goroutines run below; each signals `inputsDone` once, so add 2.
	sender.inputsDone.Add(2)
	go sender.handleSuccesses()
	go sender.handleErrors()
	return sender
}
// Enabled reports whether this sender forwards messages to the Indexer.
// The Kafka-backed implementation is always enabled.
func (msgSender *IndexerMessageSenderKafka) Enabled() bool {
	return true
}
// SendOnchainData sends a key/value pair of byte slices to the on-chain data kafka topic,
// recording the message length and the send latency as telemetry. This method is
// go-routine safe.
func (msgSender *IndexerMessageSenderKafka) SendOnchainData(message Message) {
	defer telemetry.ModuleMeasureSince(
		types.ModuleName,
		time.Now(),
		metrics.SendOnchainData,
		metrics.Latency,
	)
	encodedValue := sarama.ByteEncoder(message.Value)
	telemetry.SetGauge(float32(encodedValue.Length()), types.ModuleName, metrics.OnchainMessageLength)
	kafkaMessage := &sarama.ProducerMessage{
		Topic:   ON_CHAIN_KAFKA_TOPIC,
		Key:     sarama.ByteEncoder(message.Key),
		Value:   encodedValue,
		Headers: message.Headers,
	}
	msgSender.send(kafkaMessage)
}
// SendOffchainData sends a key/value pair of byte slices to the off-chain data kafka topic,
// recording the message length and the send latency as telemetry. This method is
// go-routine safe.
func (msgSender *IndexerMessageSenderKafka) SendOffchainData(message Message) {
	defer telemetry.ModuleMeasureSince(
		types.ModuleName,
		time.Now(),
		metrics.SendOffchainData,
		metrics.Latency,
	)
	encodedValue := sarama.ByteEncoder(message.Value)
	telemetry.SetGauge(float32(encodedValue.Length()), types.ModuleName, metrics.OffchainMessageLength)
	kafkaMessage := &sarama.ProducerMessage{
		Topic:   OFF_CHAIN_KAFKA_TOPIC,
		Key:     sarama.ByteEncoder(message.Key),
		Value:   encodedValue,
		Headers: message.Headers,
	}
	msgSender.send(kafkaMessage)
}
// send enqueues a message onto the producer's input channel, dropping it (with an error
// log) if the sender has already been closed. This method is go-routine safe.
func (msgSender *IndexerMessageSenderKafka) send(message *sarama.ProducerMessage) {
	msgSender.mutex.Lock()
	defer msgSender.mutex.Unlock()
	if !msgSender.closed {
		msgSender.producer.Input() <- message
		return
	}
	msgSender.logger.Error("Cannot send to a closed IndexerMessageSenderKafka.")
}
// Close closes the underlying `AsyncProducer` and waits for all errors/success messages to be
// processed before returning. This method is go-routine safe: the mutex ensures the producer
// is only closed once; subsequent calls return `ErrKafkaAlreadyClosed`.
func (msgSender *IndexerMessageSenderKafka) Close() error {
	// Lock to make this function go-routine safe.
	msgSender.mutex.Lock()
	defer msgSender.mutex.Unlock()
	// Ensure that the producer is only closed once.
	if msgSender.closed {
		return ErrKafkaAlreadyClosed
	}
	// Mark the sender closed BEFORE closing the producer. Previously this was set only after
	// a successful close, so if `producer.Close()` returned an error, `closed` stayed false
	// and a later `send` would write to the producer's now-shut-down input channel,
	// panicking with "send on closed channel".
	msgSender.closed = true
	if err := msgSender.producer.Close(); err != nil {
		return err
	}
	// Wait for success and error messages from the `AsyncProducer` to finish processing before
	// returning. Each goroutine will signal the wait group when its channel is closed.
	msgSender.inputsDone.Wait()
	return nil
}
// handleSuccesses drains the success channel of the `AsyncProducer`, counting each
// delivered message and emitting a success metric. Draining is required so that the
// producer does not deadlock when the channel fills up. Signals `inputsDone` once the
// producer closes the channel.
func (msgSender *IndexerMessageSenderKafka) handleSuccesses() {
	defer msgSender.inputsDone.Done()
	for range msgSender.producer.Successes() {
		msgSender.successes++
		telemetry.IncrCounter(1, types.ModuleName, metrics.MessageSendSuccess)
	}
}
// handleErrors drains the error channel of the `AsyncProducer`, logging and counting each
// failed delivery and emitting an error metric. Draining is required so that the producer
// does not deadlock when the channel fills up. Signals `inputsDone` once the producer
// closes the channel.
func (msgSender *IndexerMessageSenderKafka) handleErrors() {
	defer msgSender.inputsDone.Done()
	for producerErr := range msgSender.producer.Errors() {
		msgSender.logger.Error(
			"Failed to deliver message to Indexer",
			"message",
			producerErr.Msg,
			"error",
			producerErr.Err,
		)
		msgSender.errors++
		telemetry.IncrCounter(1, types.ModuleName, metrics.MessageSendError)
	}
}