From db32c5bfeb563e7ce6661b37d6a55688cbeb4a20 Mon Sep 17 00:00:00 2001 From: Sergey Matyukevich Date: Tue, 11 Jul 2023 19:02:15 +0200 Subject: [PATCH] Fix preloader mode in benchmarks (#6359) --- benchmark/benchmain/main.go | 88 +++++++++++++++++++++++-------------- benchmark/benchmark.go | 58 +++++++++++++++++++++--- 2 files changed, 106 insertions(+), 40 deletions(-) diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 1366c18c972..f4b96a5d460 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -53,6 +53,7 @@ import ( "reflect" "runtime" "runtime/pprof" + "strconv" "strings" "sync" "sync/atomic" @@ -81,7 +82,8 @@ var ( traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, - fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode", + strings.Join(allToggleModes, ", ")), allToggleModes) channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) compressorMode = flags.StringWithAllowedValues("compression", compModeOff, @@ -401,20 +403,11 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { } func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { - clients, cleanup := makeClients(bf) + streams, req, cleanup := setupStream(bf, false) - streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) - for cn := 0; cn < bf.Connections; cn++ { - tc := clients[cn] - streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) - for pos := 0; pos < bf.MaxConcurrentCalls; pos++ { - - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - streams[cn][pos] = stream - } + var preparedMsg [][]*grpc.PreparedMsg + if bf.EnablePreloader { + preparedMsg = prepareMessages(streams, req) } return func(cn, pos int) { @@ -426,24 +419,25 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { if bf.RespPayloadCurve != nil { respSizeBytes = bf.RespPayloadCurve.ChooseRandom() } - streamCaller(streams[cn][pos], reqSizeBytes, respSizeBytes) + var req interface{} + if bf.EnablePreloader { + req = preparedMsg[cn][pos] + } else { + pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, reqSizeBytes) + req = &testpb.SimpleRequest{ + ResponseType: pl.Type, + ResponseSize: int32(respSizeBytes), + Payload: pl, + } + } + streamCaller(streams[cn][pos], req) }, cleanup } func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupUnconstrainedStream(bf) + streams, req, cleanup := setupStream(bf, true) - preparedMsg := make([][]*grpc.PreparedMsg, len(streams)) - for cn, connStreams := range streams { - preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams)) - for pos, stream := range connStreams { - preparedMsg[cn][pos] = &grpc.PreparedMsg{} - err := preparedMsg[cn][pos].Encode(stream, req) - if err != nil { - logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err) - } - } - } + preparedMsg := prepareMessages(streams, req) return func(cn, pos int) { streams[cn][pos].SendMsg(preparedMsg[cn][pos]) @@ -453,7 +447,7 @@ func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRe } func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { - streams, req, cleanup := setupUnconstrainedStream(bf) + streams, req, cleanup := setupStream(bf, true) return func(cn, pos int) { streams[cn][pos].Send(req) @@ -462,13 +456,19 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r }, cleanup } -func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { +func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { clients, cleanup := makeClients(bf) streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) - md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", - benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) - ctx := metadata.NewOutgoingContext(context.Background(), md) + ctx := context.Background() + if unconstrained { + md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + ctx = metadata.NewOutgoingContext(ctx, md) + } + if bf.EnablePreloader { + md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) + ctx = metadata.NewOutgoingContext(ctx, md) + } for cn := 0; cn < bf.Connections; cn++ { tc := clients[cn] streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) @@ -491,6 +491,20 @@ func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_ return streams, req, cleanup } +func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg { + preparedMsg := make([][]*grpc.PreparedMsg, len(streams)) + for cn, connStreams := range streams { + preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams)) + for pos, stream := range connStreams { + preparedMsg[cn][pos] = &grpc.PreparedMsg{} + if err := preparedMsg[cn][pos].Encode(stream, req); err != nil { + logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err) + } + } + } + return preparedMsg +} + // Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and // request and response sizes. func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) { @@ -499,8 +513,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) } } -func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) { - if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { +func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) { + if err := bm.DoStreamingRoundTripPreloaded(stream, req); err != nil { logger.Fatalf("DoStreamingRoundTrip failed: %v", err) } } @@ -790,6 +804,9 @@ func processFlags() *benchOpts { if len(opts.features.reqSizeBytes) != 0 { log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time") } + if len(opts.features.enablePreloader) != 0 { + log.Fatalf("you may not specify -reqPayloadCurveFiles and -preloader at the same time") + } for _, file := range *reqPayloadCurveFiles { pc, err := stats.NewPayloadCurve(file) if err != nil { @@ -807,6 +824,9 @@ func processFlags() *benchOpts { if len(opts.features.respSizeBytes) != 0 { log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time") } + if len(opts.features.enablePreloader) != 0 { + log.Fatalf("you may not specify -respPayloadCurveFiles and -preloader at the same time") + } for _, file := range *respPayloadCurveFiles { pc, err := stats.NewPayloadCurve(file) if err != nil { diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 2e11167004d..27101954aa3 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -28,6 +28,7 @@ import ( "log" "math/rand" "net" + "strconv" "time" "google.golang.org/grpc" @@ -83,13 +84,35 @@ const UnconstrainedStreamingHeader = "unconstrained-streaming" // the server should sleep between consecutive RPC responses. const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay" +// PreloadMsgSizeHeader indicates that the client is going to ask for +// a fixed response size and passes this size to the server. +// The server is expected to preload the response on startup. +const PreloadMsgSizeHeader = "preload-msg-size" + func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error { + preloadMsgSize := 0 + if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[PreloadMsgSizeHeader]) != 0 { + val := md[PreloadMsgSizeHeader][0] + var err error + preloadMsgSize, err = strconv.Atoi(val) + if err != nil { + return fmt.Errorf("%q header value is not an integer: %s", PreloadMsgSizeHeader, err) + } + } + if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 { - return s.UnconstrainedStreamingCall(stream) + return s.UnconstrainedStreamingCall(stream, preloadMsgSize) } response := &testpb.SimpleResponse{ Payload: new(testpb.Payload), } + preloadedResponse := &grpc.PreparedMsg{} + if preloadMsgSize > 0 { + setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize) + if err := preloadedResponse.Encode(stream, response); err != nil { + return err + } + } in := new(testpb.SimpleRequest) for { // use ServerStream directly to reuse the same testpb.SimpleRequest object @@ -101,14 +124,19 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal if err != nil { return err } - setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) - if err := stream.Send(response); err != nil { + if preloadMsgSize > 0 { + err = stream.SendMsg(preloadedResponse) + } else { + setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) + err = stream.Send(response) + } + if err != nil { return err } } } -func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error { +func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer, preloadMsgSize int) error { maxSleep := 0 if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 { val := md[UnconstrainedStreamingDelayHeader][0] @@ -135,6 +163,13 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService } setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) + preloadedResponse := &grpc.PreparedMsg{} + if preloadMsgSize > 0 { + if err := preloadedResponse.Encode(stream, response); err != nil { + return err + } + } + go func() { for { // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. @@ -154,7 +189,12 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService if maxSleep > 0 { time.Sleep(time.Duration(rand.Intn(maxSleep))) } - err := stream.Send(response) + var err error + if preloadMsgSize > 0 { + err = stream.SendMsg(preloadedResponse) + } else { + err = stream.Send(response) + } switch status.Code(err) { case codes.Unavailable, codes.Canceled: return @@ -258,7 +298,13 @@ func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, ResponseSize: int32(respSize), Payload: pl, } - if err := stream.Send(req); err != nil { + return DoStreamingRoundTripPreloaded(stream, req) +} + +// DoStreamingRoundTripPreloaded performs a round trip for a single streaming rpc with preloaded payload. +func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req interface{}) error { + // req could be either *testpb.SimpleRequest or *grpc.PreparedMsg + if err := stream.SendMsg(req); err != nil { return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want ", err) } if _, err := stream.Recv(); err != nil {