// kafka_types.go
// Copyright [2021] - [2022], AssetMantle Pte. Ltd. and the code contributors
// SPDX-License-Identifier: Apache-2.0
package queuing
import (
"github.com/Shopify/sarama"
"github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/rest"
dbm "github.com/tendermint/tm-db"
)
// TicketID : a string identifier used to track a queued transaction through kafka.
type TicketID string
// kafkaMsg : is a store that can be stored in kafka queues
// It bundles the sdk message with the request metadata and a codec-free
// snapshot of the client context so a consumer can replay the transaction.
type kafkaMsg struct {
Msg sdk.Msg `json:"msg"` // the transaction message to broadcast
TicketID TicketID `json:"TicketID"` // tracking ticket; NOTE(review): tag casing differs from the snake_case used by the other fields — confirm before changing, consumers may depend on it
BaseRequest rest.BaseReq `json:"base_req"` // original REST base request (fees, gas, signer, …)
KafkaCliCtx kafkaCliCtx `json:"kafkaCliCtx"` // serializable subset of client.Context
}
// NewKafkaMsgFromRest builds a kafkaMsg ready to be published on a kafka
// queue: it wraps the given sdk message and REST base request together with a
// codec-free snapshot of the relevant client.Context fields.
func NewKafkaMsgFromRest(msg sdk.Msg, ticketID TicketID, baseRequest rest.BaseReq, context client.Context) kafkaMsg {
	// TODO return pointer
	return kafkaMsg{
		Msg:         msg,
		TicketID:    ticketID,
		BaseRequest: baseRequest,
		KafkaCliCtx: kafkaCliCtx{
			OutputFormat:  context.OutputFormat,
			ChainID:       context.ChainID,
			Height:        context.Height,
			HomeDir:       context.HomeDir,
			NodeURI:       context.NodeURI,
			From:          context.From,
			UseLedger:     context.UseLedger,
			BroadcastMode: context.BroadcastMode,
			Simulate:      context.Simulate,
			GenerateOnly:  context.GenerateOnly,
			FromAddress:   context.FromAddress,
			FromName:      context.FromName,
			SkipConfirm:   context.SkipConfirm,
		},
	}
}
// cliCtxFromKafkaMsg restores onto the given client.Context the fields that
// were snapshotted into the queued message, so the consumer side can rebuild
// the cli context it needs to broadcast the transaction.
//
// The parameter was renamed from `kafkaMsg` to `msg`: the old name shadowed
// the kafkaMsg type inside the function body.
func cliCtxFromKafkaMsg(msg kafkaMsg, context client.Context) client.Context {
	// Hoist the snapshot once instead of repeating the selector chain per field.
	snapshot := msg.KafkaCliCtx
	context.OutputFormat = snapshot.OutputFormat
	context.ChainID = snapshot.ChainID
	context.Height = snapshot.Height
	context.HomeDir = snapshot.HomeDir
	context.NodeURI = snapshot.NodeURI
	context.From = snapshot.From
	context.UseLedger = snapshot.UseLedger
	context.BroadcastMode = snapshot.BroadcastMode
	context.Simulate = snapshot.Simulate
	context.GenerateOnly = snapshot.GenerateOnly
	context.FromAddress = snapshot.FromAddress
	context.FromName = snapshot.FromName
	context.SkipConfirm = snapshot.SkipConfirm
	return context
}
// kafkaCliCtx : client tx without codec
// A plain-data subset of client.Context that can be serialized into a kafka
// message (the full context carries codecs/clients that cannot round-trip).
// NOTE(review): Offline and Indent are never copied by NewKafkaMsgFromRest or
// cliCtxFromKafkaMsg in this file — confirm whether they are set elsewhere or dead.
type kafkaCliCtx struct {
FromAddress sdk.AccAddress
OutputFormat string
ChainID string
HomeDir string
NodeURI string
From string
BroadcastMode string
FromName string
Height int64
UseLedger bool
Simulate bool
GenerateOnly bool
Offline bool
Indent bool
SkipConfirm bool
}
// kafkaState : is a struct showing the state of kafka
// It holds the local ticket database plus the sarama admin, producer and
// per-topic partition consumers shared by the queuing layer.
type kafkaState struct {
KafkaDB *dbm.GoLevelDB // local LevelDB used to persist ticket/response data
Admin sarama.ClusterAdmin // cluster admin (topic management)
Consumer sarama.Consumer // root consumer the partition consumers derive from
Consumers map[string]sarama.PartitionConsumer // one partition consumer per topic name
Producer sarama.SyncProducer // synchronous producer for publishing messages
Topics []string // topics this state was initialized with
IsEnabled bool // whether kafka queuing is active
}
// NewKafkaState : returns a kafka state wired to the given broker node list.
// It opens the local ticket database, creates the cluster admin, sync
// producer and root consumer, and derives one partition consumer per
// configured topic.
func NewKafkaState(nodeList []string) *kafkaState {
	kafkaDB, err := dbm.NewGoLevelDB("KafkaDB", defaultCLIHome)
	if err != nil {
		// Previously the error was discarded, leaving a nil KafkaDB that
		// would only crash later on first use. Fail fast with context instead.
		panic("NewKafkaState: opening KafkaDB at " + defaultCLIHome + ": " + err.Error())
	}
	admin := kafkaAdmin(nodeList)
	producer := newProducer(nodeList)
	consumer := newConsumer(nodeList)
	consumers := make(map[string]sarama.PartitionConsumer, len(topics))
	for _, topic := range topics {
		consumers[topic] = partitionConsumers(consumer, topic)
	}
	return &kafkaState{
		KafkaDB:   kafkaDB,
		Admin:     admin,
		Consumer:  consumer,
		Consumers: consumers,
		Producer:  producer,
		Topics:    topics,
		IsEnabled: true,
	}
}