forked from tryfix/kstream
-
Notifications
You must be signed in to change notification settings - Fork 7
/
logger.go
37 lines (28 loc) · 929 Bytes
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package state_stores
import (
"context"
"time"
"github.com/gmbyapa/kstream/v2/kafka"
"github.com/gmbyapa/kstream/v2/streams/topology"
)
// changeLogger writes key/value changes as records to a single
// topic-partition through a kafka.Producer.
type changeLogger struct {
// producer creates the records and sends them; Log takes the ProduceAsync
// path when it also implements kafka.TransactionalProducer.
producer kafka.Producer
// tp supplies the Topic and Partition every logged record is addressed to.
tp kafka.TopicPartition
}
// changeLoggerBuilder holds a ChangelogTopicFormatter. No methods of this
// type are visible in this part of the file.
// NOTE(review): presumably this is the ChangelogLoggerBuilder implementation
// and the formatter derives the changelog topic name for a store — confirm.
type changeLoggerBuilder struct {
topicFormatter ChangelogTopicFormatter
}
// ChangelogLoggerBuilder constructs a topology.ChangeLogger for a store
// within a sub-topology.
type ChangelogLoggerBuilder interface {
// Build returns a topology.ChangeLogger for the given sub-topology context
// and store, or an error if one cannot be built.
// NOTE(review): store is presumably the state store's name — confirm.
Build(ctx topology.SubTopologyContext, store string) (topology.ChangeLogger, error)
}
// NewChangeLogger returns a topology.ChangeLogger that sends every logged
// change through producer to the topic-partition tp.
func NewChangeLogger(producer kafka.Producer, tp kafka.TopicPartition) topology.ChangeLogger {
	logger := new(changeLogger)
	logger.producer = producer
	logger.tp = tp

	return logger
}
// Log sends key and value as one record to the logger's topic-partition,
// timestamped with the current wall-clock time.
//
// If the underlying producer implements kafka.TransactionalProducer the
// record is handed to ProduceAsync; otherwise it is sent with ProduceSync
// and only the resulting error is returned.
func (c *changeLogger) Log(ctx context.Context, key, value []byte) error {
	// The trailing nil and empty-string arguments are passed as-is.
	// NOTE(review): presumably record headers and metadata — confirm against
	// kafka.Producer.NewRecord.
	rec := c.producer.NewRecord(
		ctx,
		key,
		value,
		c.tp.Topic,
		c.tp.Partition,
		time.Now(),
		nil,
		``,
	)

	txProducer, transactional := c.producer.(kafka.TransactionalProducer)
	if !transactional {
		// Plain producer: the two non-error results of ProduceSync are not
		// needed here.
		_, _, err := c.producer.ProduceSync(ctx, rec)
		return err
	}

	return txProducer.ProduceAsync(ctx, rec)
}