@@ -1,299 +1,186 @@
#![cfg_attr(feature = "bench", feature(test))]


#[cfg(feature="bench")]
extern crate test;
extern crate time;
extern crate lazy_static;

use test::test::Bencher;
//#[macro_use]
//extern crate lazy_static;

mod pcg32;
//mod pcg32;
//mod statsd;

use pcg32::pcg32_random;
//use pcg32::pcg32_random;
//use std::collections::HashMap;

//////////////////
// DEFINITIONS

type ValueType = f32;
type Value = u64;

type TimeType = u64;
struct TimeType (u64);

type RateType = f32;
impl TimeType {
fn now() -> TimeType { TimeType(time::precise_time_ns()) }
fn elapsed_ms(self) -> Value { (TimeType::now().0 - self.0) / 1_000_000 }
}

//type ChannelTimebase = [u32; 2]; // u32 base (alternating) + u32 offset = u64 time
type RateType = f32;

#[derive(Debug, Copy, Clone)]
enum MetricType {
Event,
Count,
Gauge,
Time,
}

///////////////////
// GLOBALS

lazy_static! {
static ref SKIP_SCOPE: CloseScope = SkipScope {};
}

//////////////////
// CONTRACT

// INSTRUMENTATION (API CONTRACT)

trait Event {
fn mark(&self);
fn event(&self);
}

trait Value {
fn value(&self, value: ValueType);
}
struct ValueMetric ();

trait Time {
fn start() -> TimeType {}
fn time(&self, start_time: TimeType);
impl ValueMetric {
fn value(value: Value) -> () {}
}

trait Scope {
fn open_scope(&self) -> OpenedScope;
}
struct TimeMetric ();

trait TagEvent {
fn tag_event(&self, tags: Option<&[S]>);}

trait TagValue {
fn tag_value(&self, value: ValueType, tags: Option<&[S]>);
}

trait OpenedScope {
fn close_scope(self);
impl TimeMetric {
fn start() -> TimeType { TimeType::now() }
}


// CHANNEL

/// Base instruments
trait Meter {
fn new_event<S: AsRef<str>>(&self, name: S) -> Event;
fn new_count<S: AsRef<str>>(&self, name: S) -> Value;
fn new_timer<S: AsRef<str>>(&self, name: S) -> Value;
fn new_gauge<S: AsRef<str>>(&self, name: S) -> Value;
fn new_scope<S: AsRef<str>>(&self, name: S) -> Scope;
}

/// Per-instrument sampling
trait SampleMeter {
fn new_sample_event<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Event;
fn new_sample_count<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_timer<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_gauge<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Value;
fn new_sample_scope<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Scope;
}

/// Tag instruments
trait TagMeter {
fn new_tag_event<S: AsRef<str>>(&self, name: S) -> TagEvent;
fn new_tag_count<S: AsRef<str>>(&self, name: S) -> TagValue;
fn new_tag_timer<S: AsRef<str>>(&self, name: S) -> TagValue;
fn new_tag_gauge<S: AsRef<str>>(&self, name: S) -> TagValue;
}

// (SPI CONTRACT)

// OUTPUT

type ValueOut = Fn(ValueType) -> ();

type TagValueOut = Fn(ValueType, Option<&[AsRef<str>]>) -> ();

trait ChannelOutput {
fn new_value<S: AsRef<str>>(&self, name: S, m_type: MetricType, sampling: RateType) -> ValueOut;
fn new_tag_value<S: AsRef<str>>(&self, m_type: MetricType, name: S) -> TagValueOut;
fn new_scope<S: AsRef<str>>(&self, name: S, sampling: RateType) -> Scope;

fn write<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType, value: ValueType, tags: Option<&[S]>) {
self.new_value(m_type, name, sampling).call_mut(value)
}

fn tag_write<S: AsRef<str>>(&self, m_type: MetricType, name: S, sampling: RateType, value: ValueType, tags: Option<&[S]>) {
self.new_tag_value(m_type, name, sampling).call_mut(value, tags)
}

fn open_scope<S: AsRef<str>>(&self, scope_name: S, sampling: RateType) {
self.new_scope(name, sampling).open_scope()
}

fn in_scope<S: AsRef<str>>(&self, scope_name: S, sampling: RateType, mut block: FnMut(ChannelOutput) -> ()) {
let scope = self.new_scope(name, sampling).open_scope();
block.call_mut(self);
scope.close_scope()
}
}


/// A convenience macro to wrap a block or an expression with a start / stop timer.
/// Elapsed time is sent to the supplied statsd client after the computation has been performed.
/// Expression result (if any) is transparently returned.
#[macro_export]
macro_rules! time {
($client: expr, $key: expr, $body: block) => (
let start_time = $client.start_time();
$body
$client.stop_time($key, start_time);
);
}

//////////////////
// IMPLEMENTATION

// SKIP SCOPE

struct SkipScope {}

impl OpenedScope for SkipScope {
fn close_scope(self) {}
trait Metric {
fn write<S: AsRef<str>>(&self, value: Value, tags: Option<&[S]>);
}

struct Metric {
out: ValueOut,
trait Channel {
type Metric: Metric;
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> Self::Metric;
}

struct SampleMetric {
sampling: RateType,
out: ValueOut,
}
////////////

struct TagMetric {
out: TagValueOut,
struct AAM {
m_type: MetricType,
name: String,
sample: RateType
}

impl Event for Metric {
fn mark(&self) {
self.out.call(1.0);
impl Metric for AAM {
fn write<S: AsRef<str>>(&self, value: Value, tags: Option<&[S]>) {
println!("AAM {:?} {} {}", self.m_type, self.name, value)
}
}

impl Value for Metric {
fn value(&self, value: ValueType) {
self.out.call(value);
}
}
struct AAA {}

impl Scope for Metric {
fn open_scope(&self) {
self.out.open_scope()
}
}
impl Channel for AAA {
type Metric = AAM;

impl Event for SampleMetric {
fn mark(&self) {
if pcg32_random() < sampling {
self.out.call(1.0);
}
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> AAM {
AAM {m_type, name: name.as_ref().to_string(), sample}
}
}

impl Value for SampleMetric {
fn value(&self, value: ValueType) {
if pcg32_random() < sampling {
self.out.call(value);
}
}
}
////////////

impl Scope for SampleMetric {
fn open_scope(&self) {
if pcg32_random() < sampling {
out.open_scope()
} else {
SKIP_SCOPE
}
}
struct BBM {
m_type: MetricType,
name: String,
sample: RateType
}

impl TagEvent for TagMetric {
fn tag_event(&self, tags: Option<&[S]>) {
self.out.call(1.0, tags);
impl Metric for BBM {
fn write<S: AsRef<str>>(&self, value: Value, tags: Option<&[S]>) {
println!("BBM {:?} {} {}", self.m_type, self.name, value)
}
}

impl TagValue for TagMetric {
fn tag_value(&self, value: ValueType, tags: Option<&[S]>) {
self.out.call(value, tags);
}
}
struct BBB {}

impl Channel for BBB {
type Metric = BBM;

/// A point in time from which elapsed time can be determined
pub struct StartTime (u64);

impl StartTime {
/// The number of milliseconds elapsed between now and this StartTime
fn elapsed_ms(self) -> u64 {
(time::precise_time_ns() - self.0) / 1_000_000
fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> BBM {
BBM {m_type, name: name.as_ref().to_string(), sample}
}
}

////////////

/// eager aggregation
/// expand every new_* to many new_*
struct AggregatingBuffer {

struct PPM<M: Metric> {
proxy_metric: M
}


/// lazy aggregation
/// expand every new_* to many new_*
struct BufferAggregator {

}

struct Joined {

}

// flush when scope closed
// unscoped passthru
struct ScopeBuffer {

impl <M: Metric> Metric for PPM<M> {
fn write<S: AsRef<str>>(&self, value: Value, tags: Option<&[S]>) {
println!("PPM");
self.proxy_metric.write(value, tags)
}
}

// flush every n metrics
struct CountBuffer {

struct Proxy<C: Channel> {
proxy_channel: C,
}

// flush every n millis
struct TimeBuffer {
impl <C: Channel> Channel for Proxy<C> {
type Metric = PPM<C::Metric>;

fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> PPM<C::Metric> {
let pm = self.proxy_channel.define(m_type, name, sample);
PPM { proxy_metric: pm }
}
}

// flush every n metrics
struct Buffer {
////////////

struct MMP<M: Metric> {
proxy_metric: Vec<M>
}

// separate thread
struct Async {

impl <M: Metric> Metric for MMP<M> {
fn write<S: AsRef<str>>(&self, value: Value, tags: Option<&[S]>) {
println!("PPM");
for i in &self.proxy_metric {
i.write(value, tags);
}
}
}

struct RandomSampler {

struct Dispatch<C: Channel> {
proxy_channel: Vec<Box<C>>
}

struct TimeSampler {
impl <C: Channel> Channel for Dispatch<C> {
type Metric = MMP<C::Metric>;

fn define<S: AsRef<str>>(&self, m_type: MetricType, name: S, sample: RateType) -> MMP<C::Metric> {
// let mut v = Vec::new();
// for i in &self.proxy_channel {
// v.push(i.define(m_type, &name, sample))
// }
let v = self.proxy_channel.iter().map(|ref x| x.define(m_type, &name, sample)).collect();
MMP { proxy_metric: v }
}
}


#[test]
mod test {

}
////////////

#[bench]
fn bench_trait(b: &mut Bencher) {
b.iter(|| {});
fn main() {
let channel_a = Proxy{proxy_channel: AAA{}};
let channel_b = Proxy{proxy_channel: AAA{}};
let channel_x = Dispatch{proxy_channel: vec!(Box::new(channel_a), Box::new(channel_b))};
let z = channel_x.define(MetricType::Count, "count_a", 1.0);
z.write(1, Some(&["TAG"]));
}

@@ -12,7 +12,7 @@ fn seed() -> u64 {
.wrapping_add(1442695040888963407)
}

fn pcg32_random() -> u32 {
pub fn pcg32_random() -> u32 {
thread_local! {
static PCG32_STATE: RefCell<u64> = RefCell::new(seed());
}
@@ -38,6 +38,4 @@ fn to_int_rate(float_rate: f64) -> u32 {
((1.0 - float_rate) * ::std::u32::MAX as f64) as u32
}

fn accept_sample(int_rate: u32) -> bool {
pcg32::random() > int_rate
}
fn accept_sample(int_rate: u32) -> bool { pcg32_random > int_rate }
@@ -1,5 +1,6 @@
use std::net::UdpSocket;
use std::io::Result;
use super;

/// Use a safe maximum size for UDP to prevent fragmentation.
const MAX_UDP_PAYLOAD: usize = 576;