forked from cometbft/cometbft
/
rpc.go
134 lines (118 loc) · 3.53 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"context"
"errors"
"fmt"
"time"
rpchttp "github.com/ben2077/cometbft/rpc/client/http"
rpctypes "github.com/ben2077/cometbft/rpc/core/types"
e2e "github.com/ben2077/cometbft/test/e2e/pkg"
"github.com/ben2077/cometbft/types"
)
// waitForHeight waits for the network to reach a certain height (or above),
// returning the highest height seen. Errors if the network is not making
// progress at all.
func waitForHeight(ctx context.Context, testnet *e2e.Testnet, height int64) (*types.Block, *types.BlockID, error) {
var (
err error
maxResult *rpctypes.ResultBlock
clients = map[string]*rpchttp.HTTP{}
lastIncrease = time.Now()
)
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-timer.C:
for _, node := range testnet.Nodes {
if node.Stateless() {
continue
}
client, ok := clients[node.Name]
if !ok {
client, err = node.Client()
if err != nil {
continue
}
clients[node.Name] = client
}
subctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
result, err := client.Block(subctx, nil)
if err == context.DeadlineExceeded || err == context.Canceled {
return nil, nil, ctx.Err()
}
if err != nil {
continue
}
if result.Block != nil && (maxResult == nil || result.Block.Height > maxResult.Block.Height) {
maxResult = result
lastIncrease = time.Now()
}
if maxResult != nil && maxResult.Block.Height >= height {
return maxResult.Block, &maxResult.BlockID, nil
}
}
if len(clients) == 0 {
return nil, nil, errors.New("unable to connect to any network nodes")
}
if time.Since(lastIncrease) >= 20*time.Second {
if maxResult == nil {
return nil, nil, errors.New("chain stalled at unknown height")
}
return nil, nil, fmt.Errorf("chain stalled at height %v", maxResult.Block.Height)
}
timer.Reset(1 * time.Second)
}
}
}
// waitForNode waits for a node to become available and catch up to the given block height.
func waitForNode(ctx context.Context, node *e2e.Node, height int64, timeout time.Duration) (*rpctypes.ResultStatus, error) {
client, err := node.Client()
if err != nil {
return nil, err
}
timer := time.NewTimer(0)
defer timer.Stop()
var curHeight int64
lastChanged := time.Now()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
status, err := client.Status(ctx)
switch {
case time.Since(lastChanged) > timeout:
return nil, fmt.Errorf("timed out waiting for %v to reach height %v", node.Name, height)
case err != nil:
case status.SyncInfo.LatestBlockHeight >= height && (height == 0 || !status.SyncInfo.CatchingUp):
return status, nil
case curHeight < status.SyncInfo.LatestBlockHeight:
curHeight = status.SyncInfo.LatestBlockHeight
lastChanged = time.Now()
}
timer.Reset(300 * time.Millisecond)
}
}
}
// waitForAllNodes waits for all nodes to become available and catch up to the given block height.
func waitForAllNodes(ctx context.Context, testnet *e2e.Testnet, height int64, timeout time.Duration) (int64, error) {
var lastHeight int64
deadline := time.Now().Add(timeout)
for _, node := range testnet.Nodes {
if node.Mode == e2e.ModeSeed {
continue
}
status, err := waitForNode(ctx, node, height, time.Until(deadline))
if err != nil {
return 0, err
}
if status.SyncInfo.LatestBlockHeight > lastHeight {
lastHeight = status.SyncInfo.LatestBlockHeight
}
}
return lastHeight, nil
}