Skip to content

Commit

Permalink
Revert "Reuse write request from distributor to Ingesters (#5193)"
Browse files Browse the repository at this point in the history
This reverts commit 64b6c2b.
  • Loading branch information
alanprot committed Apr 11, 2024
1 parent 258bbc0 commit 5cc092a
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 219 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Expand Up @@ -168,8 +168,25 @@
* [CHANGE] Distributor/Ingester: Log warn level on push requests when they have status code 4xx. Do not log if status is 429. #5103
* [CHANGE] Tracing: Use the default OTEL trace sampler when `-tracing.otel.exporter-type` is set to `awsxray`. #5141
* [CHANGE] Ingester partial error log line to debug level. #5192
* [CHANGE] Change HTTP status code from 503/422 to 499 if a request is canceled. #5220
* [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239
* [ENHANCEMENT] Update Go version to 1.19.3. #4988
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
* [ENHANCEMENT] Let blocks_cleaner delete blocks concurrently(default 16 goroutines). #5028
* [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
* [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037
* [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027
* [ENHANCEMENT] Query Frontend: Skip instant query roundtripper if sharding is not applicable. #5062
* [ENHANCEMENT] Push reduce one hash operation of Labels. #4945 #5114
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.enabled-tenants` and `-alertmanager.disabled-tenants` to explicitly enable or disable alertmanager for specific tenants. #5116
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
* [ENHANCEMENT] Update Go version to 1.20.1. #5159
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
Expand Down
61 changes: 0 additions & 61 deletions pkg/cortexpb/slicesPool.go

This file was deleted.

30 changes: 0 additions & 30 deletions pkg/cortexpb/slicesPool_test.go

This file was deleted.

36 changes: 0 additions & 36 deletions pkg/cortexpb/timeseries.go
Expand Up @@ -37,15 +37,6 @@ var (
}
},
}

writeRequestPool = sync.Pool{
New: func() interface{} {
return &PreallocWriteRequest{
WriteRequest: WriteRequest{},
}
},
}
bytePool = newSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
Expand All @@ -62,7 +53,6 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) {
// PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal.
type PreallocWriteRequest struct {
	WriteRequest
	// data remembers the pooled buffer that Marshal obtained from bytePool,
	// so ReuseWriteRequest can hand it back; nil when no buffer is held.
	data *[]byte
}

// Unmarshal implements proto.Message.
Expand All @@ -82,32 +72,6 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
return p.TimeSeries.Unmarshal(dAtA)
}

// Marshal implements proto.Marshaler. It follows the shape of the
// gogo/protobuf-generated Marshal, but takes the output buffer from bytePool
// instead of allocating a fresh one. The buffer stays referenced in p.data;
// call ReuseWriteRequest after the request is done to return it to the pool.
func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
	size := p.Size()
	// The pooled slice may have capacity > size; only the first size bytes
	// are handed to the sized-buffer marshaller below.
	p.data = bytePool.getSlice(size)
	dAtA = *p.data
	n, err := p.MarshalToSizedBuffer(dAtA[:size])
	if err != nil {
		// NOTE(review): on error p.data still holds the pooled slice; it is
		// only recycled if the caller later invokes ReuseWriteRequest.
		return nil, err
	}
	return dAtA[:n], nil
}

// ReuseWriteRequest hands req and any marshalling buffer it holds back to
// their pools (writeRequestPool and bytePool). The caller must not touch
// req after this call.
func ReuseWriteRequest(req *PreallocWriteRequest) {
	// Recycle the buffer attached by Marshal, if present.
	if buf := req.data; buf != nil {
		req.data = nil
		bytePool.reuseSlice(buf)
	}
	// Drop payload references so the pooled request does not pin old data.
	req.Timeseries = nil
	req.Metadata = nil
	req.Source = 0
	writeRequestPool.Put(req)
}

// PreallocWriteRequestFromPool fetches a PreallocWriteRequest from
// writeRequestPool; pair it with ReuseWriteRequest to recycle the value.
func PreallocWriteRequestFromPool() *PreallocWriteRequest {
	pooled := writeRequestPool.Get()
	return pooled.(*PreallocWriteRequest)
}

// LabelAdapter is a labels.Label that can be marshalled to/from protos.
type LabelAdapter labels.Label

Expand Down
66 changes: 0 additions & 66 deletions pkg/cortexpb/timeseries_test.go
@@ -1,10 +1,8 @@
package cortexpb

import (
"fmt"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -66,67 +64,3 @@ func TestTimeseriesFromPool(t *testing.T) {
assert.Len(t, reused.Samples, 0)
})
}

// BenchmarkMarshallWriteRequest measures marshalling a 100-series write
// request under three strategies: no pooling at all, pooling only the
// marshal byte buffer, and pooling both the buffer and the request struct.
func BenchmarkMarshallWriteRequest(b *testing.B) {
	// Build one fixed payload of 100 series, shared by all variants.
	ts := PreallocTimeseriesSliceFromPool()

	for i := 0; i < 100; i++ {
		ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()})
		ts[i].Labels = []LabelAdapter{
			{Name: "foo", Value: "bar"},
			{Name: "very long label name", Value: "very long label value"},
			{Name: "very long label name 2", Value: "very long label value 2"},
			{Name: "very long label name 3", Value: "very long label value 3"},
			// Unique per-series label so series are not identical.
			{Name: "int", Value: fmt.Sprint(i)},
		}
		ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}}
	}

	tests := []struct {
		name string
		// writeRequestFactory builds the proto.Marshaler under test.
		writeRequestFactory func() proto.Marshaler
		// clean returns pooled resources after each iteration, if any.
		clean func(in interface{})
	}{
		{
			// Baseline: plain WriteRequest, fresh allocation every Marshal.
			name: "no-pool",
			writeRequestFactory: func() proto.Marshaler {
				return &WriteRequest{Timeseries: ts}
			},
			clean: func(in interface{}) {},
		},
		{
			// PreallocWriteRequest.Marshal draws its buffer from bytePool;
			// ReuseWriteRequest puts it back after each iteration.
			name: "byte pool",
			writeRequestFactory: func() proto.Marshaler {
				w := &PreallocWriteRequest{}
				w.Timeseries = ts
				return w
			},
			clean: func(in interface{}) {
				ReuseWriteRequest(in.(*PreallocWriteRequest))
			},
		},
		{
			// As above, plus the request struct itself comes from
			// writeRequestPool instead of being allocated per iteration.
			name: "byte and write pool",
			writeRequestFactory: func() proto.Marshaler {
				w := PreallocWriteRequestFromPool()
				w.Timeseries = ts
				return w
			},
			clean: func(in interface{}) {
				ReuseWriteRequest(in.(*PreallocWriteRequest))
			},
		},
	}

	for _, tc := range tests {
		b.Run(tc.name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				w := tc.writeRequestFactory()
				_, err := w.Marshal()
				require.NoError(b, err)
				tc.clean(w)
			}
			b.ReportAllocs()
		})
	}
}
18 changes: 6 additions & 12 deletions pkg/distributor/distributor.go
Expand Up @@ -892,20 +892,14 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
if err != nil {
return err
}
c := h.(ingester_client.HealthAndIngesterClient)
c := h.(ingester_client.IngesterClient)

req := cortexpb.PreallocWriteRequestFromPool()
req.Timeseries = timeseries
req.Metadata = metadata
req.Source = source

_, err = c.PushPreAlloc(ctx, req)

// We should not reuse the req in case of errors:
// See: https://github.com/grpc/grpc-go/issues/6355
if err == nil {
cortexpb.ReuseWriteRequest(req)
req := cortexpb.WriteRequest{
Timeseries: timeseries,
Metadata: metadata,
Source: source,
}
_, err = c.Push(ctx, &req)

if len(metadata) > 0 {
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
Expand Down
16 changes: 2 additions & 14 deletions pkg/ingester/client/client.go
@@ -1,17 +1,15 @@
package client

import (
"context"
"flag"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Expand All @@ -26,7 +24,6 @@ type HealthAndIngesterClient interface {
IngesterClient
grpc_health_v1.HealthClient
Close() error
PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error)
}

type closableHealthAndIngesterClient struct {
Expand All @@ -35,15 +32,6 @@ type closableHealthAndIngesterClient struct {
conn *grpc.ClientConn
}

// PushPreAlloc invokes the ingester's Push RPC directly over the underlying
// connection, sending a PreallocWriteRequest instead of the generated
// WriteRequest type used by the stock IngesterClient stub.
func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
	resp := &cortexpb.WriteResponse{}
	if err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) {
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))
Expand Down

0 comments on commit 5cc092a

Please sign in to comment.