Permalink
Browse files

updated to use blpop *args to listen to multiple keys at once with no…

… threads. hard to believe it got smaller but is still a very usefull primitive abstraction
  • Loading branch information...
1 parent 853ac1f commit 86a9b7c6049ff8bb55c4ab603479fd67399d891a Ezra Zygmuntowicz committed Jan 3, 2010
Showing with 37 additions and 15 deletions.
  1. +35 −3 lib/redactor.rb
  2. +2 −12 lib/redactor/actor.rb
View
@@ -3,15 +3,47 @@
require 'json'
require 'redactor/actor'
+
module RedActor
class << self
- attr_accessor :redis_options, :threads
+ attr_accessor :queues, :redis
+
+ def get_queues(timeout=15)
+ RedActor.queues.keys << timeout
+ end
def run(opts={})
opts[:timeout] ||= 15
- RedActor.redis_options = opts
- RedActor.threads.each {|t| t.join }
+ redis = Redis.new(opts)
+ RedActor.redis = redis
+ loop do
+ queue, msg = redis.blpop(*get_queues(opts[:timeout]))
+ if queue && msg
+ RedActor.queues[queue].new(redis).__send__("receive_#{queue}", msg)
+ end
+ end
end
end
end
+if __FILE__ == $0
+
+ class Foo < RedActor::Actor
+
+ mailbox :foo
+ mailbox :bar
+
+ def receive_foo(msg)
+ p [:receive_foo, msg]
+ send_msg 'bar', msg + ' world'
+ end
+
+ def receive_bar(msg)
+ p [:receive_bar, msg]
+ end
+
+ end
+
+
+ RedActor.run
+end
View
@@ -3,18 +3,8 @@ module RedActor
class Actor
class << self
def mailbox(queue)
- thread = Thread.new do
- sleep 0.1 until RedActor.redis_options
- redis = Redis.new(RedActor.redis_options)
- loop do
- msg = redis.blpop(queue, 10)
- if msg
- self.new(redis).__send__("receive_#{queue}", msg)
- end
- end
- end
- RedActor.threads ||= []
- RedActor.threads << thread
+ RedActor.queues ||= {}
+ RedActor.queues[queue.to_s] = self
end
end

0 comments on commit 86a9b7c

Please sign in to comment.