Skip to content

Commit

Permalink
Reuse write request from distributor to Ingesters (#5193)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* benchmark

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fix some linting / test

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* No allocation

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* min pool size

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* min pool size

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fuzzy test

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* changelog

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* more benchmark

Signed-off-by: Alan Protasio <alanprot@gmail.com>

* fix bug on the reuse

Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: Alan Protasio <alanprot@gmail.com>
  • Loading branch information
alanprot committed Mar 7, 2023
1 parent 713542c commit 64b6c2b
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,7 @@
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
* [ENHANCEMENT] Update Go version to 1.20.1. #5159
* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
Expand Down
61 changes: 61 additions & 0 deletions pkg/cortexpb/slicesPool.go
@@ -0,0 +1,61 @@
package cortexpb

import (
"math"
"sync"
)

const (
minPoolSizePower = 5
)

// byteSlicePools is a set of sync.Pools of byte slices, bucketed into
// power-of-two capacity classes starting at 2^minPoolSizePower bytes.
type byteSlicePools struct {
	// pools[i] holds slices with capacity 2^(i+minPoolSizePower).
	pools []sync.Pool
}

// newSlicePool builds a byteSlicePools with the given number of
// power-of-two size classes.
func newSlicePool(pools int) *byteSlicePools {
	sp := &byteSlicePools{}
	sp.init(pools)
	return sp
}

// init creates one sync.Pool per size class. Pool i hands out byte
// slices with capacity 2^(i+minPoolSizePower), so the smallest class is
// 32 bytes and each subsequent class doubles the previous one.
func (sp *byteSlicePools) init(pools int) {
	sp.pools = make([]sync.Pool, pools)
	for i := 0; i < pools; i++ {
		// Integer 2^(i+minPoolSizePower); a shift is exact and avoids the
		// float round-trip of math.Pow.
		size := 1 << (i + minPoolSizePower)
		sp.pools[i] = sync.Pool{
			New: func() interface{} {
				// Each closure captures its own per-iteration size.
				buf := make([]byte, 0, size)
				return &buf
			},
		}
	}
}

// getSlice returns a pointer to a byte slice of length size. The slice is
// taken from the pool whose power-of-two capacity class is the smallest
// one that fits size; requests larger than the biggest class are
// allocated directly and are never pooled.
func (sp *byteSlicePools) getSlice(size int) *[]byte {
	// math.Log2 of a non-positive value is NaN or -Inf, and converting
	// those to int is implementation-specific per the Go spec. Handle
	// size <= 0 explicitly so the index is well-defined on every platform:
	// such requests are served from the smallest pool, as before.
	index := 0
	if size > 0 {
		index = int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower
	}

	if index >= len(sp.pools) {
		// Too big for any pool: allocate exactly and let the GC reclaim it.
		buf := make([]byte, size)
		return &buf
	}

	// Sizes below the smallest class still come from the first pool.
	if index < 0 {
		index = 0
	}

	s := sp.pools[index].Get().(*[]byte)
	// Pool slices always have capacity >= size for the chosen class, so
	// re-slicing to the requested length is safe.
	*s = (*s)[:size]
	return s
}

// reuseSlice puts a slice obtained from getSlice back into the pool that
// matches its capacity. Slices whose capacity falls outside every size
// class are dropped and left for the GC.
func (sp *byteSlicePools) reuseSlice(s *[]byte) {
	c := cap(*s)
	// Guard before the float math: math.Log2(0) is -Inf and converting
	// -Inf to int is implementation-specific per the Go spec.
	if c == 0 {
		return
	}

	// Floor, not ceil: a slice is filed under the largest class it can
	// fully satisfy, so getSlice always hands out cap >= requested size.
	index := int(math.Floor(math.Log2(float64(c)))) - minPoolSizePower

	if index >= len(sp.pools) || index < 0 {
		return
	}

	sp.pools[index].Put(s)
}
30 changes: 30 additions & 0 deletions pkg/cortexpb/slicesPool_test.go
@@ -0,0 +1,30 @@
package cortexpb

import (
"math"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
)

// TestFuzzyByteSlicePools checks, for many random sizes within the pooled
// range, that getSlice hands back a slice of exactly the requested length
// and that the slice can then be returned to the pool.
func TestFuzzyByteSlicePools(t *testing.T) {
	sut := newSlicePool(20)
	maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))

	for i := 0; i < 1000; i++ {
		n := rand.Int() % maxByteSize
		buf := sut.getSlice(n)
		assert.Equal(t, len(*buf), n)
		sut.reuseSlice(buf)
	}
}

// TestReturnSliceSmallerThanMin verifies that handing back a slice smaller
// than the minimum size class is a no-op, and that a later get for a
// larger size still yields a slice of the requested length.
func TestReturnSliceSmallerThanMin(t *testing.T) {
	sut := newSlicePool(20)
	size := 3

	small := make([]byte, 0, size)
	sut.reuseSlice(&small)

	got := sut.getSlice(size * 2)
	assert.Equal(t, len(*got), size*2)
}
36 changes: 36 additions & 0 deletions pkg/cortexpb/timeseries.go
Expand Up @@ -37,6 +37,15 @@ var (
}
},
}

writeRequestPool = sync.Pool{
New: func() interface{} {
return &PreallocWriteRequest{
WriteRequest: WriteRequest{},
}
},
}
bytePool = newSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
Expand All @@ -53,6 +62,7 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) {
// PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal.
type PreallocWriteRequest struct {
	WriteRequest
	// data is the marshalling buffer taken from bytePool by Marshal;
	// ReuseWriteRequest returns it to the pool and nils this field.
	data *[]byte
}

// Unmarshal implements proto.Message.
Expand All @@ -72,6 +82,32 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
return p.TimeSeries.Unmarshal(dAtA)
}

// Marshal implements proto.Marshaler. Unlike the generated WriteRequest
// marshalling it draws the output buffer from bytePool instead of
// allocating, and remembers the pooled slice in p.data so that
// ReuseWriteRequest can hand it back to the pool later.
func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
	size := p.Size()
	p.data = bytePool.getSlice(size)
	dAtA = *p.data
	// MarshalToSizedBuffer is the gogo-generated encoder; n is the number
	// of bytes it reports written into the size-bounded buffer.
	n, err := p.MarshalToSizedBuffer(dAtA[:size])
	if err != nil {
		return nil, err
	}
	return dAtA[:n], nil
}

// ReuseWriteRequest hands the request's marshalling buffer back to
// bytePool, clears the request's fields, and returns the request itself
// to writeRequestPool for a later PreallocWriteRequestFromPool call.
func ReuseWriteRequest(req *PreallocWriteRequest) {
	if buf := req.data; buf != nil {
		req.data = nil
		bytePool.reuseSlice(buf)
	}
	req.Timeseries = nil
	req.Metadata = nil
	req.Source = 0
	writeRequestPool.Put(req)
}

// PreallocWriteRequestFromPool returns a PreallocWriteRequest taken from
// writeRequestPool; pass it to ReuseWriteRequest when done to recycle it.
func PreallocWriteRequestFromPool() *PreallocWriteRequest {
	return writeRequestPool.Get().(*PreallocWriteRequest)
}

// LabelAdapter is a labels.Label that can be marshalled to/from protos.
type LabelAdapter labels.Label

Expand Down
66 changes: 66 additions & 0 deletions pkg/cortexpb/timeseries_test.go
@@ -1,8 +1,10 @@
package cortexpb

import (
"fmt"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -64,3 +66,67 @@ func TestTimeseriesFromPool(t *testing.T) {
assert.Len(t, reused.Samples, 0)
})
}

// BenchmarkMarshallWriteRequest compares the cost of marshalling a write
// request of 100 series three ways: with no pooling, with only the
// byte-slice pool, and with both the byte-slice and write-request pools.
func BenchmarkMarshallWriteRequest(b *testing.B) {
	ts := PreallocTimeseriesSliceFromPool()

	for i := 0; i < 100; i++ {
		ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()})
		ts[i].Labels = []LabelAdapter{
			{Name: "foo", Value: "bar"},
			{Name: "very long label name", Value: "very long label value"},
			{Name: "very long label name 2", Value: "very long label value 2"},
			{Name: "very long label name 3", Value: "very long label value 3"},
			{Name: "int", Value: fmt.Sprint(i)},
		}
		ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}}
	}

	cases := []struct {
		name    string
		build   func() proto.Marshaler // creates the request under test
		recycle func(in interface{})   // returns it to its pool(s), if any
	}{
		{
			name: "no-pool",
			build: func() proto.Marshaler {
				return &WriteRequest{Timeseries: ts}
			},
			recycle: func(in interface{}) {},
		},
		{
			name: "byte pool",
			build: func() proto.Marshaler {
				w := &PreallocWriteRequest{}
				w.Timeseries = ts
				return w
			},
			recycle: func(in interface{}) {
				ReuseWriteRequest(in.(*PreallocWriteRequest))
			},
		},
		{
			name: "byte and write pool",
			build: func() proto.Marshaler {
				w := PreallocWriteRequestFromPool()
				w.Timeseries = ts
				return w
			},
			recycle: func(in interface{}) {
				ReuseWriteRequest(in.(*PreallocWriteRequest))
			},
		},
	}

	for _, tc := range cases {
		b.Run(tc.name, func(b *testing.B) {
			b.ReportAllocs()
			for n := 0; n < b.N; n++ {
				req := tc.build()
				_, err := req.Marshal()
				require.NoError(b, err)
				tc.recycle(req)
			}
		})
	}
}
15 changes: 8 additions & 7 deletions pkg/distributor/distributor.go
Expand Up @@ -835,14 +835,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
if err != nil {
return err
}
c := h.(ingester_client.IngesterClient)
c := h.(ingester_client.HealthAndIngesterClient)

req := cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}
_, err = c.Push(ctx, &req)
req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source

_, err = c.PushPreAlloc(ctx, req)
cortexpb.ReuseWriteRequest(req)

if len(metadata) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
Expand Down
4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
Expand Up @@ -2688,6 +2688,10 @@ func (i *mockIngester) Close() error {
return nil
}

// PushPreAlloc satisfies the HealthAndIngesterClient method on the mock
// by delegating to the plain Push with the embedded WriteRequest.
func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
	return i.Push(ctx, &in.WriteRequest, opts...)
}

func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
i.Lock()
defer i.Unlock()
Expand Down
16 changes: 14 additions & 2 deletions pkg/ingester/client/client.go
@@ -1,15 +1,17 @@
package client

import (
"context"
"flag"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Expand All @@ -24,6 +26,7 @@ type HealthAndIngesterClient interface {
IngesterClient
grpc_health_v1.HealthClient
Close() error
PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error)
}

type closableHealthAndIngesterClient struct {
Expand All @@ -32,6 +35,15 @@ type closableHealthAndIngesterClient struct {
conn *grpc.ClientConn
}

// PushPreAlloc sends a PreallocWriteRequest to the ingester's regular
// Push endpoint by invoking the gRPC method directly on the connection.
// Passing the PreallocWriteRequest itself (rather than the embedded
// WriteRequest) lets the gRPC codec use its Marshal method, which reuses
// pooled byte slices instead of allocating a fresh buffer per request.
func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
	out := new(cortexpb.WriteResponse)
	err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) {
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))
Expand Down

0 comments on commit 64b6c2b

Please sign in to comment.