Permalink
Browse files

callback/errback + multi-line response parsing

  • Loading branch information...
1 parent ec5d3b6 commit 87ff06e6f192efa5ae19de17e6e381a2c81de1b5 @igrigorik committed Jan 14, 2011
Showing with 62 additions and 27 deletions.
  1. +44 −15 lib/em-handlersocket/client.rb
  2. +18 −12 spec/client_spec.rb
@@ -1,7 +1,36 @@
module EventMachine
module HandlerSocket
- class Client < EventMachine::Connection
+ class Deferrable < EM::DefaultDeferrable
+ attr_accessor :lines, :buffer
+
+ def initialize(lines)
+ @lines = lines
+ @buffer = []
+ end
+
+ def recieve(line)
+ status, cols, data = line.chomp.split("\t")
+ @buffer.push data
+
+ # non-zero response code indicates error
+ if status.to_i != 0
+ fail
+ return true
+ end
+
+ done?
+ end
+
+ # non zero response code, or we've processed all lines
+ def done?; (@buffer.size == @lines); end
+
+ def succeed
+ super(@buffer)
+ end
+ end
+
+ class Client < EventMachine::Connection
include EventMachine::Deferrable
include EventMachine::Protocols::LineProtocol
@@ -13,33 +42,33 @@ def connection_completed
succeed
end
- def execute(*cmd, &blk)
- callback { send(cmd) }
- add_deferrable(&blk)
- end
-
def open_index(opts)
- execute(['P', opts[:id], opts[:db], opts[:table], opts[:index_name], opts[:columns]])
+ execute([['P', opts[:id], opts[:db], opts[:table], opts[:index_name], opts[:columns]]])
end
def query(*queries)
- queries = queries.map{|q| [q[:id], q[:op], 1, q[:key], q[:limit], q[:offset]].compact.join("\t") }.join("\n")
- execute(queries)
+ execute(queries.map{|q| [q[:id], q[:op], 1, q[:key], q[:limit], q[:offset]].compact })
+ end
+
+ def execute(cmd, &blk)
+ callback { send(cmd) }
+ add_deferrable(cmd.size, &blk)
end
private
def send(data)
- data = data.is_a?(String) ? data : data.join("\t") + "\n"
- send_data data
+ send_data data.map {|d| d.join("\t")}.join("\n") + "\n"
end
def receive_line(line)
- @deferrables.shift.succeed(line.chomp.split("\t"))
+ if @deferrables.first.recieve(line)
+ @deferrables.shift.succeed
+ end
end
- def add_deferrable(&blk)
- df = EM::DefaultDeferrable.new
+ def add_deferrable(lines, &blk)
+ df = Deferrable.new(lines)
df.callback &blk
@deferrables.push(df)
@@ -48,4 +77,4 @@ def add_deferrable(&blk)
end
end
-end
+end
View
@@ -38,11 +38,19 @@
EM.run {
c = EM::HandlerSocket.new
- df = c.execute(['P', '0', 'widgets', 'user', 'PRIMARY', 'user_name,user_email,created'])
- df.callback {|r|
- r.should == ['0', '1']
- EM.stop
- }
+ df = c.execute([['P', '0', 'widgets', 'user', 'PRIMARY', 'user_name,user_email,created']])
+ df.callback { EM.stop }
+ df.errback { fail }
+ }
+ end
+
+ it "should invoke errback on bad query" do
+ EM.run {
+ c = EM::HandlerSocket.new
+
+ df = c.execute([['P', '0', 'badDB', 'user', 'PRIMARY', 'user_name,user_email,created']])
+ df.callback { fail }
+ df.errback { EM.stop }
}
end
@@ -64,20 +72,18 @@
idx = {:id => 0, :db => 'widgets', :table => 'user', :index_name => 'PRIMARY', :columns => 'user_name'}
d = c.open_index(idx)
- d.callback do |s|
+ d.callback do
d = c.query(:id => 0, :op => '=', :key => '1')
d.callback do |data|
- data.last.should == 'Ilya'
+ data.should == ['Ilya']
EM.stop
end
end
}
end
it "should fetch multiple records" do
- pending("trickier one.. response is returned on multiple lines, requires DF accounting")
-
EM.run {
c = EM::HandlerSocket.new
idx = {:id => 0, :db => 'widgets', :table => 'user', :index_name => 'PRIMARY', :columns => 'user_name'}
@@ -86,13 +92,13 @@
d.callback do |s|
d = c.query({:id => 0, :op => '=', :key => '1'}, {:id => 0, :op => '=', :key => '2'})
+ d.errback { fail }
d.callback do |data|
- p data
+ data.should == ['Ilya', 'John']
EM.stop
end
end
}
end
-
-end
+end

0 comments on commit 87ff06e

Please sign in to comment.