Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
initialize ROB with correct metric interval
Browse files Browse the repository at this point in the history
fix #1196
  • Loading branch information
replay authored and Dieterbe committed Apr 10, 2019
1 parent df38af3 commit 763e5fb
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 23 deletions.
4 changes: 2 additions & 2 deletions api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestGetSeriesFixed(t *testing.T) {
num += 1
id := test.GetMKey(num)

metric := metrics.GetOrCreate(id, 0, 0)
metric := metrics.GetOrCreate(id, 0, 0, 1)
metric.Add(offset, 10) // this point will always be quantized to 10
metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
Expand Down Expand Up @@ -698,7 +698,7 @@ func TestGetSeriesAggMetrics(t *testing.T) {
req.ArchInterval = archInterval
ctx := newRequestContext(test.NewContext(), &req, consolidation.None)

metric := metrics.GetOrCreate(metricKey, 0, 0)
metric := metrics.GetOrCreate(metricKey, 0, 0, 1)
for i := uint32(50); i < 3000; i++ {
metric.Add(i, float64(i^2))
}
Expand Down
4 changes: 2 additions & 2 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg
return
}

m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId)
m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId, uint32(archive.Interval))
m.Add(point.Time, point.Value)
}

Expand Down Expand Up @@ -113,6 +113,6 @@ func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int3

archive, _, _ := in.metricIndex.AddOrUpdate(mkey, md, partition)

m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId)
m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId, uint32(md.Interval))
m.Add(uint32(md.Time), md.Value)
}
4 changes: 2 additions & 2 deletions mdata/aggmetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type AggMetric struct {
// it optionally also creates aggregations with the given settings
// the 0th retention is the native archive of this metric. if there's several others, we create aggregators, using agg.
// it's the caller's responsibility to make sure agg is not nil in that case!
func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric {
func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric {

// note: during parsing of retentions, we assure there's at least 1.
ret := retentions[0]
Expand All @@ -67,7 +67,7 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey,
lastWrite: uint32(time.Now().Unix()),
}
if reorderWindow != 0 {
m.rob = NewReorderBuffer(reorderWindow, ret.SecondsPerPoint)
m.rob = NewReorderBuffer(reorderWindow, interval)
}

for _, ret := range retentions[1:] {
Expand Down
10 changes: 5 additions & 5 deletions mdata/aggmetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func testMetricPersistOptionalPrimary(t *testing.T, primary bool) {

numChunks, chunkAddCount, chunkSpan := uint32(5), uint32(10), uint32(300)
ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, 0)}
agg := NewAggMetric(mockstore, &mockCache, test.GetAMKey(42), ret, 0, nil, false)
agg := NewAggMetric(mockstore, &mockCache, test.GetAMKey(42), ret, 0, chunkSpan, nil, false)

for ts := chunkSpan; ts <= chunkSpan*chunkAddCount; ts += chunkSpan {
agg.Add(ts, 1)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestAggMetric(t *testing.T) {
cluster.Init("default", "test", time.Now(), "http", 6060)

ret := []conf.Retention{conf.NewRetentionMT(1, 1, 120, 5, 0)}
c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, nil, false))
c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, false))

// chunk t0's: 120, 240, 360, 480, 600, 720, 840, 960

Expand Down Expand Up @@ -242,7 +242,7 @@ func TestAggMetricWithReorderBuffer(t *testing.T) {
AggregationMethod: []conf.Method{conf.Avg},
}
ret := []conf.Retention{conf.NewRetentionMT(1, 1, 120, 5, 0)}
c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 10, &agg, false))
c := NewChecker(t, NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 10, 1, &agg, false))

// basic adds and verifies with test data
c.Add(121, 121)
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestAggMetricDropFirstChunk(t *testing.T) {
chunkSpan := uint32(10)
numChunks := uint32(5)
ret := []conf.Retention{conf.NewRetentionMT(1, 1, chunkSpan, numChunks, 0)}
m := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, nil, true)
m := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(42), ret, 0, 1, nil, true)
m.Add(10, 10)
m.Add(11, 11)
m.Add(12, 12)
Expand Down Expand Up @@ -321,7 +321,7 @@ func BenchmarkAggMetricAdd(b *testing.B) {
},
}

metric := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(0), retentions, 0, nil, false)
metric := NewAggMetric(mockstore, &cache.MockCache{}, test.GetAMKey(0), retentions, 0, 10, nil, false)

max := uint32(b.N*10 + 1)
for t := uint32(1); t < max; t += 10 {
Expand Down
4 changes: 2 additions & 2 deletions mdata/aggmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool) {
return m, ok
}

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric {
func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric {
var m *AggMetric
// in the most common case, it's already there and an Rlock is all we need
ms.RLock()
Expand Down Expand Up @@ -161,7 +161,7 @@ func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metri
ms.Unlock()
return m
}
m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, &agg, ms.dropFirstChunk)
m = NewAggMetric(ms.store, ms.cachePusher, k, confSchema.Retentions, confSchema.ReorderWindow, interval, &agg, ms.dropFirstChunk)
ms.Metrics[key.Org][key.Key] = m
active := len(ms.Metrics[key.Org])
ms.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions mdata/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,31 @@ func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey,
case conf.Avg:
if aggregator.sumMetric == nil {
key.Archive = schema.NewArchive(schema.Sum, span)
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
if aggregator.cntMetric == nil {
key.Archive = schema.NewArchive(schema.Cnt, span)
aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.cntMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
case conf.Sum:
if aggregator.sumMetric == nil {
key.Archive = schema.NewArchive(schema.Sum, span)
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.sumMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
case conf.Lst:
if aggregator.lstMetric == nil {
key.Archive = schema.NewArchive(schema.Lst, span)
aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.lstMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
case conf.Max:
if aggregator.maxMetric == nil {
key.Archive = schema.NewArchive(schema.Max, span)
aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.maxMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
case conf.Min:
if aggregator.minMetric == nil {
key.Archive = schema.NewArchive(schema.Min, span)
aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, nil, dropFirstChunk)
aggregator.minMetric = NewAggMetric(store, cachePusher, key, conf.Retentions{ret}, 0, span, nil, dropFirstChunk)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion mdata/ifaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type Metrics interface {
Get(key schema.MKey) (Metric, bool)
GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric
GetOrCreate(key schema.MKey, schemaId, aggId uint16, interval uint32) Metric
}

type Metric interface {
Expand Down
2 changes: 1 addition & 1 deletion mdata/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (dn DefaultNotifierHandler) Handle(data []byte) {
log.Debugf("notifier: skipping metric with MKey %s as it is not in the index", amkey.MKey)
continue
}
agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId)
agg := dn.metrics.GetOrCreate(amkey.MKey, def.SchemaId, def.AggId, uint32(def.Interval))
if amkey.Archive != 0 {
consolidator := consolidation.FromArchive(amkey.Archive.Method())
aggSpan := amkey.Archive.Span()
Expand Down
4 changes: 2 additions & 2 deletions mdata/reorder_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type ReorderBuffer struct {
buf []schema.Point // the actual buffer holding the data
}

func NewReorderBuffer(reorderWindow uint32, interval int) *ReorderBuffer {
func NewReorderBuffer(reorderWindow, interval uint32) *ReorderBuffer {
return &ReorderBuffer{
interval: uint32(interval),
interval: interval,
buf: make([]schema.Point, reorderWindow),
}
}
Expand Down

0 comments on commit 763e5fb

Please sign in to comment.