forked from centrifuge/go-substrate-rpc-client
/
event_retriever.go
152 lines (125 loc) · 4.43 KB
/
event_retriever.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package retriever
import (
"time"
"github.com/Cerebellum-Network/go-substrate-rpc-client/v9/registry"
"github.com/Cerebellum-Network/go-substrate-rpc-client/v9/registry/exec"
"github.com/Cerebellum-Network/go-substrate-rpc-client/v9/registry/parser"
regState "github.com/Cerebellum-Network/go-substrate-rpc-client/v9/registry/state"
"github.com/Cerebellum-Network/go-substrate-rpc-client/v9/rpc/state"
"github.com/Cerebellum-Network/go-substrate-rpc-client/v9/types"
)
//nolint:lll
//go:generate mockery --name EventRetriever --structname EventRetrieverMock --filename event_retriever_mock.go --inpackage
// EventRetriever is the interface used for retrieving and decoding events.
type EventRetriever interface {
GetEvents(blockHash types.Hash) ([]*parser.Event, error)
}
// eventRetriever implements the EventRetriever interface.
type eventRetriever struct {
eventParser parser.EventParser
eventProvider regState.EventProvider
stateRPC state.State
registryFactory registry.Factory
eventStorageExecutor exec.RetryableExecutor[*types.StorageDataRaw]
eventParsingExecutor exec.RetryableExecutor[[]*parser.Event]
eventRegistry registry.EventRegistry
meta *types.Metadata
}
// NewEventRetriever creates a new EventRetriever.
func NewEventRetriever(
eventParser parser.EventParser,
eventProvider regState.EventProvider,
stateRPC state.State,
registryFactory registry.Factory,
eventStorageExecutor exec.RetryableExecutor[*types.StorageDataRaw],
eventParsingExecutor exec.RetryableExecutor[[]*parser.Event],
) (EventRetriever, error) {
retriever := &eventRetriever{
eventParser: eventParser,
eventProvider: eventProvider,
stateRPC: stateRPC,
registryFactory: registryFactory,
eventStorageExecutor: eventStorageExecutor,
eventParsingExecutor: eventParsingExecutor,
}
if err := retriever.updateInternalState(nil); err != nil {
return nil, ErrInternalStateUpdate.Wrap(err)
}
return retriever, nil
}
// NewDefaultEventRetriever creates a new EventRetriever using defaults for:
//
// - parser.EventParser
// - registry.Factory
// - exec.RetryableExecutor - used for retrieving event storage data.
// - exec.RetryableExecutor - used for parsing events.
func NewDefaultEventRetriever(
eventProvider regState.EventProvider,
stateRPC state.State,
fieldOverrides ...registry.FieldOverride,
) (EventRetriever, error) {
eventParser := parser.NewEventParser()
registryFactory := registry.NewFactory(fieldOverrides...)
eventStorageExecutor := exec.NewRetryableExecutor[*types.StorageDataRaw](exec.WithRetryTimeout(1 * time.Second))
eventParsingExecutor := exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(1))
return NewEventRetriever(
eventParser,
eventProvider,
stateRPC,
registryFactory,
eventStorageExecutor,
eventParsingExecutor,
)
}
// GetEvents retrieves the storage data for an Event and then parses it.
//
// Both the event storage data retrieval and the event parsing are handled via the exec.RetryableExecutor
// in order to ensure retries in case of network errors or parsing errors due to an outdated event registry.
func (e *eventRetriever) GetEvents(blockHash types.Hash) ([]*parser.Event, error) {
storageEvents, err := e.eventStorageExecutor.ExecWithFallback(
func() (*types.StorageDataRaw, error) {
return e.eventProvider.GetStorageEvents(e.meta, blockHash)
},
func() error {
return e.updateInternalState(&blockHash)
},
)
if err != nil {
return nil, ErrStorageEventRetrieval.Wrap(err)
}
events, err := e.eventParsingExecutor.ExecWithFallback(
func() ([]*parser.Event, error) {
return e.eventParser.ParseEvents(e.eventRegistry, storageEvents)
},
func() error {
return e.updateInternalState(&blockHash)
},
)
if err != nil {
return nil, ErrEventParsing.Wrap(err)
}
return events, nil
}
// updateInternalState will retrieve the metadata at the provided blockHash, if provided,
// create an event registry based on this metadata and store both.
func (e *eventRetriever) updateInternalState(blockHash *types.Hash) error {
var (
meta *types.Metadata
err error
)
if blockHash == nil {
meta, err = e.stateRPC.GetMetadataLatest()
} else {
meta, err = e.stateRPC.GetMetadata(*blockHash)
}
if err != nil {
return ErrMetadataRetrieval.Wrap(err)
}
eventRegistry, err := e.registryFactory.CreateEventRegistry(meta)
if err != nil {
return ErrEventRegistryCreation.Wrap(err)
}
e.meta = meta
e.eventRegistry = eventRegistry
return nil
}