-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
105 lines (86 loc) · 2.78 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package consumer
import (
"context"
"encoding/json"
"github.com/DIMO-Network/meta-transaction-processor/internal/models"
"github.com/DIMO-Network/shared"
"github.com/DIMO-Network/shared/db"
"github.com/Shopify/sarama"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"
"github.com/volatiletech/sqlboiler/v4/boil"
)
var requestsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Namespace: "meta_transaction_processor",
Subsystem: "consumer",
Name: "requests_total",
},
)
type consumer struct {
logger *zerolog.Logger
dbs db.Store
}
type TransactionEventData struct {
ID string `json:"id"`
To common.Address `json:"to"`
Data hexutil.Bytes `json:"data"`
}
func (c *consumer) Setup(sarama.ConsumerGroupSession) error { return nil }
func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg := <-claim.Messages():
requestsTotal.Inc()
logger := c.logger.With().Int32("partition", msg.Partition).Int64("offset", msg.Offset).Logger()
var event shared.CloudEvent[TransactionEventData]
if err := json.Unmarshal(msg.Value, &event); err != nil {
logger.Err(err).Msg("Couldn't parse request, skipping.")
session.MarkMessage(msg, "")
continue
}
data := event.Data
if len(data.ID) != 27 {
logger.Error().Msgf("Invalid request id: %s.", data.ID)
session.MarkMessage(msg, "")
continue
}
logger = logger.With().Str("requestId", data.ID).Str("contract", data.To.Hex()).Logger()
logger.Info().Msg("Got transaction request.")
tx := models.MetaTransactionRequest{
ID: data.ID,
To: data.To.Bytes(),
Data: data.Data,
}
// Don't really want to update.
if err := tx.Upsert(session.Context(), c.dbs.DBS().Writer, false, []string{models.MetaTransactionRequestColumns.ID}, boil.None(), boil.Infer()); err != nil {
logger.Err(err).Msg("Error saving transaction.")
return err
}
session.MarkMessage(msg, "")
case <-session.Context().Done():
return nil
}
}
}
func New(ctx context.Context, name string, topic string, kafkaClient sarama.Client, logger *zerolog.Logger, dbs db.Store) error {
group, err := sarama.NewConsumerGroupFromClient(name, kafkaClient)
if err != nil {
return err
}
consumer := &consumer{logger: logger, dbs: dbs}
for {
err := group.Consume(ctx, []string{topic}, consumer)
if err != nil {
logger.Err(err).Msg("Consumer group session did not terminate gracefully.")
}
if ctx.Err() != nil {
// Context canceled, so quit.
return nil
}
}
}