From 763e5fb20f2992bbaaf9a12462448067d7c35fa6 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 24 Jan 2019 18:32:32 -0300 Subject: [PATCH] initialize ROB with correct metric interval fix #1196 --- api/dataprocessor_test.go | 4 ++-- input/input.go | 4 ++-- mdata/aggmetric.go | 4 ++-- mdata/aggmetric_test.go | 10 +++++----- mdata/aggmetrics.go | 4 ++-- mdata/aggregator.go | 12 ++++++------ mdata/ifaces.go | 2 +- mdata/notifier.go | 2 +- mdata/reorder_buffer.go | 4 ++-- 9 files changed, 23 insertions(+), 23 deletions(-) diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 1a2d9ab75c..4ce5efb84f 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -364,7 +364,7 @@ func TestGetSeriesFixed(t *testing.T) { num += 1 id := test.GetMKey(num) - metric := metrics.GetOrCreate(id, 0, 0) + metric := metrics.GetOrCreate(id, 0, 0, 1) metric.Add(offset, 10) // this point will always be quantized to 10 metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected @@ -698,7 +698,7 @@ func TestGetSeriesAggMetrics(t *testing.T) { req.ArchInterval = archInterval ctx := newRequestContext(test.NewContext(), &req, consolidation.None) - metric := metrics.GetOrCreate(metricKey, 0, 0) + metric := metrics.GetOrCreate(metricKey, 0, 0, 1) for i := uint32(50); i < 3000; i++ { metric.Add(i, float64(i^2)) } diff --git a/input/input.go b/input/input.go index 4a3e6bb910..505104dfd0 100644 --- a/input/input.go +++ b/input/input.go @@ -78,7 +78,7 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg return } - m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId) + m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId, uint32(archive.Interval)) m.Add(point.Time, point.Value) } @@ -113,6 +113,6 @@ func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int3 archive, _, _ := in.metricIndex.AddOrUpdate(mkey, md, partition) - m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId) + m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId, uint32(md.Interval)) m.Add(uint32(md.Time), md.Value) } diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 67fe85f0d1..eb351f0c6c 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -48,7 +48,7 @@ type AggMetric struct { // it optionally also creates aggregations with the given settings // the 0th retention is the native archive of this metric. if there's several others, we create aggregators, using agg. // it's the callers responsibility to make sure agg is not nil in that case! -func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric { +func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric { // note: during parsing of retentions, we assure there's at least 1. ret := retentions[0] @@ -67,7 +67,7 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, lastWrite: uint32(time.Now().Unix()), } if reorderWindow != 0 { - m.rob = NewReorderBuffer(reorderWindow, ret.SecondsPerPoint) + m.rob = NewReorderBuffer(reorderWindow, interval) } for _, ret := range retentions[1:] { diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 80900a6d6d..02127b6379 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -128,7 +128,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) { numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300) ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, 0)} - agg := NewAggMetric(mockstore, &mockCache, test.GetAMKey(42), ret, 0, nil, false) + agg := NewAggMetric(mockstore, &mockCache, test.GetAMKey(42), ret, 0, chunkSpan, nil, false) for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan { agg.Add(ts, 1) @@ -164,7 +164,7 @@ func TestAggMetric(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) ret := []conf.Retention{conf.NewRetentionMT(1, 1, 120, 5, 0)} - c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, nil, false)) + c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false)) // chunk t0's: 120, 240, 360, 480, 600, 720, 840, 960 @@ -242,7 +242,7 @@ func TestAggMetricWithReorderBuffer(t *testing.T) { AggregationMethod: []conf.Method{conf.Avg}, } ret := []conf.Retention{conf.NewRetentionMT(1, 1, 120, 5, 0)} - c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 10, &agg, false)) + c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 10, 1, &agg, false)) // basic adds and verifies with test data c.Add(121, 121) @@ -282,7 +282,7 @@ func TestAggMetricDropFirstChunk(t *testing.T) { chunkSpan := uint32(10) numChunks := uint32(5) ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, 0)} - m := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, nil, true) + m := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, true) m.Add(10, 10) m.Add(11, 11) m.Add(12, 12) @@ -321,7 +321,7 @@ func BenchmarkAggMetricAdd(b *testing.B) { }, } - metric := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(0), retentions, 0, nil, false) + metric := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(0), retentions, 0, 10, nil, false) max := uint32(b.N*10 + 1) for t := uint32(1); t < max; t += 10 { diff --git a/mdata/aggmetrics.go b/mdata/aggmetrics.go index 6b02daa2f1..6d38176dc9 100644 --- a/mdata/aggmetrics.go +++ b/mdata/aggmetrics.go @@ -129,7 +129,7 @@ func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool) { return m, ok } -func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric { +func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric { var m *AggMetric // in the most common case, it's already there and an Rlock is all we need ms.RLock() @@ -161,7 +161,7 @@ func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metri ms.Unlock() return m } - m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, &agg, ms.dropFirstChunk) + m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, interval, &agg, ms.dropFirstChunk) ms.Metrics[key.Org][key.Key] = m active := len(ms.Metrics[key.Org]) ms.Unlock() diff --git a/mdata/aggregator.go b/mdata/aggregator.go index 3584ce2de8..0ef1985b34 100644 --- a/mdata/aggregator.go +++ b/mdata/aggregator.go @@ -43,31 +43,31 @@ func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, case conf.Avg: if aggregator.sumMetric == nil { key.Archive = schema.NewArchive(schema.Sum, span) - aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } if aggregator.cntMetric == nil { key.Archive = schema.NewArchive(schema.Cnt, span) - aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } case conf.Sum: if aggregator.sumMetric == nil { key.Archive = schema.NewArchive(schema.Sum, span) - aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } case conf.Lst: if aggregator.lstMetric == nil { key.Archive = schema.NewArchive(schema.Lst, span) - aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } case conf.Max: if aggregator.maxMetric == nil { key.Archive = schema.NewArchive(schema.Max, span) - aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } case conf.Min: if aggregator.minMetric == nil { key.Archive = schema.NewArchive(schema.Min, span) - aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk) + aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk) } } } diff --git a/mdata/ifaces.go b/mdata/ifaces.go index e8c48bc0cb..8251a413da 100644 --- a/mdata/ifaces.go +++ b/mdata/ifaces.go @@ -12,7 +12,7 @@ import ( type Metrics interface { Get(key schema.MKey) (Metric, bool) - GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric + GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric } type Metric interface { diff --git a/mdata/notifier.go b/mdata/notifier.go index 2692fba13e..c25cdb3e6d 100644 --- a/mdata/notifier.go +++ b/mdata/notifier.go @@ -95,7 +95,7 @@ func (dn DefaultNotifierHandler) Handle(data []byte) { log.Debugf("notifier: skipping metric with MKey %s as it is not in the index", amkey.MKey) continue } - agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId) + agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId, uint32(def.Interval)) if amkey.Archive != 0 { consolidator := consolidation.FromArchive(amkey.Archive.Method()) aggSpan := amkey.Archive.Span() diff --git a/mdata/reorder_buffer.go b/mdata/reorder_buffer.go index 40dd5db1d7..99c5ddfdbf 100644 --- a/mdata/reorder_buffer.go +++ b/mdata/reorder_buffer.go @@ -18,9 +18,9 @@ type ReorderBuffer struct { buf []schema.Point // the actual buffer holding the data } -func NewReorderBuffer(reorderWindow uint32, interval int) *ReorderBuffer { +func NewReorderBuffer(reorderWindow, interval uint32) *ReorderBuffer { return &ReorderBuffer{ - interval: uint32(interval), + interval: interval, buf: make([]schema.Point, reorderWindow), } }