Skip to content
This repository has been archived by the owner on Jun 17, 2022. It is now read-only.

Commit

Permalink
tx-storm creates test accounts on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
a.guzev committed Nov 19, 2019
1 parent d2f7e68 commit ca674d1
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 79 deletions.
10 changes: 5 additions & 5 deletions cmd/tx-storm/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (f *feedback) Start() <-chan big.Int {
f.work.Add(1)
go f.background(blocks)

f.Log.Info("started")
f.Log.Info("Started")
return blocks
}

Expand All @@ -61,7 +61,7 @@ func (f *feedback) Stop() {
f.work.Wait()
f.done = nil

f.Log.Info("stopped")
f.Log.Info("Stopped")
}

func (f *feedback) background(blocks chan<- big.Int) {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (f *feedback) background(blocks chan<- big.Int) {

client, err = ethclient.Dial(f.url)
if err != nil {
f.Log.Error("connect to", "url", f.url, "err", err)
f.Log.Error("Connect to", "url", f.url, "err", err)
client = nil
continue connecting
}
Expand All @@ -116,7 +116,7 @@ func (f *feedback) background(blocks chan<- big.Int) {
known.Sub(header.Number, big.NewInt(1))
}

f.Log.Info("header", "num", header.Number)
f.Log.Info("Header", "num", header.Number)

for ; header.Number.Cmp(known) > 0; known.Add(known, big.NewInt(1)) {
block, err := client.BlockByNumber(context.TODO(), known)
Expand All @@ -127,7 +127,7 @@ func (f *feedback) background(blocks chan<- big.Int) {

select {
case blocks <- *known:
f.Log.Info("block", "num", header.Number, "txs", block.Transactions())
f.Log.Info("Block", "num", header.Number, "txs", block.Transactions().Len())
case <-f.done:
return
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/tx-storm/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"fmt"
"strconv"
"strings"
"time"

"gopkg.in/urfave/cli.v1"
)

var AccountsFlag = cli.IntFlag{
Name: "accs",
Usage: "total fake account count to use",
var PeriodFlag = cli.IntFlag{
Name: "period",
Usage: "seconds before reusing of account",
Value: 60,
}

func getAccCount(ctx *cli.Context) uint {
return uint(ctx.GlobalInt(AccountsFlag.Name))
func getPeriod(ctx *cli.Context) time.Duration {
return time.Second * time.Duration(ctx.GlobalInt(PeriodFlag.Name))
}

var TxnsRateFlag = cli.IntFlag{
Expand Down
13 changes: 5 additions & 8 deletions cmd/tx-storm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ var (
func init() {
// Flags.
flags = []cli.Flag{
AccountsFlag,
TxnsRateFlag,
NumberFlag,
PeriodFlag,
TxnsRateFlag,
utils.MetricsEnabledFlag,
MetricsPrometheusEndpointFlag,
VerbosityFlag,
Expand Down Expand Up @@ -77,13 +77,10 @@ func generatorMain(ctx *cli.Context) error {
url := args[0]
num, ofTotal := getNumber(ctx)
maxTxnsPerSec := getTxnsRate(ctx)
accs := getAccCount(ctx)

count := accs / ofTotal
accMin := count * num
accMax := accMin + count
period := getPeriod(ctx)

tt := newThreads(url, num, ofTotal, maxTxnsPerSec, accMin, accMax)
tt := newThreads(url, num, ofTotal, maxTxnsPerSec, period)
tt.SetName("Threads")
tt.Start()

waitForSignal()
Expand Down
18 changes: 9 additions & 9 deletions cmd/tx-storm/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *sender) Start(c <-chan *Transaction) {
s.work.Add(1)
go s.background()

s.Log.Info("started")
s.Log.Info("Started")
}

func (s *sender) Stop() {
Expand All @@ -62,7 +62,7 @@ func (s *sender) Stop() {
s.work.Wait()
s.done = nil

s.Log.Info("stopped")
s.Log.Info("Stopped")
}

func (s *sender) background() {
Expand All @@ -88,7 +88,7 @@ func (s *sender) background() {
client, err = ethclient.Dial(s.url)
if err != nil {
client = nil
s.Log.Error("connect to", "url", s.url, "err", err)
s.Log.Error("Connect to", "url", s.url, "err", err)
select {
case <-time.After(time.Second):
continue connecting
Expand All @@ -104,26 +104,26 @@ func (s *sender) background() {
err = client.SendTransaction(ctx, tx.Raw)
cancel()
if err == nil {
s.Log.Info("tx sending ok", "info", info, "amount", tx.Raw.Value(), "nonce", tx.Raw.Nonce())
s.Log.Info("Tx sending ok", "info", info, "amount", tx.Raw.Value(), "nonce", tx.Raw.Nonce())
txCountSentMeter.Inc(1)
break sending
}

switch err.Error() {
case fmt.Sprintf("known transaction: %x", tx.Raw.Hash()):
s.Log.Info("tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
s.Log.Info("Tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
break sending
case evm_core.ErrNonceTooLow.Error():
s.Log.Info("tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
s.Log.Info("Tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
break sending
case evm_core.ErrReplaceUnderpriced.Error():
s.Log.Info("tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
s.Log.Info("Tx sending skip", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
break sending
default:
s.Log.Error("tx sending err", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
s.Log.Error("Tx sending err", "info", info, "amount", tx.Raw.Value(), "cause", err, "nonce", tx.Raw.Nonce())
select {
case <-s.blocks:
s.Log.Error("try to send tx again", "info", info, "amount", tx.Raw.Value(), "nonce", tx.Raw.Nonce())
s.Log.Error("Try to send tx again", "info", info, "amount", tx.Raw.Value(), "nonce", tx.Raw.Nonce())
case <-s.done:
return
}
Expand Down
29 changes: 18 additions & 11 deletions cmd/tx-storm/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newThreads(
nodeUrl string,
num, ofTotal uint,
maxTxnsPerSec uint,
accMin, accMax uint,
period time.Duration,
) *threads {
if num >= ofTotal {
panic("num is a generator number of total generators count")
Expand All @@ -43,17 +43,18 @@ func newThreads(

Instance: logger.MakeInstance(),
}
tt.SetName("Threads")

tt.maxTxnsPerSec = maxTxnsPerSec / ofTotal
accs := accMax - accMin
accsOnThread := accs / uint(count)
accs := tt.maxTxnsPerSec * uint(period.Milliseconds()/1000)
accsOnThread := approximate(accs / uint(count))
accs = accsOnThread * uint(count)
offset := accs * (num + 1)
tt.Log.Info("Will create", "accounts", accs, "from", offset, "to", accs+offset)

donor := num
for i := range tt.generators {
min := accMin + uint(i)*accsOnThread
max := min + accsOnThread
tt.generators[i] = newTxGenerator(min, max)
tt.generators[i].SetName(fmt.Sprintf("Generator-%d-%d", min, max))
tt.generators[i] = newTxGenerator(donor, uint(i), accsOnThread, offset)
tt.generators[i].SetName(fmt.Sprintf("Generator-%d", i))
}

for i := range tt.senders {
Expand All @@ -77,7 +78,7 @@ func (tt *threads) Start() {

destinations := make([]chan<- *Transaction, len(tt.senders))
for i, s := range tt.senders {
destination := make(chan *Transaction, X)
destination := make(chan *Transaction, multiplicator)
destinations[i] = destination
s.Start(destination)
}
Expand All @@ -86,6 +87,12 @@ func (tt *threads) Start() {
tt.work.Add(1)
go tt.txTransfer(source, destinations)

for i, t := range tt.generators {
// first transactions from donor: one by one
tx, _ := t.Yield(uint(i))
destinations[0] <- tx
}

for _, t := range tt.generators {
t.Start(source)
}
Expand All @@ -94,7 +101,7 @@ func (tt *threads) Start() {
tt.work.Add(1)
go tt.blockNotify(blocks, tt.senders)

tt.Log.Info("started")
tt.Log.Info("Started")
}

func (tt *threads) Stop() {
Expand Down Expand Up @@ -131,7 +138,7 @@ func (tt *threads) Stop() {
tt.work.Wait()
tt.done = nil

tt.Log.Info("stopped")
tt.Log.Info("Stopped")
}

func (tt *threads) blockNotify(blocks <-chan big.Int, senders []*sender) {
Expand Down
Loading

0 comments on commit ca674d1

Please sign in to comment.