Skip to content

Commit

Permalink
perf(prompb): properly re-use slice pools
Browse files Browse the repository at this point in the history
Use same pool for all slices in a request. Reduces overall memory overhead.

```
cpu: AMD Ryzen 9 5950X 16-Core Processor
                         │   old.txt   │               new.txt               │
                         │   sec/op    │   sec/op     vs base                │
WriteRequestUnmarshal-32   166.8µ ± 1%   195.4µ ± 1%  +17.13% (p=0.000 n=15)

                         │  old.txt   │            new.txt             │
                         │    B/op    │    B/op     vs base            │
WriteRequestUnmarshal-32   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=15)

                         │  old.txt   │            new.txt             │
                         │ allocs/op  │ allocs/op   vs base            │
WriteRequestUnmarshal-32   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=15)
```
  • Loading branch information
tdakkota committed Mar 11, 2024
1 parent 7feb161 commit 826502b
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 72 deletions.
91 changes: 69 additions & 22 deletions internal/prompb/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,86 @@ package prompb
// Reset resets wr.
func (wr *WriteRequest) Reset() {
for i := range wr.Timeseries {
wr.Timeseries[i].Reset()
wr.Timeseries[i] = TimeSeries{}
}
wr.Timeseries = wr.Timeseries[:0]
wr.Pools.Reset()
}

// Reset resets [TimeSeries] fields.
func (ts *TimeSeries) Reset() {
ts.Labels = ts.Labels[:0]
ts.Samples = ts.Samples[:0]
type Pools struct {
Labels *slicepool[Label]
Samples *slicepool[Sample]

for i := range ts.Exemplars {
ts.Exemplars[i].Reset()
// [Exemplar] fields pools.
Exemplars *slicepool[Exemplar]
ExemplarLabels *slicepool[Label]

// [Histogram] fields pools.
Histograms *slicepool[Histogram]
// Negative spans.
HistogramNegativeSpans *slicepool[BucketSpan]
HistogramNegativeDeltas *slicepool[int64]
HistogramNegativeCounts *slicepool[float64]
// Positive spans.
HistogramPositiveSpans *slicepool[BucketSpan]
HistogramPositiveDeltas *slicepool[int64]
HistogramPositiveCounts *slicepool[float64]
}

func (p *Pools) Reset() {
if p == nil {
return
}
ts.Exemplars = ts.Exemplars[:0]
p.Labels.Reset()
p.Samples.Reset()

p.Exemplars.Reset()
p.ExemplarLabels.Reset()

p.Histograms.Reset()
p.HistogramNegativeSpans.Reset()
p.HistogramNegativeDeltas.Reset()
p.HistogramNegativeCounts.Reset()
p.HistogramPositiveSpans.Reset()
p.HistogramPositiveDeltas.Reset()
p.HistogramPositiveCounts.Reset()
}

for i := range ts.Histograms {
ts.Histograms[i].Reset()
type slicepool[T any] struct {
pool []T
offset int
}

func (p *slicepool[T]) Reset() {
var zero T
for i := range p.pool {
p.pool[i] = zero
}
ts.Histograms = ts.Histograms[:0]
p.pool = p.pool[:0]
p.offset = 0
}

// Reset resets [Exemplar] fields.
func (e *Exemplar) Reset() {
e.Labels = e.Labels[:0]
func (p *slicepool[T]) Push(v T) {
p.pool = append(p.pool, v)
}

// Reset resets [Histogram] fields.
func (h *Histogram) Reset() {
h.NegativeSpans = h.NegativeSpans[:0]
h.NegativeDeltas = h.NegativeDeltas[:0]
h.NegativeCounts = h.NegativeCounts[:0]
func (p *slicepool[T]) GetNext() *T {
if len(p.pool)+1 < cap(p.pool) {
// Re-use existing item.
p.pool = p.pool[:len(p.pool)+1]
} else {
// Allocate a new one.
var zero T
p.pool = append(p.pool, zero)
}
return &p.pool[len(p.pool)-1]
}

h.PositiveSpans = h.PositiveSpans[:0]
h.PositiveDeltas = h.PositiveDeltas[:0]
h.PositiveCounts = h.PositiveCounts[:0]
func (p *slicepool[T]) Cut() []T {
cut := p.pool[p.offset:len(p.pool):len(p.pool)]
if len(cut) == 0 {
return nil
}
p.offset = len(p.pool)
return cut
}
33 changes: 21 additions & 12 deletions internal/prompb/remote.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,28 @@ import (
// WriteRequest represents Prometheus remote write API request
type WriteRequest struct {
Timeseries []TimeSeries
Pools *Pools
}

// Unmarshal unmarshals WriteRequest from src.
func (req *WriteRequest) Unmarshal(src []byte) (err error) {
var (
fc easyproto.FieldContext
tss = req.Timeseries
)
if req.Pools == nil {
req.Pools = &Pools{
Labels: new(slicepool[Label]),
Samples: new(slicepool[Sample]),
Exemplars: new(slicepool[Exemplar]),
ExemplarLabels: new(slicepool[Label]),
Histograms: new(slicepool[Histogram]),
HistogramNegativeSpans: new(slicepool[BucketSpan]),
HistogramNegativeDeltas: new(slicepool[int64]),
HistogramNegativeCounts: new(slicepool[float64]),
HistogramPositiveSpans: new(slicepool[BucketSpan]),
HistogramPositiveDeltas: new(slicepool[int64]),
HistogramPositiveCounts: new(slicepool[float64]),
}
}

var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
if err != nil {
Expand All @@ -27,17 +41,12 @@ func (req *WriteRequest) Unmarshal(src []byte) (err error) {
if !ok {
return errors.New("read timeseries data")
}
if len(tss)+1 < cap(tss) {
tss = tss[:len(tss)+1]
} else {
tss = append(tss, TimeSeries{})
}
ts := &tss[len(tss)-1]
if err := ts.Unmarshal(data); err != nil {
var ts TimeSeries
if err := ts.Unmarshal(req.Pools, data); err != nil {
return errors.Wrapf(err, "read timeseries (field %d)", fc.FieldNum)
}
req.Timeseries = append(req.Timeseries, ts)
}
}
req.Timeseries = tss
return nil
}
Loading

0 comments on commit 826502b

Please sign in to comment.