Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: bskaplou/event_chopper
base: 04372d61ed
...
head fork: bskaplou/event_chopper
compare: e207c7e2c0
  • 2 commits
  • 9 files changed
  • 0 commit comments
  • 1 contributor
2  Gemfile
View
@@ -2,6 +2,8 @@ source "http://rubygems.org"
gem 'comm'
gem 'tokyotyrant'
+gem 'mongo'
+gem 'bson_ext'
# Specify your gem's dependencies in event_chopper.gemspec
gemspec
12 bin/replay
View
@@ -0,0 +1,12 @@
+#!/usr/bin/env ruby
+
+$LOAD_PATH.unshift File.expand_path("../lib", __FILE__)
+require 'event_chopper'
+
+reporter = Object.const_get('EventChopper').const_get(ARGV[0]).new
+EventChopper.logger.replay(Time.at(ARGV[1].to_i), Time.at(ARGV[2].to_i), reporter.event_types) do |type, message, stamp|
+ # puts type
+ # puts message
+# puts stamp
+ reporter.map type, message, stamp
+end
1  event_chopper.gemspec
View
@@ -24,5 +24,6 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "comm"
s.add_runtime_dependency "bunny"
s.add_runtime_dependency "yajl-ruby"
+ s.add_runtime_dependency "mongo"
# s.add_runtime_dependency "rest-client"
end
17 lib/event_chopper.rb
View
@@ -1,6 +1,7 @@
require "event_chopper/version"
require "event_chopper/time_key"
require "event_chopper/base"
+require "event_chopper/logger"
require "event_chopper/gate_distribution"
require "event_chopper/airline_by_direction"
require "event_chopper/gate_slowness"
@@ -8,5 +9,19 @@
require "event_chopper/store/eleminating_store"
module EventChopper
- # Your code goes here...
+ def encode data
+ Yajl::Encoder.encode data
+ end
+
+ def decode data
+ Yajl::Parser.parse data
+ end
+
+ def logger
+ @logger ||= Logger.new
+ end
+
+ module_function :encode, :decode, :logger
end
+
+
4 lib/event_chopper/base.rb
View
@@ -22,8 +22,8 @@ def emit record, id = NOID, stamp = nil
puts stamp.to_s + ' -> ' + id + ' -> ' + current_value.to_s
end
- def map topic, record
- emit record
+ def map topic, record, stamp = nil
+ emit record, NOID, stamp
end
def get stamp, id = NOID
4 lib/event_chopper/gate_distribution.rb
View
@@ -5,9 +5,9 @@ def event_types
['gate.distribution.first']
end
- def map topic, message
+ def map topic, message, stamp
message.keys.each do |key|
- emit message[key], key
+ emit message[key], key, stamp
end
end
33 lib/event_chopper/logger.rb
View
@@ -0,0 +1,33 @@
+require 'comm'
+require 'mongo'
+
+module EventChopper
+
+class Logger
+ include EventChopper
+
+ attr_reader :db
+
+ def initialize host = 'localhost', port = 27017, dbname = 'event_chopper', collection = 'log'
+ @db = Mongo::Connection.new(host, port).db(dbname)[collection]
+ end
+
+ def run
+ Comm::Consumer.new.subscribe do |topic, message|
+ db.insert({:timestamp => DateTime.now.to_time.to_i, :type => topic, :event => encode(message)})
+ end
+ end
+
+ def replay from, to, types = nil
+ query = { '$and' => [
+ {:timestamp => {'$lte' => to.to_time.to_i}},
+ {:timestamp => {'$gte' => from.to_time.to_i}}
+ ]}
+ query[:type] = {'$in' => types} unless types.nil?
+ db.find(query).each do |arg|
+ yield arg['type'], decode(arg['event']), TimeKey.from_date(Time.at(arg['timestamp']))
+ end
+ end
+end
+
+end
12 lib/event_chopper/store/ttstore.rb
View
@@ -4,21 +4,15 @@
module EventChopper
class TTStore
+ include EventChopper
+
def initialize name, host = 'localhost', port = 1978
@prefix = name
@db = TokyoTyrant::RDB::new
@db.open host, port
end
- def encode data
- Yajl::Encoder.encode data
- end
-
- def decode data
- Yajl::Parser.parse data
- end
-
- def pkey key, id = NOID
+ def pkey key, id = NOID
@prefix + ' ' + key.to_s + '|' + id.to_s
end
26 lib/event_chopper/time_key.rb
View
@@ -12,12 +12,14 @@ def self.from_date date
self.new date.strftime('%Y-%m-%d %H:') + min_tail
end
+ attr_reader :val
+
def initialize stamp
@val = stamp
end
def to_s
- @val
+ val
end
def days_in_month year, month
@@ -29,14 +31,14 @@ def days_in_month year, month
MONTH = (0..12).inject([]) {|acc, item| acc << (item).to_s.rjust(2, '0')}
def children
- if @val.size == 13
- MIN.inject([]) {|acc, item| acc << self.class.new(@val + ':' + item)}
- elsif @val.size == 10
- HOUR.inject([]) {|acc, item| acc << self.class.new(@val + ' ' + item)}.flatten
- elsif @val.size == 7
- (year, month) = @val.split /\-/, 2
+ if val.size == 13
+ MIN.inject([]) {|acc, item| acc << self.class.new(val + ':' + item)}
+ elsif val.size == 10
+ HOUR.inject([]) {|acc, item| acc << self.class.new(val + ' ' + item)}.flatten
+ elsif val.size == 7
+ (year, month) = val.split /\-/, 2
(1..(days_in_month year.to_i, month.to_i)).inject([]) do |acc, item|
- acc << self.class.new(@val + '-' + item.to_s.rjust(2, '0'))
+ acc << self.class.new(val + '-' + item.to_s.rjust(2, '0'))
end
else
[]
@@ -45,10 +47,10 @@ def children
def parents
tor = []
- tor << self.class.new(@val[0,13]) if @val.size > 13
- tor << self.class.new(@val[0,10]) if @val.size > 10
- tor << self.class.new(@val[0,7]) if @val.size > 7
- tor << self.class.new(@val[0,4]) if @val.size > 4
+ tor << self.class.new(val[0,13]) if val.size > 13
+ tor << self.class.new(val[0,10]) if val.size > 10
+ tor << self.class.new(val[0,7]) if val.size > 7
+ tor << self.class.new(val[0,4]) if val.size > 4
tor
end
end

No commit comments for this range

Something went wrong with that request. Please try again.