Skip to content

Commit

Permalink
Merge pull request thanos-io#122 from Shopify/fix-histogram-compaction
Browse files Browse the repository at this point in the history
Handle notAppendable histogram case
  • Loading branch information
fpetkovski committed Mar 24, 2023
2 parents 33dfe2a + 6e7da13 commit b199846
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 22 deletions.
57 changes: 46 additions & 11 deletions internal/mimir-prometheus/storage/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"sort"

"github.com/prometheus/prometheus/model/histogram"

"github.com/thanos-io/thanos/internal/mimir-prometheus/model/labels"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/chunks"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/tsdbutil"
"github.com/thanos-io/thanos/internal/mimir-prometheus/utils"
)

type SeriesEntry struct {
Expand Down Expand Up @@ -281,7 +283,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
var (
chk chunkenc.Chunk
app chunkenc.Appender
app *utils.RecodingAppender
err error
)
mint := int64(math.MaxInt64)
Expand All @@ -299,21 +301,16 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
if typ != lastType || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or too many samples in the current one.
if chk != nil {
chks = append(chks, chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
})
}
chks = appendChunk(chks, mint, maxt, chk)
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
if err != nil {
return errChunksIterator{err: err}
}
app, err = chk.Appender()
chkAppender, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
app = utils.NewRecodingAppender(&chk, chkAppender)
mint = int64(math.MaxInt64)
// maxt is immediately overwritten below which is why setting it here won't make a difference.
i = 0
Expand All @@ -332,10 +329,37 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
app.Append(t, v)
case chunkenc.ValHistogram:
t, h = seriesIter.AtHistogram()
app.AppendHistogram(t, h)
ok := app.AppendHistogram(t, h)
if !ok {
chks = appendChunk(chks, mint, maxt, chk)
chk = chunkenc.NewHistogramChunk()
chkAppender, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
mint = int64(math.MaxInt64)
i = 0
app = utils.NewRecodingAppender(&chk, chkAppender)
if ok := app.AppendHistogram(t, h); !ok {
panic("unexpected error while appending histogram")
}
}
case chunkenc.ValFloatHistogram:
t, fh = seriesIter.AtFloatHistogram()
app.AppendFloatHistogram(t, fh)
if ok := app.AppendFloatHistogram(t, fh); !ok {
chks = appendChunk(chks, mint, maxt, chk)
chk = chunkenc.NewFloatHistogramChunk()
chkAppender, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
mint = int64(math.MaxInt64)
i = 0
app = utils.NewRecodingAppender(&chk, chkAppender)
if ok := app.AppendFloatHistogram(t, fh); !ok {
panic("unexpected error while appending histogram")
}
}
default:
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
}
Expand Down Expand Up @@ -365,6 +389,17 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
return NewListChunkSeriesIterator(chks...)
}

// appendChunk returns chks with one more chunks.Meta whose MinTime, MaxTime
// and Chunk are set to mint, maxt and chk. When chk is nil nothing is added
// and chks is returned unchanged.
func appendChunk(chks []chunks.Meta, mint int64, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
	if chk == nil {
		return chks
	}
	meta := chunks.Meta{MinTime: mint, MaxTime: maxt, Chunk: chk}
	return append(chks, meta)
}

type errChunksIterator struct {
err error
}
Expand Down
115 changes: 115 additions & 0 deletions internal/mimir-prometheus/utils/appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package utils

import (
"github.com/prometheus/prometheus/model/histogram"

"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/chunkenc"
)

// RecodingAppender pairs a chunkenc.Appender with a pointer to a chunk
// variable. Its histogram append methods overwrite *chk and replace app when
// the underlying appender's Recode call returns a new chunk/appender pair.
type RecodingAppender struct {
// chk points at the caller's chunk variable; it is reassigned after a recode.
chk *chunkenc.Chunk
// app is the appender all samples are ultimately written through.
app chunkenc.Appender
}

// NewRecodingAppender returns a RecodingAppender that writes through app and
// stores chk so that *chk can be replaced when a recode yields a new chunk.
// Presumably chk points at the chunk app was obtained from; the constructor
// itself does not verify this.
func NewRecodingAppender(chk *chunkenc.Chunk, app chunkenc.Appender) *RecodingAppender {
	recoder := new(RecodingAppender)
	recoder.chk = chk
	recoder.app = app
	return recoder
}

// Append writes the float sample (t, v) by delegating directly to the wrapped
// appender; no recoding is involved for float samples.
func (a *RecodingAppender) Append(t int64, v float64) {
a.app.Append(t, v)
}

// AppendHistogram appends the histogram h with timestamp t through the wrapped
// appender, recoding the chunk and/or h first when their bucket layouts
// require it.
//
// It returns false, without appending anything, when h cannot be added to the
// current chunk: either the wrapped appender is not a non-nil
// *chunkenc.HistogramAppender, or the appender's Appendable/AppendableGauge
// check reports the sample as not appendable. It returns true once the sample
// has been handed to the appender.
//
// h may be modified: when backward inserts are needed, its PositiveSpans and
// NegativeSpans are replaced by the merged spans before it is recoded.
func (a *RecodingAppender) AppendHistogram(t int64, h *histogram.Histogram) bool {
	// Check the assertion result up front. Previously a mismatching appender
	// produced a nil pointer that was dereferenced by NumSamples before any of
	// the later nil checks could run.
	hApp, ok := a.app.(*chunkenc.HistogramAppender)
	if !ok || hApp == nil {
		return false
	}

	// The appendability checks only apply once the chunk holds a sample.
	if hApp.NumSamples() > 0 {
		var (
			pForwardInserts, nForwardInserts   []chunkenc.Insert
			pBackwardInserts, nBackwardInserts []chunkenc.Insert
			pMergedSpans, nMergedSpans         []histogram.Span
			okToAppend                         bool
		)
		switch h.CounterResetHint {
		case histogram.GaugeType:
			pForwardInserts, nForwardInserts,
				pBackwardInserts, nBackwardInserts,
				pMergedSpans, nMergedSpans,
				okToAppend = hApp.AppendableGauge(h)
		default:
			// The fourth result is deliberately ignored, as in the original code.
			pForwardInserts, nForwardInserts, okToAppend, _ = hApp.Appendable(h)
		}
		if !okToAppend {
			return false
		}

		// Backward inserts: adapt the incoming histogram to the merged layout.
		if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
			h.PositiveSpans = pMergedSpans
			h.NegativeSpans = nMergedSpans
			hApp.RecodeHistogram(h, pBackwardInserts, nBackwardInserts)
		}
		// Forward inserts: recode the chunk itself and switch over to the
		// chunk/appender pair returned by Recode.
		if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
			recodedChk, recodedApp := hApp.Recode(
				pForwardInserts, nForwardInserts,
				h.PositiveSpans, h.NegativeSpans,
			)
			*a.chk = recodedChk
			a.app = recodedApp
		}
	}

	a.app.AppendHistogram(t, h)
	return true
}

// AppendFloatHistogram appends the float histogram fh with timestamp t through
// the wrapped appender, recoding the chunk and/or fh first when their bucket
// layouts require it.
//
// It returns false, without appending anything, when fh cannot be added to the
// current chunk: either the wrapped appender is not a non-nil
// *chunkenc.FloatHistogramAppender, or the appender's
// Appendable/AppendableGauge check reports the sample as not appendable. It
// returns true once the sample has been handed to the appender.
//
// fh may be modified: when backward inserts are needed, its PositiveSpans and
// NegativeSpans are replaced by the merged spans before it is recoded.
func (a *RecodingAppender) AppendFloatHistogram(t int64, fh *histogram.FloatHistogram) bool {
	// Check the assertion result up front. Previously a mismatching appender
	// produced a nil pointer that was dereferenced by NumSamples, and only one
	// of the two switch branches guarded against nil at all.
	fhApp, ok := a.app.(*chunkenc.FloatHistogramAppender)
	if !ok || fhApp == nil {
		return false
	}

	// The appendability checks only apply once the chunk holds a sample.
	if fhApp.NumSamples() > 0 {
		var (
			pForwardInserts, nForwardInserts   []chunkenc.Insert
			pBackwardInserts, nBackwardInserts []chunkenc.Insert
			pMergedSpans, nMergedSpans         []histogram.Span
			okToAppend                         bool
		)
		switch fh.CounterResetHint {
		case histogram.GaugeType:
			pForwardInserts, nForwardInserts,
				pBackwardInserts, nBackwardInserts,
				pMergedSpans, nMergedSpans,
				okToAppend = fhApp.AppendableGauge(fh)
		default:
			// The fourth result is deliberately ignored, as in the original code.
			pForwardInserts, nForwardInserts, okToAppend, _ = fhApp.Appendable(fh)
		}
		if !okToAppend {
			return false
		}

		// Backward inserts: adapt the incoming histogram to the merged layout.
		if len(pBackwardInserts)+len(nBackwardInserts) > 0 {
			fh.PositiveSpans = pMergedSpans
			fh.NegativeSpans = nMergedSpans
			// The method name is spelled "RecodeHistogramm" in the chunkenc
			// package this file calls into; keep it as is.
			fhApp.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts)
		}
		// Forward inserts: recode the chunk itself and switch over to the
		// chunk/appender pair returned by Recode.
		if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 {
			recodedChk, recodedApp := fhApp.Recode(
				pForwardInserts, nForwardInserts,
				fh.PositiveSpans, fh.NegativeSpans,
			)
			*a.chk = recodedChk
			a.app = recodedApp
		}
	}

	a.app.AppendFloatHistogram(t, fh)
	return true
}
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func TestApplyCounterResetsIterator(t *testing.T) {
func TestApplyCounterResetsIteratorHistograms(t *testing.T) {
const lenChunks, lenChunk = 4, 10

histograms := tsdb.GenerateTestHistograms(lenChunks * lenChunk)
histograms := tsdbutil.GenerateTestHistograms(lenChunks * lenChunk)

var chunks [][]*histogramPair
for i := 0; i < lenChunks; i++ {
Expand Down
8 changes: 5 additions & 3 deletions pkg/compact/downsamplemimir/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/value"
prom_tsdb "github.com/prometheus/prometheus/tsdb"

"go.uber.org/goleak"

"github.com/thanos-io/thanos/internal/mimir-prometheus/model/labels"
"github.com/thanos-io/thanos/internal/mimir-prometheus/storage"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb"
Expand All @@ -23,9 +25,9 @@ import (
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/index"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/tombstones"
"github.com/thanos-io/thanos/internal/mimir-prometheus/tsdb/tsdbutil"
"go.uber.org/goleak"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
)
Expand Down Expand Up @@ -740,7 +742,7 @@ func TestApplyCounterResetsIterator(t *testing.T) {
func TestApplyCounterResetsIteratorHistograms(t *testing.T) {
const lenChunks, lenChunk = 4, 10

histograms := prom_tsdb.GenerateTestHistograms(lenChunks * lenChunk)
histograms := tsdbutil.GenerateTestHistograms(lenChunks * lenChunk)

var chunks [][]*histogramPair
for i := 0; i < lenChunks; i++ {
Expand Down
36 changes: 33 additions & 3 deletions pkg/dedupmimir/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func (h *chunkIteratorHeap) Pop() interface{} {
}

type overlappingMerger struct {
xorIterators []chunkenc.Iterator
aggrIterators [5][]chunkenc.Iterator
xorIterators []chunkenc.Iterator
histogramIterators []chunkenc.Iterator
floatHistogramIterators []chunkenc.Iterator
aggrIterators [5][]chunkenc.Iterator

samplesMergeFunc func(a, b chunkenc.Iterator) chunkenc.Iterator
}
Expand All @@ -169,6 +171,11 @@ func (o *overlappingMerger) addChunk(chk chunks.Meta) {
switch chk.Chunk.Encoding() {
case chunkenc.EncXOR:
o.xorIterators = append(o.xorIterators, chk.Chunk.Iterator(nil))
// TODO(rabenhorst): Copy this to dedup pkg.
case chunkenc.EncHistogram:
o.histogramIterators = append(o.histogramIterators, chk.Chunk.Iterator(nil))
case chunkenc.EncFloatHistogram:
o.floatHistogramIterators = append(o.floatHistogramIterators, chk.Chunk.Iterator(nil))
case downsample.ChunkEncAggr:
aggrChk := chk.Chunk.(*downsample.AggrChunk)
for i := downsample.AggrCount; i <= downsample.AggrCounter; i++ {
Expand All @@ -182,7 +189,7 @@ func (o *overlappingMerger) addChunk(chk chunks.Meta) {
func (o *overlappingMerger) empty() bool {
// OverlappingMerger only contains either raw chunks (xor, histogram or float histogram) or aggr chunks.
// If raw chunks are present then we don't need to check aggr chunks.
if len(o.xorIterators) > 0 {
if len(o.xorIterators) > 0 || len(o.histogramIterators) > 0 || len(o.floatHistogramIterators) > 0 {
return false
}
return len(o.aggrIterators[downsample.AggrCount]) == 0
Expand All @@ -203,6 +210,29 @@ func (o *overlappingMerger) iterator(baseChk chunks.Meta) chunks.Iterator {
return it
}}).Iterator(nil)

// TODO(rabenhorst): Copy this to dedup pkg.
case chunkenc.EncHistogram:
// If Histogram encoding, we need to deduplicate the samples and re-encode them to chunks.
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.histogramIterators {
it = o.samplesMergeFunc(it, i)
}
return it
}}).Iterator(nil)

case chunkenc.EncFloatHistogram:
// If FloatHistogram encoding, we need to deduplicate the samples and re-encode them to chunks.
return storage.NewSeriesToChunkEncoder(&storage.SeriesEntry{
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
it = baseChk.Chunk.Iterator(nil)
for _, i := range o.floatHistogramIterators {
it = o.samplesMergeFunc(it, i)
}
return it
}}).Iterator(nil)

case downsample.ChunkEncAggr:
// If Aggr encoding, each aggregated chunks need to be expanded and deduplicated,
// then re-encoded into Aggr chunks.
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/native_histograms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/queryfrontend"
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestQueryNativeHistograms(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

histograms := tsdb.GenerateTestHistograms(4)
histograms := tsdbutil.GenerateTestHistograms(4)
now := time.Now()

_, err = writeHistograms(ctx, now, histograms, rawRemoteWriteURL1)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestWriteNativeHistograms(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

histograms := tsdb.GenerateTestHistograms(1)
histograms := tsdbutil.GenerateTestHistograms(1)
now := time.Now()
_, err = writeHistograms(ctx, now, histograms, rawRemoteWriteURL)
testutil.Ok(t, err)
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestQueryFrontendNativeHistograms(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

histograms := tsdb.GenerateTestHistograms(4)
histograms := tsdbutil.GenerateTestHistograms(4)
now := time.Now()
_, err = writeHistograms(ctx, now, histograms, rawRemoteWriteURL1)
testutil.Ok(t, err)
Expand Down

0 comments on commit b199846

Please sign in to comment.