Permalink
Browse files

better bucket_flush procedure

  • Loading branch information...
asmuth committed Feb 25, 2013
1 parent 0e19bad commit 9a50d777d436d410780a47a42a140d3e0a350883
Showing with 14 additions and 10 deletions.
  1. +5 −2 fnordmetric-enterprise/src/AbstractBucket.scala
  2. +9 −8 fnordmetric-enterprise/src/Metric.scala
@@ -14,20 +14,23 @@ trait AbstractBucket {
def sample(value: Double) : Unit
def flush() : Double
- def flush_every(interval: Long, proc: (Long, Double) => Unit) = {
+ def flush_every(interval: Long) : (Long, Double) = {
val now = FnordMetric.now
var triggered = (next_flush == 0)
+ var ret : (Long, Double) = null
if (triggered)
next_flush = now
while (next_flush <= now) {
if (!triggered)
- proc(next_flush, flush)
+ ret = ((next_flush, flush))
next_flush += interval
triggered = true
}
+
+ ret
}
}
@@ -19,18 +19,19 @@ class Metric(key: MetricKey) {
// adds a value to the metric's bucket and tries to flush the bucket
def sample(value: Double) = this.synchronized {
-
- // call flush_bucket with the returned aggregated value for every
- // flush_interval since the last call to flush_every
- bucket.flush_every(key.flush_interval, (
- (time, value) => flush_bucket(time, value) ))
-
bucket.sample(value)
+ flush_bucket
}
// adds an aggregated value to the in memory ring buffer after it has
// been flushed from the bucket
- private def flush_bucket(time: Long, value: Double) = {
+ def flush_bucket : Unit = {
+ val nxt = bucket.flush_every(key.flush_interval)
+
+ // flush_every returns null if the current flush interval is not over
+ // yet (makes this method idempotent)
+ if (nxt == null)
+ return
// if the ring buffer is already full we need to clear up a slot
if (rbuf.remaining == 0) {
@@ -53,7 +54,7 @@ class Metric(key: MetricKey) {
// now at least one slot in the ring buffer is free so we can just
// push our sample
- rbuf.push(((time, value)))
+ rbuf.push(nxt)
flush_rbuf // FIXPAUL: remove me
}

0 comments on commit 9a50d77

Please sign in to comment.