/
eth2.go
executable file
·303 lines (259 loc) · 7.99 KB
/
eth2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package eth2
import (
"fmt"
"strconv"
"time"
"github.com/NethermindEth/posmoni/configs"
"github.com/NethermindEth/posmoni/pkg/eth2/db"
net "github.com/NethermindEth/posmoni/pkg/eth2/networking"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
log "github.com/sirupsen/logrus"
)
// Middleware for ETH2 validators monitoring
type eth2Monitor struct {
// Interface for data access
repository db.Repository
// Interface for Beacon chain API interaction
beaconClient net.BeaconAPI
// Interface for ETH1 json-rpc API interaction
executionClient net.ExecutionAPI
// Configuration options for events subscriber
subscriberOpts net.SubscribeOpts
// Configuration data for eth2Monitor
config eth2Config
}
/*
DefaultEth2Monitor :
Factory for eth2Monitor with recommended settings.
params :-
a. opts ConfigOpts
Monitor configuration options
returns :-
a. *eth2Monitor
Monitor middleware intialized with default settings
b. error
Error if any
*/
func DefaultEth2Monitor(opts ConfigOpts) (*eth2Monitor, error) {
// notest
// Setup database
ormdb, err := gorm.Open(sqlite.Open("eth2_monitor.db"), &gorm.Config{})
if err != nil {
return nil, fmt.Errorf(SQLiteCreationError, err)
}
monitor := ð2Monitor{
repository: &db.SQLiteRepository{DB: ormdb},
beaconClient: &net.BeaconClient{RetryDuration: time.Minute},
executionClient: &net.ExecutionClient{RetryDuration: time.Minute},
subscriberOpts: net.SubscribeOpts{
StreamURL: net.FinalizedCkptTopic,
Subscriber: &net.SSESubscriber{},
},
}
err = monitor.setup(opts)
if err != nil {
return nil, fmt.Errorf(SetupError, err)
}
return monitor, nil
}
/*
NewEth2Monitor :
Factory for eth2Monitor.
params :-
a. r db.Repository
Interface implementation for data access
b. bc networking.BeaconAPI
Interface implementation for Beacon chain API interaction
c. so networking.SubscribeOpts
Configuration options for events subscriber. Should include implementation for Subscriber interface.
d. opts ConfigOpts
Monitor configuration options
returns :-
a. *eth2Monitor
Monitor middleware intialized with desired settings
*/
func NewEth2Monitor(r db.Repository, bc net.BeaconAPI, ex net.ExecutionAPI, so net.SubscribeOpts, opts ConfigOpts) (*eth2Monitor, error) {
monitor := ð2Monitor{
repository: r,
beaconClient: bc,
executionClient: ex,
subscriberOpts: so,
}
err := monitor.setup(opts)
if err != nil {
return nil, fmt.Errorf(SetupError, err)
}
return monitor, nil
}
/*
setup :
Handle eth2Monitor configuration.
params :-
a. handleCfg bool
True if configuration setup (configuration file setup or enviroment variables setup) should be handled
b. config *eth2Config
Configuration data. Should be used when is not desired to use config file or enviroment variables to get configuration data.
returns :-
a. error
Error if any
*/
func (e *eth2Monitor) setup(opts ConfigOpts) error {
if opts.HandleCfg {
configs.InitConfig()
}
// TODO: Handle empty opts for uses cases like TrackSync only
cfg, err := Init(opts.Checkers)
if err != nil {
fmt.Println(err)
return err
}
e.config = cfg
// setup beacon nodes endpoints
e.subscriberOpts.Endpoints = e.config.consensus
e.beaconClient.SetEndpoints(e.config.consensus)
if opts.handleLogs {
// setup logger
configs.InitLogging()
}
log.Debugf("Configuration object: %+v", e.config)
if err := e.repository.Migrate(); err != nil {
return fmt.Errorf(MigrationError, err)
}
return nil
}
/*
Monitor :
Pipeline and entrypoint for validator monitoring.
params :-
a. handleCfg bool
True if configuration setup (configuration file setup or enviroment variables setup) should be handled
returns :-
a. []chan struct{}
List of channels to be closed when monitoring is done
b. error
Error if any
*/
func (e *eth2Monitor) Monitor() ([]chan struct{}, error) {
subDone := make(chan struct{})
chkps := net.Subscribe(subDone, e.subscriberOpts)
go e.getValidatorBalance(chkps, e.config.validators)
go e.setupAlerts(chkps)
return []chan struct{}{subDone}, nil
}
/*
getValidatorBalance :
Track validator balance and performance.
params :-
a. chkps <-chan networking.Checkpoint
Channel to get new checkpoints from
b. validatorsIdxs []string
List of validator indexes to track
returns :-
none
*/
func (e *eth2Monitor) getValidatorBalance(chkps <-chan net.Checkpoint, validatorsIdxs []string) {
logFields := log.Fields{configs.Component: "ETH2 Monitor", "Method": "getValidatorBalance"}
for c := range chkps {
log.WithFields(logFields).Infof("Got Checkpoint: %+v", c)
// New finalized checkpoint. Fetch validator balances
// Hardcoding head state for now
vbs, err := e.beaconClient.ValidatorBalances("head", validatorsIdxs)
if err != nil {
log.WithFields(logFields).Errorf(ValidatorBalancesError, err)
continue
}
for _, vb := range vbs {
log.WithFields(logFields).Debugf("Validator Balance fetched: %+v", vb)
// Get validator index from response data
idx, err := parseUint(vb.Index)
if err != nil {
log.WithFields(logFields).Errorf(ParseUintError, err)
continue
}
// Get validator balance from response data
newBalance, err := strconv.ParseUint(vb.Balance, 10, 64)
if err != nil {
log.WithFields(logFields).Errorf(ParseUintError, err)
continue
}
// Get validator from db
v, err := e.repository.FirstOrCreate(db.Validator{Idx: idx, Balance: newBalance})
if err != nil {
log.WithFields(logFields).Errorf(ValidatorNotFoundError, err)
continue
}
if newBalance < v.Balance {
log.WithFields(logFields).Warnf("Attestation has been missed by %d, count: %d", v.Idx, v.MissedAtts+1)
e.repository.Update(db.Validator{
Idx: v.Idx,
Balance: newBalance,
MissedAtts: v.MissedAtts + 1,
MissedAttsTotal: v.MissedAttsTotal + 1,
})
} else {
e.repository.Update(db.Validator{
Idx: v.Idx,
Balance: newBalance,
MissedAtts: 0,
MissedAttsTotal: v.MissedAttsTotal,
})
}
}
}
}
func (e *eth2Monitor) setupAlerts(<-chan net.Checkpoint) {
}
func (e *eth2Monitor) TrackSync(done <-chan struct{}, beaconEndpoints, executionEndpoints []string, wait time.Duration) <-chan EndpointSyncStatus {
logFields := log.Fields{configs.Component: "ETH2 Monitor", "Method": "TrackSync"}
c := make(chan EndpointSyncStatus, len(executionEndpoints)+len(beaconEndpoints))
var w time.Duration
go func() {
for {
select {
case <-done:
close(c)
return
case <-time.After(w):
if w == 0 {
// Don't wait the first time
w = wait
}
// TODO: Benchmark this and check what happens if the processing is longer than the wait
// Check sync progress of beacon nodes
log.WithFields(logFields).Info("Tracking sync progress of consensus nodes...")
bStatus := e.beaconClient.SyncStatus(beaconEndpoints)
for _, s := range bStatus {
if s.Error != nil {
log.WithFields(logFields).Errorf(CheckingSyncStatusError, s.Endpoint, s.Error)
c <- EndpointSyncStatus{Endpoint: s.Endpoint, Error: s.Error}
} else {
if s.IsSyncing {
log.WithFields(logFields).Infof("Endpoint %s is syncing", s.Endpoint)
} else {
log.WithFields(logFields).Infof("Endpoint %s is synced", s.Endpoint)
}
c <- EndpointSyncStatus{Endpoint: s.Endpoint, Synced: !s.IsSyncing}
}
}
// Check sync progress of execution nodes. Rule of Three not acomplished yet, so no harm in repetition :)
log.WithFields(logFields).Info("Tracking sync progress of execution nodes...")
eStatus := e.executionClient.SyncStatus(executionEndpoints)
for _, s := range eStatus {
if s.Error != nil {
log.WithFields(logFields).Errorf(CheckingSyncStatusError, s.Endpoint, s.Error)
c <- EndpointSyncStatus{Endpoint: s.Endpoint, Error: s.Error}
} else {
if s.IsSyncing {
log.WithFields(logFields).Infof("Endpoint %s is syncing", s.Endpoint)
} else {
log.WithFields(logFields).Infof("Endpoint %s is synced", s.Endpoint)
}
c <- EndpointSyncStatus{Endpoint: s.Endpoint, Synced: !s.IsSyncing}
}
}
}
}
}()
return c
}