Use RunParallel for benchmarks (#139)
Simplifies the benchmarks to four cases: baseline and with interceptors, for both unary and streaming RPCs. Switches to `RunParallel` so that ns/op measures the overhead of a single RPC rather than a batch grouped by the `concurrency` setting. Reduces streaming to a single round trip so it compares cleanly against the unary flow:
```
pkg: connectrpc.com/otelconnect
BenchmarkStreamingBase-8                   19152             58334 ns/op          534279 B/op        204 allocs/op
BenchmarkStreamingWithInterceptor-8        19467             62326 ns/op          546378 B/op        302 allocs/op
BenchmarkUnaryBase-8                       41461             27704 ns/op           14514 B/op        156 allocs/op
BenchmarkUnaryWithInterceptor-8            37642             30383 ns/op           25723 B/op        241 allocs/op
PASS
ok      connectrpc.com/otelconnect      8.435s
```
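
Reading the interceptor overhead directly off the per-RPC deltas: unary adds roughly 30383 - 27704 ≈ 2.7µs, ~11.2KB, and 85 allocations per request; streaming adds roughly 62326 - 58334 ≈ 4.0µs, ~12.1KB, and 98 allocations per round trip.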

Previously:
```
goos: darwin
goarch: arm64
pkg: connectrpc.com/otelconnect
BenchmarkStreamingServerNoOptions-8                  693           1491135 ns/op         2763098 B/op       1779 allocs/op
BenchmarkStreamingServerClientOption-8               639           1732927 ns/op         3163066 B/op       4751 allocs/op
BenchmarkStreamingServerOption-8                     646           1579154 ns/op         2981825 B/op       3525 allocs/op
BenchmarkStreamingClientOption-8                     688           1515430 ns/op         2918297 B/op       3002 allocs/op
BenchmarkUnaryOtel-8                                5924            198928 ns/op          129339 B/op       1216 allocs/op
2023/10/26 13:37:10 http: TLS handshake error from 127.0.0.1:60015: read tcp 127.0.0.1:60013->127.0.0.1:60015: use of closed network connection
BenchmarkUnary-8                                    6601            183334 ns/op           73278 B/op        791 allocs/op
PASS
ok      connectrpc.com/otelconnect      10.268s
```
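
For reference, here is a minimal, self-contained sketch of the `b.RunParallel` pattern this commit adopts. It uses a plain `net/http` echo server in place of the repository's generated ping client, so the handler and names below are illustrative stand-ins rather than the actual test code:
```go
package bench

import (
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
)

// BenchmarkRoundTrip measures one HTTP round trip per iteration; the echo
// handler is a hypothetical stand-in for the Connect ping service used in
// bench_test.go.
func BenchmarkRoundTrip(b *testing.B) {
	svr := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			io.Copy(w, r.Body) // echo the request body back
		}))
	b.Cleanup(svr.Close)
	client := svr.Client()
	b.ResetTimer()
	// RunParallel spreads b.N iterations across GOMAXPROCS goroutines,
	// so ns/op reports the cost of a single request rather than a batch.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			resp, err := client.Post(svr.URL, "text/plain", strings.NewReader("Sentence"))
			if err != nil {
				b.Error(err)
				continue
			}
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
		}
	})
}
```
By default `RunParallel` fans the iterations out over GOMAXPROCS goroutines (tunable via `b.SetParallelism`), which is why ns/op now reflects one RPC instead of one batch of five concurrent ones.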
emcfarlane committed Nov 1, 2023
1 parent a00063c commit 42fc67a
Showing 1 changed file, bench_test.go, with 33 additions and 59 deletions.
```diff
@@ -18,94 +18,68 @@ import (
 	"context"
 	"net/http"
 	"net/http/httptest"
-	"sync"
 	"testing"
 
 	connect "connectrpc.com/connect"
 	pingv1 "connectrpc.com/otelconnect/internal/gen/observability/ping/v1"
 	"connectrpc.com/otelconnect/internal/gen/observability/ping/v1/pingv1connect"
 )
 
-const (
-	concurrency    = 5
-	messagesToSend = 10
-)
-
-func BenchmarkStreamingServerNoOptions(b *testing.B) {
-	testStreaming(b, nil, nil)
-}
-
-func BenchmarkStreamingServerClientOption(b *testing.B) {
-	testStreaming(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
-}
-
-func BenchmarkStreamingServerOption(b *testing.B) {
-	testStreaming(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{})
+func BenchmarkStreamingBase(b *testing.B) {
+	benchStreaming(b, nil, nil)
 }
 
-func BenchmarkStreamingClientOption(b *testing.B) {
-	testStreaming(b, []connect.HandlerOption{}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
+func BenchmarkStreamingWithInterceptor(b *testing.B) {
+	benchStreaming(b,
+		[]connect.HandlerOption{connect.WithInterceptors(NewInterceptor())},
+		[]connect.ClientOption{connect.WithInterceptors(NewInterceptor())},
+	)
 }
 
-func BenchmarkUnaryOtel(b *testing.B) {
-	benchUnary(b, []connect.HandlerOption{connect.WithInterceptors(NewInterceptor())}, []connect.ClientOption{connect.WithInterceptors(NewInterceptor())})
+func BenchmarkUnaryBase(b *testing.B) {
+	benchUnary(b, nil, nil)
 }
 
-func BenchmarkUnary(b *testing.B) {
-	benchUnary(b, nil, nil)
+func BenchmarkUnaryWithInterceptor(b *testing.B) {
+	benchUnary(b,
+		[]connect.HandlerOption{connect.WithInterceptors(NewInterceptor())},
+		[]connect.ClientOption{connect.WithInterceptors(NewInterceptor())},
+	)
 }
 
 func benchUnary(b *testing.B, handleropts []connect.HandlerOption, clientopts []connect.ClientOption) {
 	b.Helper()
 	svr, client := startBenchServer(handleropts, clientopts)
 	b.Cleanup(svr.Close)
 	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		var wg sync.WaitGroup
-		for j := 0; j < concurrency; j++ {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				_, err := client.Ping(context.Background(), &connect.Request[pingv1.PingRequest]{
-					Msg: &pingv1.PingRequest{Data: []byte("Sentence")},
-				})
-				if err != nil {
-					b.Log(err)
-				}
-			}()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			_, err := client.Ping(context.Background(), &connect.Request[pingv1.PingRequest]{
+				Msg: &pingv1.PingRequest{Data: []byte("Sentence")},
+			})
+			if err != nil {
+				b.Log(err)
+			}
 		}
-		wg.Wait()
-	}
+	})
 }
 
-func testStreaming(b *testing.B, handleropts []connect.HandlerOption, clientopts []connect.ClientOption) {
+func benchStreaming(b *testing.B, handleropts []connect.HandlerOption, clientopts []connect.ClientOption) {
 	b.Helper()
 	_, client := startBenchServer(handleropts, clientopts)
 	req := &pingv1.CumSumRequest{Number: 12}
 	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		var wg sync.WaitGroup
-		for j := 0; j < concurrency; j++ {
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
 			stream := client.CumSum(context.Background())
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				for j := 0; j < messagesToSend; j++ {
-					err := stream.Send(req)
-					if err != nil {
-						b.Error(err)
-					}
-				}
-				for j := 0; j < messagesToSend; j++ {
-					_, err := stream.Receive()
-					if err != nil {
-						b.Error(err)
-					}
-				}
-			}()
+			if err := stream.Send(req); err != nil {
+				b.Error(err)
+			}
+			if _, err := stream.Receive(); err != nil {
+				b.Error(err)
+			}
 		}
-		wg.Wait()
-	}
+	})
 }
 
 func startBenchServer(handleropts []connect.HandlerOption, clientopts []connect.ClientOption) (*httptest.Server, pingv1connect.PingServiceClient) {
```
