-
Notifications
You must be signed in to change notification settings - Fork 91
/
event_manager.go
111 lines (98 loc) · 3.37 KB
/
event_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package indexer_manager
import (
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender"
)
type IndexerEventManager interface {
Enabled() bool
AddTxnEvent(ctx sdk.Context, subType string, data string, version uint32)
SendOffchainData(message msgsender.Message)
SendOnchainData(block *IndexerTendermintBlock)
ProduceBlock(ctx sdk.Context) *IndexerTendermintBlock
AddBlockEvent(
ctx sdk.Context,
subType string,
data string,
blockEvent IndexerTendermintEvent_BlockEvent,
version uint32,
)
ClearEvents(ctx sdk.Context)
}
// Ensure the `IndexerEventManager` interface is implemented at compile time.
var _ IndexerEventManager = (*indexerEventManagerImpl)(nil)
type indexerEventManagerImpl struct {
indexerMessageSender msgsender.IndexerMessageSender
indexerEventsTransientStoreKey storetypes.StoreKey
sendOffchainData bool
}
func NewIndexerEventManager(
indexerMessageSender msgsender.IndexerMessageSender,
indexerEventsTransientStoreKey storetypes.StoreKey,
sendOffchainData bool,
) IndexerEventManager {
return &indexerEventManagerImpl{
indexerMessageSender: indexerMessageSender,
indexerEventsTransientStoreKey: indexerEventsTransientStoreKey,
sendOffchainData: sendOffchainData,
}
}
func (i *indexerEventManagerImpl) Enabled() bool {
return i.indexerMessageSender.Enabled()
}
func (i *indexerEventManagerImpl) GetIndexerEventsTransientStoreKey() storetypes.StoreKey {
return i.indexerEventsTransientStoreKey
}
func (i *indexerEventManagerImpl) SendOffchainData(message msgsender.Message) {
if i.indexerMessageSender.Enabled() && i.sendOffchainData {
i.indexerMessageSender.SendOffchainData(message)
}
}
func (i *indexerEventManagerImpl) SendOnchainData(block *IndexerTendermintBlock) {
if i.indexerMessageSender.Enabled() {
message := CreateIndexerBlockEventMessage(block)
i.indexerMessageSender.SendOnchainData(message)
}
}
// AddTxnEvent adds a transaction event to the context's transient store of indexer events.
func (i *indexerEventManagerImpl) AddTxnEvent(
ctx sdk.Context,
subType string,
data string,
version uint32,
) {
if i.indexerMessageSender.Enabled() {
addTxnEvent(ctx, subType, data, version, i.indexerEventsTransientStoreKey)
}
}
// ClearEvents clears all events in the context's transient store of indexer events.
func (i *indexerEventManagerImpl) ClearEvents(
ctx sdk.Context,
) {
if i.indexerMessageSender.Enabled() {
clearEvents(ctx, i.indexerEventsTransientStoreKey)
}
}
// AddBlockEvent adds a block event to the context's transient store of indexer events.
func (i *indexerEventManagerImpl) AddBlockEvent(
ctx sdk.Context,
subType string,
data string,
blockEvent IndexerTendermintEvent_BlockEvent,
version uint32,
) {
if i.indexerMessageSender.Enabled() {
addBlockEvent(ctx, subType, data, i.indexerEventsTransientStoreKey, blockEvent, version)
}
}
// ProduceBlock returns an `IndexerTendermintBlock` containing all the indexer events in the block.
// It should only be called in EndBlocker when the transient store contains all onchain events from
// a ready-to-be-committed block.
func (i *indexerEventManagerImpl) ProduceBlock(
ctx sdk.Context,
) *IndexerTendermintBlock {
if i.indexerMessageSender.Enabled() {
return produceBlock(ctx, i.indexerEventsTransientStoreKey)
}
return nil
}