/
node.go
229 lines (197 loc) · 7.21 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package operator
import (
"context"
"fmt"
"github.com/bloxapp/ssv/network"
spectypes "github.com/bloxapp/ssv-spec/types"
"go.uber.org/zap"
"github.com/bloxapp/ssv/eth/executionclient"
"github.com/bloxapp/ssv/exporter/api"
qbftstorage "github.com/bloxapp/ssv/ibft/storage"
"github.com/bloxapp/ssv/logging"
"github.com/bloxapp/ssv/logging/fields"
"github.com/bloxapp/ssv/networkconfig"
"github.com/bloxapp/ssv/operator/duties"
"github.com/bloxapp/ssv/operator/duties/dutystore"
"github.com/bloxapp/ssv/operator/fee_recipient"
"github.com/bloxapp/ssv/operator/slotticker"
"github.com/bloxapp/ssv/operator/storage"
"github.com/bloxapp/ssv/operator/validator"
beaconprotocol "github.com/bloxapp/ssv/protocol/v2/blockchain/beacon"
"github.com/bloxapp/ssv/storage/basedb"
)
// Node represents the behavior of SSV node
type Node interface {
	// Start boots the node's services and blocks until the duty scheduler
	// exits; it returns a non-nil error if startup fails.
	Start(logger *zap.Logger) error
}
// Options contains options to create the node
type Options struct {
	// NetworkName is the network name of this node
	NetworkName string `yaml:"Network" env:"NETWORK" env-default:"mainnet" env-description:"Network is the network of this node"`
	// Network is the resolved network configuration derived from NetworkName.
	Network networkconfig.NetworkConfig
	// BeaconNode is the consensus-layer client used for duties and fee recipients.
	BeaconNode beaconprotocol.BeaconNode // TODO: consider renaming to ConsensusClient
	// ExecutionClient is the execution-layer client used by the duty scheduler.
	ExecutionClient *executionclient.ExecutionClient
	// P2PNetwork is the node's peer-to-peer networking layer.
	P2PNetwork network.P2PNetwork
	// Context governs the lifetime of the node and its subsystems.
	Context context.Context
	// DB is the database backing QBFT instance storage.
	DB basedb.Database
	// ValidatorController manages the validators operated by this node.
	ValidatorController validator.Controller
	// ValidatorOptions configures the validator controller.
	ValidatorOptions validator.ControllerOptions `yaml:"ValidatorOptions"`
	// DutyStore caches fetched beacon duties for the scheduler.
	DutyStore *dutystore.Store
	// WS is the optional exporter WebSocket server (may be nil).
	WS api.WebSocketServer
	// WsAPIPort is the TCP port the WebSocket server listens on.
	WsAPIPort int
	// Metrics collects node metrics; a no-op implementation is used when nil.
	Metrics nodeMetrics
}
// operatorNode implements Node interface
type operatorNode struct {
	network          networkconfig.NetworkConfig // resolved network configuration
	context          context.Context             // lifetime context for all subsystems
	validatorsCtrl   validator.Controller        // manages this operator's validators
	validatorOptions validator.ControllerOptions // controller configuration (Exporter flag etc.)
	consensusClient  beaconprotocol.BeaconNode   // consensus-layer (beacon) client
	executionClient  *executionclient.ExecutionClient
	net              network.P2PNetwork // p2p networking layer
	storage          storage.Storage    // registry storage (operators, shares, recipients)
	qbftStorage      *qbftstorage.QBFTStores
	dutyScheduler    *duties.Scheduler
	feeRecipientCtrl fee_recipient.RecipientController
	ws               api.WebSocketServer // optional exporter WS server (may be nil)
	wsAPIPort        int                 // port for the WS server
	metrics          nodeMetrics // never nil after New (nopMetrics fallback)
}
// New is the constructor of operatorNode. It wires QBFT storage for every
// beacon role, builds the duty scheduler and fee-recipient controller from
// opts, and falls back to no-op metrics when none were provided.
func New(logger *zap.Logger, opts Options, slotTickerProvider slotticker.Provider) Node {
	// One QBFT store per beacon role, all backed by the same database.
	stores := qbftstorage.NewStores()
	for _, role := range []spectypes.BeaconRole{
		spectypes.BNRoleAttester,
		spectypes.BNRoleProposer,
		spectypes.BNRoleAggregator,
		spectypes.BNRoleSyncCommittee,
		spectypes.BNRoleSyncCommitteeContribution,
		spectypes.BNRoleValidatorRegistration,
		spectypes.BNRoleVoluntaryExit,
	} {
		stores.Add(role, qbftstorage.New(opts.DB, role.String()))
	}

	// Scheduler fetches duties from the beacon/execution clients and hands
	// them to the validator controller for execution.
	scheduler := duties.NewScheduler(&duties.SchedulerOptions{
		Ctx:                 opts.Context,
		BeaconNode:          opts.BeaconNode,
		ExecutionClient:     opts.ExecutionClient,
		Network:             opts.Network,
		ValidatorController: opts.ValidatorController,
		IndicesChg:          opts.ValidatorController.IndicesChangeChan(),
		ValidatorExitCh:     opts.ValidatorController.ValidatorExitChan(),
		ExecuteDuty:         opts.ValidatorController.ExecuteDuty,
		BuilderProposals:    opts.ValidatorOptions.BuilderProposals,
		DutyStore:           opts.DutyStore,
		SlotTickerProvider:  slotTickerProvider,
	})

	// Fee-recipient controller keeps the beacon node updated with each
	// validator's fee recipient address.
	recipientCtrl := fee_recipient.NewController(&fee_recipient.ControllerOptions{
		Ctx:                opts.Context,
		BeaconClient:       opts.BeaconNode,
		Network:            opts.Network,
		ShareStorage:       opts.ValidatorOptions.RegistryStorage.Shares(),
		RecipientStorage:   opts.ValidatorOptions.RegistryStorage,
		OperatorDataStore:  opts.ValidatorOptions.OperatorDataStore,
		SlotTickerProvider: slotTickerProvider,
	})

	n := &operatorNode{
		context:          opts.Context,
		validatorsCtrl:   opts.ValidatorController,
		validatorOptions: opts.ValidatorOptions,
		network:          opts.Network,
		consensusClient:  opts.BeaconNode,
		executionClient:  opts.ExecutionClient,
		net:              opts.P2PNetwork,
		storage:          opts.ValidatorOptions.RegistryStorage,
		qbftStorage:      stores,
		dutyScheduler:    scheduler,
		feeRecipientCtrl: recipientCtrl,
		ws:               opts.WS,
		wsAPIPort:        opts.WsAPIPort,
		metrics:          opts.Metrics,
	}
	// Guarantee a usable metrics sink even when the caller provided none.
	if n.metrics == nil {
		n.metrics = nopMetrics{}
	}
	return n
}
// Start starts to stream duties and run IBFT instances.
// It launches the WS server, duty scheduler, validator handlers and
// fee-recipient controller, then blocks until the duty scheduler exits.
func (n *operatorNode) Start(logger *zap.Logger) error {
	// BUG FIX: zap's Named returns a NEW logger; the original call discarded
	// the result, so the operator name was never attached to log entries.
	logger = logger.Named(logging.NameOperator)

	logger.Info("All required services are ready. OPERATOR SUCCESSFULLY CONFIGURED AND NOW RUNNING!")

	go func() {
		// Surface WS server failures instead of silently dropping them.
		// TODO: think if we need to panic
		if err := n.startWSServer(logger); err != nil {
			logger.Error("failed to start WS server", zap.Error(err))
		}
	}()

	// Start the duty scheduler, and a background goroutine to crash the node
	// in case there were any errors.
	if err := n.dutyScheduler.Start(n.context, logger); err != nil {
		return fmt.Errorf("failed to run duty scheduler: %w", err)
	}

	n.validatorsCtrl.StartNetworkHandlers()

	if n.validatorOptions.Exporter {
		// Subscribe to all subnets.
		err := n.net.SubscribeAll(logger)
		if err != nil {
			logger.Error("failed to subscribe to all subnets", zap.Error(err))
		}
	}
	go n.net.UpdateSubnets(logger)
	n.validatorsCtrl.StartValidators()
	go n.reportOperators(logger)
	go n.feeRecipientCtrl.Start(logger)
	go n.validatorsCtrl.UpdateValidatorMetaDataLoop()

	// Block until the scheduler stops; a scheduler error is fatal.
	if err := n.dutyScheduler.Wait(); err != nil {
		logger.Fatal("duty scheduler exited with error", zap.Error(err))
	}

	return nil
}
// HealthCheck returns a list of issues regards the state of the operator node.
//
// TODO: previously this checked availability of consensus & execution clients.
// However, currently the node crashes when those clients are down,
// so this health check is currently a positive no-op.
func (n *operatorNode) HealthCheck() error {
	return nil
}
// handleQueryRequests dispatches an incoming export request to the matching
// query handler. Messages that failed to parse are converted into error
// replies before dispatch.
func (n *operatorNode) handleQueryRequests(logger *zap.Logger, nm *api.NetworkMessage) {
	if nm.Err != nil {
		// Replace the unparseable payload with an error message so the
		// TypeError branch below answers the client.
		nm.Msg = api.Message{Type: api.TypeError, Data: []string{"could not parse network message"}}
	}

	msgType := nm.Msg.Type
	logger.Debug("got incoming export request",
		zap.String("type", string(msgType)))

	switch msgType {
	case api.TypeDecided:
		api.HandleDecidedQuery(logger, n.qbftStorage, nm)
	case api.TypeError:
		api.HandleErrorQuery(logger, nm)
	default:
		api.HandleUnknownQuery(logger, nm)
	}
}
// startWSServer starts the exporter WebSocket server on the configured port.
// It is a no-op when no WS server was configured.
func (n *operatorNode) startWSServer(logger *zap.Logger) error {
	if n.ws == nil {
		return nil
	}
	logger.Info("starting WS server")
	n.ws.UseQueryHandler(n.handleQueryRequests)
	return n.ws.Start(logger, fmt.Sprintf(":%d", n.wsAPIPort))
}
// reportOperators loads the stored operators and reports each one's public
// key to the metrics sink.
func (n *operatorNode) reportOperators(logger *zap.Logger) {
	operators, err := n.storage.ListOperators(nil, 0, 1000) // TODO more than 1000?
	if err != nil {
		logger.Warn("failed to get all operators for reporting", zap.Error(err))
		return
	}
	logger.Debug("reporting operators", zap.Int("count", len(operators)))
	for _, op := range operators {
		n.metrics.OperatorPublicKey(op.ID, op.PublicKey)
		logger.Debug("report operator public key",
			fields.OperatorID(op.ID),
			fields.PubKey(op.PublicKey))
	}
}