Permalink
Browse files

fiber work

  • Loading branch information...
1 parent 8a9523b commit b5cc81cc64611f83a076e32580d8f720146be588 @dj2 dj2 committed Jun 7, 2011
Showing with 19 additions and 12 deletions.
  1. +19 −12 lib/em-jack/connection.rb
View
@@ -30,6 +30,7 @@ def initialize(opts = {})
@data = ""
@retries = 0
@in_reserve = false
+ @fiberized = false
@conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn|
conn.client = self
@@ -42,7 +43,7 @@ def initialize(opts = {})
end
def reset_tube_state
- prev_used = @used_tube
+ prev_used = @used_tube
prev_watched = @watched_tubes.dup if @watched_tubes
@used_tube = 'default'
@@ -53,16 +54,15 @@ def reset_tube_state
end
def fiber!
- eigen = (class << self
- self
- end)
+ @fiberized = true
+
+ eigen = (class << self; self; end)
eigen.instance_eval do
%w(use reserve ignore watch peek stats list delete touch bury kick pause release put).each do |meth|
alias_method :"a#{meth}", meth.to_sym
define_method(meth.to_sym) do |*args|
fib = Fiber.current
ameth = :"a#{meth}"
- p [ameth, *args]
proc = lambda { |*result| fib.resume(*result) }
send(ameth, *args, &proc)
Fiber.yield
@@ -240,7 +240,7 @@ def connected
def disconnected
d = @deferrables.dup
-
+p 'disconnected'
## if reconnecting, need to fail ourself to remove any callbacks
fail
@@ -256,26 +256,33 @@ def disconnected
end
prev_used, prev_watched = reset_tube_state
- @reconnect_proc = Proc.new { reconnect(prev_used, prev_watched) } unless @reconnect_proc
+ unless @reconnect_proc
+ recon = Proc.new { reconnect(prev_used, prev_watched) }
+ if @fiberized
+ @reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
+ else
+ @reconnect_proc = recon
+ end
+ end
@retries += 1
EM.add_timer(5) { @reconnect_proc.call }
end
-
+
def reconnect(prev_used, prev_watched)
@conn.reconnect(@host, @port)
use(prev_used) if prev_used
- [ prev_watched ].flatten.compact.each do |tube|
- watch(tube)
+
+ [prev_watched].flatten.compact.each do |tube|
+ @fiberized ? awatch(tube) : watch(tube)
end
end
def reconnect!
@retries = 0
prev_used, prev_watched = reset_tube_state
-
EM.next_tick { reconnect(prev_used, prev_watched) }
end
@@ -294,7 +301,7 @@ def add_deferrable(&blk)
def on_error(&blk)
@error_callback = blk
end
-
+
def on_disconnect(&blk)
@disconnected_callback = blk
end

0 comments on commit b5cc81c

Please sign in to comment.