Skip to content

Commit

Permalink
#42 Fix memory and performance leak.
Browse files Browse the repository at this point in the history
- If multiple callbacks are registered under the same conflicting key, only the last one will survive.
- It would be especially bad if the callback was re-registered periodically. The memory would increase and even all historical callbacks would be executed during each iteration.
  • Loading branch information
mixalturek authored and fralalonde committed Mar 7, 2019
1 parent 210b37f commit 5e35e68
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions src/bucket/atomic.rs
Expand Up @@ -12,7 +12,7 @@ use core::error;

use std::mem;
use std::isize;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering::*;
use std::sync::{Arc};
Expand Down Expand Up @@ -58,7 +58,7 @@ struct InnerAtomicBucket {
-> Option<(InputKind, MetricName, MetricValue)> + Send + Sync + 'static>>,
drain: Option<Arc<OutputDyn + Send + Sync + 'static>>,
publish_metadata: bool,
gauge_observers: Vec<GaugeObserver>,
gauge_observers: HashMap<MetricName, GaugeObserver>,
}

impl fmt::Debug for InnerAtomicBucket {
Expand All @@ -75,9 +75,8 @@ lazy_static! {
impl InnerAtomicBucket {

pub fn flush(&mut self) -> error::Result<()> {
for observer in self.gauge_observers.iter() {
let callback = observer.callback.as_ref();
let value = callback();
for observer in self.gauge_observers.values() {
let value = (observer.callback)();
observer.gauge.value(value);
}

Expand Down Expand Up @@ -168,7 +167,7 @@ impl AtomicBucket {
drain: None,
// TODO add API toggle for metadata publish
publish_metadata: false,
gauge_observers: Vec::new(),
gauge_observers: HashMap::new(),
}))
}
}
Expand Down Expand Up @@ -253,15 +252,12 @@ impl InputScope for AtomicBucket {
InputMetric::new(move |value, _labels| scores.update(value))
}

/// Observe a gauge value using a callback function.
fn observe(&self, name: &str, callback: GaugeCallback) {
let gauge = self.gauge(name);

self.inner
.write()
.expect("Aggregator")
write_lock!(self.inner)
.gauge_observers
.push(GaugeObserver { gauge, callback })
.insert(self.prefix_append(name), GaugeObserver { gauge, callback });
}
}

Expand Down

0 comments on commit 5e35e68

Please sign in to comment.