Skip to content

Commit

Permalink
awoods feedback: early return
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Nov 14, 2019
1 parent d485154 commit 7fa00fa
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions aggregator/aggregator.go
Expand Up @@ -172,9 +172,10 @@ func (a *Aggregator) AddOrCreate(key string, ts uint32, quantized uint, value fl
if ok {
proc, ok = agg.state[key]
if ok {
// if both levels exist, we can just add the value and that's it
// if both levels already exist, we only need to add the value
agg.count++
proc.Add(value, ts)
return
}
} else {
// first level doesn't exist. create it and add the ts to the list
Expand All @@ -189,21 +190,21 @@ func (a *Aggregator) AddOrCreate(key string, ts uint32, quantized uint, value fl
a.aggregations[quantized] = agg
}

if !ok {
// note, we only flush where for a given value of now, quantized < now-wait
// this means that as long as the clock doesn't go back in time
// we never recreate a previously created bucket (and reflush with same key and ts)
// a consequence of this is, that if your data stream runs consistently significantly behind
// real time, it may never be included in aggregates, but it's up to you to configure your wait
// parameter properly. You can use the rangeTracker and numTooOld metrics to help with this
if quantized > uint(a.now().Unix())-a.Wait {
agg.count++
proc = a.procConstr(value, ts)
agg.state[key] = proc
return
}
numTooOld.Inc(1)
// first level exists but we need to create the 2nd level.

// note, we only flush where for a given value of now, quantized < now-wait
// this means that as long as the clock doesn't go back in time
// we never recreate a previously created bucket (and reflush with same key and ts)
// a consequence of this is, that if your data stream runs consistently significantly behind
// real time, it may never be included in aggregates, but it's up to you to configure your wait
// parameter properly. You can use the rangeTracker and numTooOld metrics to help with this
if quantized > uint(a.now().Unix())-a.Wait {
agg.count++
proc = a.procConstr(value, ts)
agg.state[key] = proc
return
}
numTooOld.Inc(1)
}

// Flush finalizes and removes aggregations that are due
Expand Down

0 comments on commit 7fa00fa

Please sign in to comment.