-
Notifications
You must be signed in to change notification settings - Fork 0
/
staker.go
140 lines (111 loc) · 3.33 KB
/
staker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package funder
import (
"context"
"encoding/json"
"fmt"
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/ethersphere/bee/pkg/bigint"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/node-funder/pkg/wallet"
"k8s.io/utils/strings/slices"
)
// Stake tops up the stake of the bee nodes found in cfg.Namespace.
//
// The functional options are applied on top of DefaultOptions. When nl is nil
// a lister is created with newNodeLister. The listed nodes are narrowed down
// with filterBeeNodes (the rejected ones are only logged) and the remaining
// nodes are handed to stakeAllNodes together with cfg.MinAmounts.SwarmToken as
// the minimum amount.
//
// An error is returned only when the node lister cannot be created or the
// listing itself fails. Failures for individual nodes are logged by
// stakeAllNodes and do not affect the return value.
func Stake(ctx context.Context, cfg Config, nl NodeLister, options ...FunderOptions) error {
	settings := DefaultOptions()
	for i := range options {
		options[i](settings)
	}

	settings.log.Infof("node staking started...")
	defer settings.log.Info("node staking finished")

	// Fall back to a freshly created lister when the caller did not supply one.
	lister := nl
	if lister == nil {
		created, err := newNodeLister()
		if err != nil {
			return fmt.Errorf("create node lister: %w", err)
		}
		lister = created
	}

	all, err := lister.List(ctx, cfg.Namespace)
	if err != nil {
		return fmt.Errorf("listing nodes failed: %w", err)
	}

	beeNodes, ignored := filterBeeNodes(all)
	if len(ignored) != 0 {
		settings.log.Infof("ignoring pods %v", ignored)
	}

	stakeAllNodes(ctx, beeNodes, cfg.MinAmounts.SwarmToken, settings.log)

	return nil
}
// stakeAllNodes processes every node in its own goroutine and blocks until all
// of them are done.
//
// For each node the current stake is read with fetchStakeInfo and the top-up
// amount is derived with calcTopUpAmount from min and the staked amount. A node
// whose top-up amount is not positive is counted as skipped; otherwise
// stakeNode is called and, on success, the node is counted as staked. Errors
// are logged per node and never returned. Once all goroutines have finished, a
// summary (staked, skipped, failed, total) is logged, where failed is whatever
// is neither staked nor skipped.
func stakeAllNodes(ctx context.Context, nodes []NodeInfo, min float64, log logging.Logger) {
	var (
		pending      sync.WaitGroup
		stakedCount  atomic.Int32
		skippedCount atomic.Int32
	)

	// handle runs the whole fetch / decide / stake sequence for a single node.
	handle := func(node NodeInfo) {
		defer pending.Done()

		info, err := fetchStakeInfo(ctx, node.Address)
		if err != nil {
			log.Infof("get stake info for node[%s] failed; reason: %s", node.Name, err)
			return
		}

		topUp := calcTopUpAmount(min, info.StakedAmount, wallet.SwarmTokenDecimals)
		if topUp.Sign() <= 0 {
			// Top up is not needed, current stake value is sufficient
			skippedCount.Add(1)
			log.Infof("node[%s] - already staked", node.Name)
			return
		}

		if stakeErr := stakeNode(ctx, node.Address, topUp); stakeErr != nil {
			log.Infof("node[%s] - staking failed; reason: %s", node.Name, stakeErr)
			return
		}

		log.Infof("node[%s] - staked", node.Name)
		stakedCount.Add(1)
	}

	pending.Add(len(nodes))
	for i := range nodes {
		go handle(nodes[i])
	}
	pending.Wait()

	// All workers are done, so the counters are stable from here on.
	total := len(nodes)
	stakedTotal := stakedCount.Load()
	skippedTotal := skippedCount.Load()

	log.Infof("staked %d", stakedTotal)
	log.Infof("skipped %d", skippedTotal)
	log.Infof("failed %d", total-int(stakedTotal)-int(skippedTotal))
	log.Infof("total %d", total)
}
// stakeNode sends a POST request to "<nodeAddress>/stake/<amount>", where the
// amount is rendered with big.Int.String. The response body is discarded; only
// the error reported by sendHTTPRequest is returned.
func stakeNode(ctx context.Context, nodeAddress string, amount *big.Int) error {
	endpoint := nodeAddress + "/stake/" + amount.String()

	_, err := sendHTTPRequest(ctx, http.MethodPost, endpoint)

	return err
}
// stakeInfo is the result type of fetchStakeInfo: the staking state read from
// a single node.
type stakeInfo struct {
// StakedAmount is the value decoded from the "stakedAmount" field of the
// node's /stake response.
// NOTE(review): presumably expressed in the token's smallest unit, since the
// caller combines it with wallet.SwarmTokenDecimals — confirm.
StakedAmount *big.Int
}
// fetchStakeInfo issues a GET request to "<nodeAddress>/stake" and decodes the
// "stakedAmount" field of the JSON response into a stakeInfo.
//
// An error is returned when the request fails, when the body cannot be
// unmarshalled, or when "stakedAmount" is absent or null. The last case is
// checked explicitly: the field decodes into a pointer that stays nil when the
// key is missing or null, and dereferencing it would panic.
func fetchStakeInfo(ctx context.Context, nodeAddress string) (stakeInfo, error) {
	responseBytes, err := sendHTTPRequest(ctx, http.MethodGet, nodeAddress+"/stake")
	if err != nil {
		return stakeInfo{}, fmt.Errorf("get bee stake info failed: %w", err)
	}

	response := struct {
		StakedAmount *bigint.BigInt `json:"stakedAmount"`
	}{}
	if err := json.Unmarshal(responseBytes, &response); err != nil {
		return stakeInfo{}, fmt.Errorf("failed to unmarshal response :%w", err)
	}

	// Guard against a missing or null amount before touching the embedded Int.
	if response.StakedAmount == nil || response.StakedAmount.Int == nil {
		return stakeInfo{}, fmt.Errorf("bee stake info response from %s has no stakedAmount", nodeAddress)
	}

	return stakeInfo{StakedAmount: response.StakedAmount.Int}, nil
}
// filterBeeNodes splits nodes into two slices based on their Name. A node goes
// into the first result when the dash-separated segments of its Name contain
// "bee"; every other node goes into the second result. Input order is kept in
// both slices, and both are always non-nil.
func filterBeeNodes(nodes []NodeInfo) ([]NodeInfo, []NodeInfo) {
	var (
		bees     = make([]NodeInfo, 0, len(nodes))
		rejected = make([]NodeInfo, 0)
	)

	for i := range nodes {
		node := nodes[i]

		if !slices.Contains(strings.Split(node.Name, "-"), "bee") {
			rejected = append(rejected, node)
			continue
		}

		bees = append(bees, node)
	}

	return bees, rejected
}