-
Notifications
You must be signed in to change notification settings - Fork 391
/
load.go
170 lines (152 loc) · 4.66 KB
/
load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/cometbft/cometbft/libs/log"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
e2e "github.com/cometbft/cometbft/test/e2e/pkg"
"github.com/cometbft/cometbft/test/loadtime/payload"
"github.com/cometbft/cometbft/types"
)
const workerPoolSize = 16
// Load generates transactions against the network until the given context is
// canceled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
initialTimeout := 1 * time.Minute
stallTimeout := 30 * time.Second
chSuccess := make(chan struct{})
chFailed := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger.Info("load", "msg", log.NewLazySprintf("Starting transaction load (%v workers)...", workerPoolSize))
started := time.Now()
u := [16]byte(uuid.New()) // generate run ID on startup
txCh := make(chan types.Tx)
go loadGenerate(ctx, txCh, testnet, u[:])
for _, n := range testnet.Nodes {
if n.SendNoLoad {
continue
}
for w := 0; w < testnet.LoadTxConnections; w++ {
go loadProcess(ctx, txCh, chSuccess, chFailed, n)
}
}
// Monitor successful and failed transactions, and abort on stalls.
success, failed := 0, 0
timeout := initialTimeout
for {
rate := log.NewLazySprintf("%.1f", float64(success)/time.Since(started).Seconds())
select {
case <-chSuccess:
success++
timeout = stallTimeout
case <-chFailed:
failed++
case <-time.After(timeout):
return fmt.Errorf("unable to submit transactions for %v", timeout)
case <-ctx.Done():
if success == 0 {
return errors.New("failed to submit any transactions")
}
logger.Info("load", "msg", log.NewLazySprintf("Ending transaction load after %v txs (%v tx/s)...", success, rate))
return nil
}
// Log every ~1 second the number of sent transactions.
total := success + failed
if total%testnet.LoadTxBatchSize == 0 {
successRate := float64(success) / float64(total)
logger.Debug("load", "success", success, "failed", failed, "success/total", log.NewLazySprintf("%.2f", successRate), "tx/s", rate)
}
// Check if reached max number of allowed transactions to send.
if testnet.LoadMaxTxs > 0 && success >= testnet.LoadMaxTxs {
logger.Info("load", "msg", log.NewLazySprintf("Ending transaction load after reaching %v txs (%v tx/s)...", success, rate))
return nil
}
}
}
// loadGenerate generates jobs until the context is canceled.
func loadGenerate(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
t := time.NewTimer(0)
defer t.Stop()
for {
select {
case <-t.C:
case <-ctx.Done():
close(txCh)
return
}
t.Reset(time.Second)
// A context with a timeout is created here to time the createTxBatch
// function out. If createTxBatch has not completed its work by the time
// the next batch is set to be sent out, then the context is canceled so that
// the current batch is halted, allowing the next batch to begin.
tctx, cf := context.WithTimeout(ctx, time.Second)
createTxBatch(tctx, txCh, testnet, id)
cf()
}
}
// createTxBatch creates new transactions and sends them into the txCh. createTxBatch
// returns when either a full batch has been sent to the txCh or the context
// is canceled.
func createTxBatch(ctx context.Context, txCh chan<- types.Tx, testnet *e2e.Testnet, id []byte) {
wg := &sync.WaitGroup{}
genCh := make(chan struct{})
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for range genCh {
tx, err := payload.NewBytes(&payload.Payload{
Id: id,
Size: uint64(testnet.LoadTxSizeBytes),
Rate: uint64(testnet.LoadTxBatchSize),
Connections: uint64(testnet.LoadTxConnections),
})
if err != nil {
panic(fmt.Sprintf("Failed to generate tx: %v", err))
}
select {
case txCh <- tx:
case <-ctx.Done():
return
}
}
}()
}
FOR_LOOP:
for i := 0; i < testnet.LoadTxBatchSize; i++ {
select {
case genCh <- struct{}{}:
case <-ctx.Done():
break FOR_LOOP
}
}
close(genCh)
wg.Wait()
}
// loadProcess processes transactions by sending transactions received on the txCh
// to the client.
func loadProcess(ctx context.Context, txCh <-chan types.Tx, chSuccess chan<- struct{}, chFailed chan<- struct{}, n *e2e.Node) {
var client *rpchttp.HTTP
var err error
s := struct{}{}
for tx := range txCh {
if client == nil {
client, err = n.Client()
if err != nil {
logger.Info("non-fatal error creating node client", "error", err)
continue
}
}
if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
logger.Error("failed to send transaction", "err", err)
chFailed <- s
continue
}
chSuccess <- s
}
}