Permalink
Browse files

Provide UDP connection alternative to tcp for inbound stream

  • Loading branch information...
rossta committed Apr 18, 2012
1 parent 5f663d2 commit 4c0e13f97972f20d107ee26b4897d8ec14fb57b0
View
@@ -3,4 +3,4 @@ source :rubygems
gemspec
gem "rake"
-gem "yajl-ruby", :git => "git://github.com/brianmario/yajl-ruby.git"
+gem "yajl-ruby", :git => "git://github.com/brianmario/yajl-ruby.git"
View
@@ -15,14 +15,14 @@
gauge :messages_read, :tick => 1.day.to_i, :title => "Messages (read)"
gauge :winks_sent, :tick => 1.day.to_i, :title => "Winks sent"
- gauge :pageviews_per_url_daily,
- :tick => 1.day.to_i,
- :title => "Daily Pageviews per URL",
+ gauge :pageviews_per_url_daily,
+ :tick => 1.day.to_i,
+ :title => "Daily Pageviews per URL",
:three_dimensional => true
- gauge :pageviews_per_url_monthly,
- :tick => 30.days.to_i,
- :title => "Monthly Pageviews per URL",
+ gauge :pageviews_per_url_monthly,
+ :tick => 30.days.to_i,
+ :title => "Monthly Pageviews per URL",
:three_dimensional => true
event :_pageview do
@@ -124,8 +124,8 @@
:width => 67,
:autoupdate => 30,
:gauges => [
- :skip_votes, :yes_votes,
- :maybe_votes, :winks_sent, :messages_sent, :messages_read,
+ :skip_votes, :yes_votes,
+ :maybe_votes, :winks_sent, :messages_sent, :messages_read,
]
}
@@ -235,7 +235,7 @@
if %w(ry201112a ry201112b ry201112_ref).include?(data[:campaign_key])
incr :rockyou1_requests
end
- end
+ end
widget 'Campaigns', {
:title => "RockYou (1) - PPI vs. Requests vs. Refs",
@@ -263,38 +263,38 @@
- gauge :abtest_sidebar_btn_totals,
- :tick => 36000.days.to_i,
- :title => "(A/B) sidebar_btn: Totals",
+ gauge :abtest_sidebar_btn_totals,
+ :tick => 36000.days.to_i,
+ :title => "(A/B) sidebar_btn: Totals",
:three_dimensional => true
- gauge :abtest_sidebar_btn_daily,
- :tick => 1.day.to_i,
- :title => "(A/B) sidebar_btn: Daily",
+ gauge :abtest_sidebar_btn_daily,
+ :tick => 1.day.to_i,
+ :title => "(A/B) sidebar_btn: Daily",
:three_dimensional => true
- gauge :abtest_sidebar_btn_leute_treffen,
- :tick => 1.day.to_i,
+ gauge :abtest_sidebar_btn_leute_treffen,
+ :tick => 1.day.to_i,
:title => "leute_treffen"
- gauge :abtest_sidebar_btn_jetzt_losflirten,
- :tick => 1.day.to_i,
+ gauge :abtest_sidebar_btn_jetzt_losflirten,
+ :tick => 1.day.to_i,
:title => "jetzt_losflirten"
- gauge :abtest_sidebar_btn_dates_finden,
- :tick => 1.day.to_i,
+ gauge :abtest_sidebar_btn_dates_finden,
+ :tick => 1.day.to_i,
:title => "dates_finden"
- gauge :abtest_sidebar_btn_leute_treffen_monthly,
- :tick => 1.month.to_i,
+ gauge :abtest_sidebar_btn_leute_treffen_monthly,
+ :tick => 1.month.to_i,
:title => "leute_treffen"
- gauge :abtest_sidebar_btn_jetzt_losflirten_monthly,
- :tick => 1.month.to_i,
+ gauge :abtest_sidebar_btn_jetzt_losflirten_monthly,
+ :tick => 1.month.to_i,
:title => "jetzt_losflirten"
- gauge :abtest_sidebar_btn_dates_finden_monthly,
- :tick => 1.month.to_i,
+ gauge :abtest_sidebar_btn_dates_finden_monthly,
+ :tick => 1.month.to_i,
:title => "dates_finden"
@@ -428,46 +428,46 @@
end
- gauge :competitors_badoo_mau,
+ gauge :competitors_badoo_mau,
:tick => 1.day.to_i,
:title => "Badoo (MAU)"
- gauge :competitors_badoo_dau,
+ gauge :competitors_badoo_dau,
:tick => 1.day.to_i,
:title => "Badoo (DAU)"
- gauge :competitors_zoosk_mau,
+ gauge :competitors_zoosk_mau,
:tick => 1.day.to_i,
:title => "zoosk (MAU)"
- gauge :competitors_zoosk_dau,
+ gauge :competitors_zoosk_dau,
:tick => 1.day.to_i,
:title => "zoosk (DAU)"
- gauge :competitors_areyouinterested_mau,
+ gauge :competitors_areyouinterested_mau,
:tick => 1.day.to_i,
:title => "areyouinterested (MAU)"
- gauge :competitors_areyouinterested_dau,
+ gauge :competitors_areyouinterested_dau,
:tick => 1.day.to_i,
:title => "areyouinterested (DAU)"
- gauge :competitors_onetwolike_mau,
+ gauge :competitors_onetwolike_mau,
:tick => 1.day.to_i,
:title => "onetwolike (MAU)"
- gauge :competitors_onetwolike_dau,
+ gauge :competitors_onetwolike_dau,
:tick => 1.day.to_i,
:title => "onetwolike (DAU)"
- gauge :competitors_kizzle_mau,
+ gauge :competitors_kizzle_mau,
:tick => 1.day.to_i,
:title => "kizzle (MAU)"
- gauge :competitors_kizzle_dau,
+ gauge :competitors_kizzle_dau,
:tick => 1.day.to_i,
:title => "kizzle (DAU)"
-
+
event :competition_data do
set_value(:competitors_badoo_mau, data[:badoo_mau]) if data[:badoo_mau]
set_value(:competitors_badoo_dau, data[:badoo_dau]) if data[:badoo_dau]
@@ -557,7 +557,7 @@
gauge :events_per_hour, :tick => 1.hour.to_i
gauge :events_per_second, :tick => 1
gauge :votes_per_second, :tick => 1
-
+
event :"*" do
incr :events_per_minute
incr :events_per_hour
View
@@ -26,6 +26,7 @@ def self.default_options(opts = {})
:redis_url => "redis://localhost:6379",
:redis_prefix => "fnordmetric",
:inbound_stream => ["0.0.0.0", "1337"],
+ :inbound_protocol => :tcp,
:web_interface => ["0.0.0.0", "4242"],
:web_interface_server => "thin",
:start_worker => true,
@@ -122,11 +123,12 @@ def self.embedded(opts={})
end
if opts[:inbound_stream]
+ inbound_class = opts[:inbound_protocol] == :udp ? InboundDatagram : InboundStream
begin
- inbound_stream = InboundStream.start(opts)
- log "listening on tcp##{opts[:inbound_stream].join(":")}"
+ inbound_stream = inbound_class.start(opts)
+ log "listening on #{opts[:inbound_protocol]}##{opts[:inbound_stream].join(":")}"
rescue
- log "cant start FnordMetric::InboundStream. port in use?"
+ log "cant start #{inbound_class.name}. port in use?"
end
end
@@ -145,6 +147,7 @@ def self.embedded(opts={})
require "fnordmetric/api"
require "fnordmetric/inbound_stream"
+require "fnordmetric/inbound_datagram"
require "fnordmetric/worker"
require "fnordmetric/widget"
require "fnordmetric/timeline_widget"
@@ -0,0 +1,20 @@
+class FnordMetric::InboundDatagram < EventMachine::Connection
+
+ class << self
+ attr_accessor :opts
+ end
+
+ def self.start(opts)
+ self.opts = opts
+ EM.open_datagram_socket(*opts[:inbound_stream], self, opts)
+ end
+
+ def receive_data(data)
+ api.event(data)
+ end
+
+ def api
+ @api ||= FnordMetric::API.new(self.class.opts)
+ end
+
+end
@@ -1,42 +1,42 @@
-class FnordMetric::InboundStream < EventMachine::Connection
+class FnordMetric::InboundStream < EventMachine::Connection
@@opts = nil
def self.start(opts)
@@opts = opts
- EM.start_server(*opts[:inbound_stream], self)
+ EM.start_server(*opts[:inbound_stream], self)
end
def self.options(opts)
@@opts = opts
end
- def receive_data(chunk)
- @buffer << chunk
+ def receive_data(chunk)
+ @buffer << chunk
next_event
end
def next_event
read_next_event
push_next_event
end
-
+
def read_next_event
while (event = @buffer.slice!(/^(.*)\n/))
@events_buffered += 1
@events << event
- end
+ end
end
def push_next_event
return true if @events.empty?
@events_buffered -= 1
@api.event(@events.pop)
close_connection?
- EM.next_tick(&method(:push_next_event))
+ EM.next_tick(&method(:push_next_event))
end
def close_connection?
- @api.disconnect unless @streaming || (@events_buffered!=0)
+ @api.disconnect unless @streaming || (@events_buffered!=0)
end
def post_init
@@ -0,0 +1,34 @@
+require ::File.expand_path('../spec_helper.rb', __FILE__)
+
+describe FnordMetric::InboundDatagram do
+
+ let(:inbound_datagram) { FnordMetric::InboundDatagram.new(nil) }
+
+ before(:all) do
+ @redis = Redis.new
+ @redis_wrap = RedisWrap.new(@redis)
+ FnordMetric::InboundDatagram.opts = {
+ :redis_url => "redis://localhost:6379",
+ :redis_prefix => "fnordmetric-test",
+ :event_queue_ttl => 120
+ }
+ end
+
+ describe "pushing new events" do
+ it "should add parsable event to the queue" do
+ data = %Q{{"_type": "started"}}
+
+ lambda {
+ inbound_datagram.receive_data data
+ }.should change { @redis.llen("fnordmetric-test-queue") }.by +1
+ end
+
+ it "should reject non parsable events" do
+ broken_data = %Q{{"_type" => "started"}}
+
+ lambda {
+ inbound_datagram.receive_data broken_data
+ }.should_not change { @redis.llen("fnordmetric-test-queue") }
+ end
+ end
+end
@@ -24,7 +24,7 @@
it "should reject non parsable events" do
broken_data = %Q{{"_type": \n"started"}\n}
-
+
lambda {
@inbound_stream.receive_data broken_data
}.should_not change { @redis.llen("fnordmetric-test-queue") }

0 comments on commit 4c0e13f

Please sign in to comment.