Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benchmark: Add sleepBetweenRPCs and connections parameters #6299

Merged
merged 20 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5c9f7a4
Add sleepBetweenRPCs parameter to benchmarks
s-matyukevich May 18, 2023
e4c5b51
Use random distribution
s-matyukevich May 18, 2023
65ccca3
Remove delay from the client receive function and add it to the serve…
s-matyukevich May 19, 2023
b3d96ed
Add parameter to control the number of connections
s-matyukevich May 19, 2023
0014165
fix linter error
s-matyukevich May 19, 2023
88afecc
rename parameter
s-matyukevich May 19, 2023
57a049a
fix linter error
s-matyukevich May 19, 2023
2bf89d6
Sleep before sending the first request
s-matyukevich May 23, 2023
ff36d9e
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 23, 2023
2aee78f
configure the number of connections
s-matyukevich May 23, 2023
659b192
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 24, 2023
23c1278
Skip warmup if SleepBetweenRPCs > 0, use graceful stop on the server.
s-matyukevich May 24, 2023
3615d73
fix warmup condition
s-matyukevich May 24, 2023
5a8d7a7
Revert s.GracefulStop() change
s-matyukevich May 24, 2023
60ac14b
Allow codes.Canceled in unconstrained run
s-matyukevich May 24, 2023
907b01d
Fix unconstrained mode with preloader=on
s-matyukevich May 25, 2023
1905a66
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 25, 2023
accfb30
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich Jun 6, 2023
fa1f079
rename index parameter and import name
s-matyukevich Jun 6, 2023
7d0cd0d
add messing period
s-matyukevich Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 134 additions & 83 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"fmt"
"io"
"log"
"math/rand"
"net"
"os"
"reflect"
Expand Down Expand Up @@ -109,6 +110,8 @@ var (
clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list")
connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")

logger = grpclog.Component("benchmark")
)
Expand Down Expand Up @@ -194,9 +197,9 @@ func runModesFromWorkloads(workload string) runModes {
type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCallFunc func(cn, pos int)
type rpcSendFunc func(cn, pos int)
type rpcRecvFunc func(cn, pos int)
type rpcCleanupFunc func()

func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
Expand Down Expand Up @@ -233,40 +236,46 @@ func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Fea

bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
var wg sync.WaitGroup
wg.Add(2 * bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
wg.Add(2 * bf.Connections * bf.MaxConcurrentCalls)
maxSleep := int(bf.SleepBetweenRPCs)
for cn := 0; cn < bf.Connections; cn++ {
for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
go func(cn, pos int) {
defer wg.Done()
for {
if maxSleep > 0 {
time.Sleep(time.Duration(rand.Intn(maxSleep)))
}
t := time.Now()
if t.After(bmEnd) {
return
}
sender(cn, pos)
atomic.AddUint64(&req, 1)
}
sender(pos)
atomic.AddUint64(&req, 1)
}
}(i)
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
}(cn, pos)
go func(cn, pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
}
recver(cn, pos)
atomic.AddUint64(&resp, 1)
}
dfawley marked this conversation as resolved.
Show resolved Hide resolved
recver(pos)
atomic.AddUint64(&resp, 1)
}
}(i)
}(cn, pos)
}
}
wg.Wait()
stop(req, resp)
}

// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// makeClients returns a gRPC client (or multiple clients) for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
func makeClients(bf stats.Features) ([]testpb.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
Expand Down Expand Up @@ -346,16 +355,24 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
}
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
return testgrpc.NewBenchmarkServiceClient(conn), func() {
conn.Close()
conns := make([]*grpc.ClientConn, bf.Connections)
clients := make([]testpb.BenchmarkServiceClient, bf.Connections)
for cn := 0; cn < bf.Connections; cn++ {
conns[cn] = bm.NewClientConn("" /* target not used */, opts...)
clients[cn] = testgrpc.NewBenchmarkServiceClient(conns[cn])
}

return clients, func() {
for _, conn := range conns {
conn.Close()
}
stopper()
}
}

func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
return func(int) {
clients, cleanup := makeClients(bf)
return func(cn, pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
Expand All @@ -364,23 +381,28 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
unaryCaller(tc, reqSizeBytes, respSizeBytes)
unaryCaller(clients[cn], reqSizeBytes, respSizeBytes)
}, cleanup
}

func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
clients, cleanup := makeClients(bf)

streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
for cn := 0; cn < bf.Connections; cn++ {
tc := clients[cn]
streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {

stream, err := tc.StreamingCall(context.Background())
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
streams[cn][pos] = stream
}
streams[i] = stream
}

return func(pos int) {
return func(cn, pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
Expand All @@ -389,51 +411,59 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
streamCaller(streams[cn][pos], reqSizeBytes, respSizeBytes)
}, cleanup
}

func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)

preparedMsg := make([]*grpc.PreparedMsg, len(streams))
for i, stream := range streams {
preparedMsg[i] = &grpc.PreparedMsg{}
err := preparedMsg[i].Encode(stream, req)
if err != nil {
logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
for cn, connStreams := range streams {
preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
for pos, stream := range connStreams {
preparedMsg[cn][pos] = &grpc.PreparedMsg{}
err := preparedMsg[cn][pos].Encode(stream, req)
if err != nil {
logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
}
}
}

return func(pos int) {
streams[pos].SendMsg(preparedMsg[pos])
}, func(pos int) {
streams[pos].Recv()
return func(cn, pos int) {
streams[cn][pos].SendMsg(preparedMsg[cn][pos])
}, func(cn, pos int) {
streams[cn][pos].Recv()
}, cleanup
}

func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)

return func(pos int) {
streams[pos].Send(req)
}, func(pos int) {
streams[pos].Recv()
return func(cn, pos int) {
streams[cn][pos].Send(req)
}, func(cn, pos int) {
streams[cn][pos].Recv()
}, cleanup
}

func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
clients, cleanup := makeClients(bf)

streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1",
benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
ctx := metadata.NewOutgoingContext(context.Background(), md)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
for cn := 0; cn < bf.Connections; cn++ {
tc := clients[cn]
streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
stream, err := tc.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
streams[cn][pos] = stream
}
streams[i] = stream
}

pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
Expand Down Expand Up @@ -461,32 +491,45 @@ func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize,
}

func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
// Warm up connection.
for i := 0; i < warmupCallCount; i++ {
caller(0)
// if SleepBetweenRPCs > 0 we skip the warmup because otherwise
// we are going to send a set of simultaneous requests on every connection,
// which is something we are trying to avoid when using SleepBetweenRPCs.
if bf.SleepBetweenRPCs == 0 {
// Warm up connections.
for i := 0; i < warmupCallCount; i++ {
for cn := 0; cn < bf.Connections; cn++ {
caller(cn, 0)
}
}
}

// Run benchmark.
start(mode, bf)
var wg sync.WaitGroup
wg.Add(bf.MaxConcurrentCalls)
wg.Add(bf.Connections * bf.MaxConcurrentCalls)
bmEnd := time.Now().Add(bf.BenchTime)
maxSleep := int(bf.SleepBetweenRPCs)
var count uint64
for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
for cn := 0; cn < bf.Connections; cn++ {
for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
go func(cn, pos int) {
defer wg.Done()
for {
if maxSleep > 0 {
time.Sleep(time.Duration(rand.Intn(maxSleep)))
}
t := time.Now()
if t.After(bmEnd) {
return
}
start := time.Now()
caller(cn, pos)
elapse := time.Since(start)
atomic.AddUint64(&count, 1)
s.AddDuration(elapse)
}
start := time.Now()
caller(pos)
elapse := time.Since(start)
atomic.AddUint64(&count, 1)
s.AddDuration(elapse)
}
}(i)
}(cn, pos)
}
}
wg.Wait()
stop(count)
Expand All @@ -504,6 +547,7 @@ type benchOpts struct {
benchmarkResultFile string
useBufconn bool
enableKeepalive bool
connections int
features *featureOpts
}

Expand All @@ -528,6 +572,7 @@ type featureOpts struct {
clientWriteBufferSize []int
serverReadBufferSize []int
serverWriteBufferSize []int
sleepBetweenRPCs []time.Duration
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
Expand Down Expand Up @@ -572,6 +617,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.serverReadBufferSize)
case stats.ServerWriteBufferSize:
featuresNum[i] = len(b.features.serverWriteBufferSize)
case stats.SleepBetweenRPCs:
featuresNum[i] = len(b.features.sleepBetweenRPCs)
default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
}
Expand Down Expand Up @@ -625,6 +672,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
UseBufConn: b.useBufconn,
EnableKeepalive: b.enableKeepalive,
BenchTime: b.benchTime,
Connections: b.connections,
// These features can potentially change for each iteration.
EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
Expand All @@ -638,6 +686,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
SleepBetweenRPCs: b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]],
}
if len(b.features.reqPayloadCurves) == 0 {
f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
Expand Down Expand Up @@ -693,6 +742,7 @@ func processFlags() *benchOpts {
benchmarkResultFile: *benchmarkResultFile,
useBufconn: *useBufconn,
enableKeepalive: *enableKeepalive,
connections: *connections,
features: &featureOpts{
enableTrace: setToggleMode(*traceMode),
readLatencies: append([]time.Duration(nil), *readLatency...),
Expand All @@ -708,6 +758,7 @@ func processFlags() *benchOpts {
clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
serverReadBufferSize: append([]int(nil), *serverReadBufferSize...),
serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
sleepBetweenRPCs: append([]time.Duration(nil), *sleepBetweenRPCs...),
},
}

Expand Down
Loading