Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: eth call load test flag #96

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions cmd/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package loadtest

import (
"bufio"
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
Expand Down Expand Up @@ -55,6 +57,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
ethrpc "github.com/ethereum/go-ethereum/rpc"

_ "embed"
"github.com/maticnetwork/polygon-cli/contracts"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -188,6 +191,9 @@ var LoadtestCmd = &cobra.Command{
},
}

//go:embed payloads.txt
var payloadData []byte

type (
blockSummary struct {
Block *rpctypes.RawBlockResponse
Expand Down Expand Up @@ -229,6 +235,7 @@ type (
ByteCount *uint64
Seed *int64
IsAvail *bool
RPCLoadTest *bool
LtAddress *string
DelAddress *string
ContractCallNumberOfBlocksToWaitFor *uint64
Expand Down Expand Up @@ -256,6 +263,17 @@ type (
Pending any `json:"pending"`
Queued any `json:"queued"`
}

RPCRequest struct {
ID interface{} `json:"id"`
Method string `json:"method"`
Params []interface{} `json:"params"`
}

RPCResponse struct {
ID interface{} `json:"id"`
Result interface{} `json:"result"`
}
)

func init() {
Expand Down Expand Up @@ -295,6 +313,7 @@ r - random modes
ltp.ByteCount = LoadtestCmd.PersistentFlags().Uint64P("byte-count", "b", 1024, "If we're in store mode, this controls how many bytes we'll try to store in our contract")
ltp.Seed = LoadtestCmd.PersistentFlags().Int64("seed", 123456, "A seed for generating random values and addresses")
ltp.IsAvail = LoadtestCmd.PersistentFlags().Bool("data-avail", false, "Is this a test of avail rather than an EVM / Geth Chain")
ltp.RPCLoadTest = LoadtestCmd.PersistentFlags().Bool("rpc-load-test", false, "Perform an RPC style load test (no tx submission) on a specified node")
ltp.LtAddress = LoadtestCmd.PersistentFlags().String("lt-address", "", "A pre-deployed load test contract address")
ltp.DelAddress = LoadtestCmd.PersistentFlags().String("del-address", "", "A pre-deployed delegator contract address")
ltp.ContractCallNumberOfBlocksToWaitFor = LoadtestCmd.PersistentFlags().Uint64("contract-call-nb-blocks-to-wait-for", 30, "The number of blocks to wait for before giving up on a contract call")
Expand Down Expand Up @@ -426,6 +445,8 @@ func runLoadTest(ctx context.Context) error {
return availLoop(ctx, api)
}

} else if *inputLoadTestParams.RPCLoadTest {
return rpcLoadTestLoop(ctx, rpc)
} else {
log.Info().Msg("Starting Load Test")
loopFunc = func() error {
Expand Down Expand Up @@ -475,6 +496,121 @@ func runLoadTest(ctx context.Context) error {
return nil
}

func readPayloadsFromEmbeddedData(data []byte) ([]string, error) {
payloads := make([]string, 0)

scanner := bufio.NewScanner(bytes.NewReader(data))
for scanner.Scan() {
payloads = append(payloads, scanner.Text())
}

if err := scanner.Err(); err != nil {
return nil, err
}

return payloads, nil
}

func rpcLoadTestLoop(ctx context.Context, rpc *ethrpc.Client) error {
ltp := inputLoadTestParams
requestCount := int(*ltp.Requests)

// Keep track of key stats
successfulRespCount := 0
invalidJSONCount := 0
archiveNodeRequiredCount := 0
miscErrorCount := 0
reqTimeoutCount := 0

// Create wg that waits for all reqs to finish
var wg sync.WaitGroup
wg.Add(requestCount)

// Create channel that collects all responses
responseChan := make(chan RPCResponse)

// Create a semaphore to limit the number of concurrent goroutines
sem := make(chan struct{}, int(*ltp.Concurrency))

// Start goroutine that collects responses from channel
go func() {
for range responseChan {
successfulRespCount++
wg.Done()
}
}()

// Read the payloads from the file
payloads, err := readPayloadsFromEmbeddedData(payloadData)
if err != nil {
log.Fatal().Err(err).Msg("Unable to read sample payloads file")
}

startTime := time.Now()
// Send multiple concurrent requests
for i := 0; i < requestCount; i++ {
sem <- struct{}{} // Acquire a slot from the semaphore, limiting the concurrency
go func(index int) {
defer func() {
<-sem // Release the slot when the goroutine is finished
}()

// Create a new context with a timeout for each request
ctxWithTimeout, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

jsonPayload := payloads[index%len(payloads)]
sanitizedPayload := jsonPayload[strings.Index(jsonPayload, `{"`):]

// Define and replace the regular expression pattern
pattern := `{"blockHash":"[^"]+"}`
regex := regexp.MustCompile(pattern)
sanitizedPayload = regex.ReplaceAllString(sanitizedPayload, `"latest"`)

// Unmarshal the JSON payload into the RPCRequest struct
var rpcRequest RPCRequest
unmarshalErr := json.Unmarshal([]byte(sanitizedPayload), &rpcRequest)
if unmarshalErr != nil {
invalidJSONCount++
wg.Done()
return
}

// Send RPC request
var result interface{}
err := rpc.CallContext(ctxWithTimeout, &result, rpcRequest.Method, rpcRequest.Params...)
if err != nil {
errMsg := err.Error()
if strings.Contains(errMsg, "missing trie node") {
archiveNodeRequiredCount++
} else if strings.Contains(errMsg, "EOF") || strings.Contains(errMsg, "context deadline exceeded") {
reqTimeoutCount++
} else {
miscErrorCount++
}
wg.Done()
return
}

// Send the response to the channel
responseChan <- RPCResponse{
ID: rpcRequest.ID,
Result: result,
}
}(i)
}

// Wait for all requests to finish
wg.Wait()
close(responseChan)

elapsedTime := time.Since(startTime)
fmt.Printf("Completed %d requests in %s\n", requestCount, elapsedTime)
fmt.Printf("Requests per second: %.2f\n", float64(successfulRespCount+miscErrorCount)/elapsedTime.Seconds())
fmt.Printf("Successes: %d, Invalid Req: %d, Archive Node Required: %d, Misc Errors: %d, Req Timeouts: %d", successfulRespCount, invalidJSONCount, archiveNodeRequiredCount, miscErrorCount, reqTimeoutCount)
return nil
}

func printResults(lts []loadTestSample) {
if len(lts) == 0 {
log.Error().Msg("No results recorded")
Expand Down
Loading