package main

import (
	"container/ring"
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/ari-anchor/sei-tendermint/libs/log"
	tmrand "github.com/ari-anchor/sei-tendermint/libs/rand"
	rpchttp "github.com/ari-anchor/sei-tendermint/rpc/client/http"
	e2e "github.com/ari-anchor/sei-tendermint/test/e2e/pkg"
	"github.com/ari-anchor/sei-tendermint/types"
)

// Load generates transactions against the network until the given context is
// canceled.
func Load(ctx context.Context, logger log.Logger, r *rand.Rand, testnet *e2e.Testnet) error {
	// Since transactions are executed across all nodes in the network, we need
	// to reduce the transaction load for larger networks to avoid using too much
	// CPU. This gives high-throughput small networks and low-throughput large ones.
	// This also limits the number of TCP connections, since each worker has
	// a connection to all nodes.
	concurrency := len(testnet.Nodes) * 2
	if concurrency > 32 {
		concurrency = 32
	}

	chTx := make(chan types.Tx)
	chSuccess := make(chan int) // success counts per iteration
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Spawn job generator and processors.
	logger.Info("starting transaction load",
		"workers", concurrency,
		"nodes", len(testnet.Nodes),
		"tx", testnet.TxSize)

	started := time.Now()

	go loadGenerate(ctx, r, chTx, testnet.TxSize, len(testnet.Nodes))

	for w := 0; w < concurrency; w++ {
		go loadProcess(ctx, testnet, chTx, chSuccess)
	}

	// Monitor successful transactions to ensure the load propagates to the network.
	//
	// This loop doesn't check for or time out on stalls, since a stall here would
	// just abort the load generator sooner and could obscure backpressure from the
	// test harness, and the framework has other checks for stalls. Ideally we would
	// monitor latency as a guide for when to give up, but we don't have a good way
	// to track that yet.
	success := 0
	for {
		select {
		case numSeen := <-chSuccess:
			success += numSeen
		case <-ctx.Done():
			if success == 0 {
				return fmt.Errorf("failed to submit transactions in %s by %d workers",
					time.Since(started), concurrency)
			}

			// TODO perhaps allow test networks to declare required transaction
			// rates, which might allow us to avoid the special case around
			// 0 txs above.
			rate := float64(success) / time.Since(started).Seconds()

			logger.Info("ending transaction load",
				"dur_secs", time.Since(started).Seconds(),
				"txns", success,
				"workers", concurrency,
				"rate", rate)

			return nil
		}
	}
}

// loadGenerate generates jobs until the context is canceled.
//
// The chTx channel has multiple consumers, so the rate of load generation is
// limited primarily by backpressure from broadcasting each transaction, though
// there is still some timer-based limiting.
func loadGenerate(ctx context.Context, r *rand.Rand, chTx chan<- types.Tx, txSize int, networkSize int) {
	timer := time.NewTimer(0)
	defer timer.Stop()
	defer close(chTx)

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		// Constrain the key space to avoid using too much space, while also
		// reducing the size of the data stored in the app.
		id := r.Int63n(100)

		tx := types.Tx(fmt.Sprintf("load-%X=%s", id, tmrand.StrFromSource(r, txSize)))

		select {
		case <-ctx.Done():
			return
		case chTx <- tx:
			// Sleep for a bit before sending the next transaction.
			timer.Reset(loadGenerateWaitTime(r, networkSize))
		}
	}
}
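
// loadGenerateWaitTime returns a randomized delay before the next transaction
// is generated: a base jitter of 250ms to 1s plus a jitter that grows with the
// network size, so larger networks are loaded more slowly.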
func loadGenerateWaitTime(r *rand.Rand, size int) time.Duration {
	const (
		min = int64(250 * time.Millisecond)
		max = int64(time.Second)
	)

	var (
		baseJitter = r.Int63n(max-min+1) + min
		sizeFactor = int64(size) * min
		sizeJitter = r.Int63n(sizeFactor-min+1) + min
	)

	return time.Duration(baseJitter + sizeJitter)
}

// loadProcess processes transactions by broadcasting them to the testnet.
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- int) {
	// Each worker gets its own client to each usable node, which
	// allows for some concurrency while still bounding it.
	clients := make([]*rpchttp.HTTP, 0, len(testnet.Nodes))

	for idx := range testnet.Nodes {
		// Construct the list of usable nodes for creating the load.
		// Don't send load through seed nodes because they do not
		// provide the RPC endpoints required to broadcast transactions.
		if testnet.Nodes[idx].Mode == e2e.ModeSeed {
			continue
		}

		client, err := testnet.Nodes[idx].Client()
		if err != nil {
			continue
		}

		clients = append(clients, client)
	}
	if len(clients) == 0 {
		panic("no clients to process load")
	}

	// Put the clients in a ring so they can be used in a
	// round-robin fashion.
	clientRing := ring.New(len(clients))
	for idx := range clients {
		clientRing.Value = clients[idx]
		clientRing = clientRing.Next()
	}
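
	// Round-robin through the clients: skip nodes that are unreachable or still
	// catching up, and report successful broadcasts to the monitor without
	// blocking this send loop.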
	successes := 0
	for {
		select {
		case <-ctx.Done():
			return
		case tx := <-chTx:
			clientRing = clientRing.Next()
			client := clientRing.Value.(*rpchttp.HTTP)

			if status, err := client.Status(ctx); err != nil {
				continue
			} else if status.SyncInfo.CatchingUp {
				continue
			}

			if _, err := client.BroadcastTxSync(ctx, tx); err != nil {
				continue
			}
			successes++
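
			// Report the accumulated successes without blocking; if the monitor
			// is not ready to receive, fall through to the default case and keep
			// accumulating until after the next broadcast.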
			select {
			case chSuccess <- successes:
				successes = 0 // reset counter for the next iteration
				continue
			case <-ctx.Done():
				return
			default:
			}
		}
	}
}