Skip to content
Browse files

reconnect logic + behave as regular / raw mysql connection

  • Loading branch information...
1 parent 14c5ed9 commit 0eab4839a2d0abce6fa2f881f3dc51755472fae6 @igrigorik committed Feb 15, 2010
Showing with 70 additions and 176 deletions.
  1. +12 −6 lib/em-mysql/connection.rb
  2. +10 −4 lib/em-mysql/mysql.rb
  3. +48 −107 spec/mysql_spec.rb
  4. +0 −59 spec/old-test.rb
View
18 lib/em-mysql/connection.rb
@@ -1,3 +1,10 @@
+
+class Mysql
+ def result
+ @cur_result
+ end
+end
+
module EventMachine
class MySQLConnection < EventMachine::Connection
@@ -12,7 +19,8 @@ class MySQLConnection < EventMachine::Connection
'Lost connection to MySQL server during query'
] unless defined? DisconnectErrors
- def initialize(mysql, opts)
+ def initialize(mysql, opts, conn)
+ @conn = conn
@mysql = mysql
@fd = mysql.socket
@opts = opts
@@ -63,8 +71,6 @@ def notify_readable
end
def unbind
- @connected = false
-
# wait for the next tick until the current fd is removed completely from the reactor
#
# in certain cases the new FD# (@mysql.socket) is the same as the old, since FDs are re-used
@@ -73,14 +79,14 @@ def unbind
# do _NOT_ use EM.next_tick here. if a bunch of sockets disconnect at the same time, we want
# reconnects to happen after all the unbinds have been processed
- # TODO: reconnect logic will / is broken with new code structure
+ @connected = false
EM.add_timer(0) do
@processing = false
- @mysql = EventedMysql._connect @opts
+ @mysql = @conn.connect_socket(@opts)
@fd = @mysql.socket
- @signature = EM.attach_fd @mysql.socket, true
+ @signature = EM.attach_fd(@mysql.socket, true)
EM.set_notify_readable @signature, true
EM.instance_variable_get('@conns')[@signature] = self
@connected = true
View
14 lib/em-mysql/mysql.rb
@@ -14,15 +14,15 @@ def initialize(opts)
raise RuntimeError, 'mysqlplus and EM.watch are required for EventedMysql'
end
- @settings = { :debug => true }.merge!(opts)
+ @settings = { :debug => false }.merge!(opts)
@connection = connect(@settings)
end
def close
@connection.close
end
- def execute(sql, &blk)
+ def query(sql, &blk)
df = EventMachine::DefaultDeferrable.new
cb = blk || Proc.new { |r| df.succeed(r) }
eb = Proc.new { |r| df.fail(r) }
@@ -31,13 +31,19 @@ def execute(sql, &blk)
df
end
+ alias :real_query :query
- private
+ # behave as a normal mysql connection
+ def method_missing(method, *args, &block)
+ if @connection.respond_to? method
+ @connection.send(method, args)
+ end
+ end
def connect(opts)
if conn = connect_socket(opts)
debug [:connect, conn.socket, opts]
- EM.watch(conn.socket, EventMachine::MySQLConnection, conn, opts)
+ EM.watch(conn.socket, EventMachine::MySQLConnection, conn, opts, self)
else
# invokes :errback callback in opts before firing again
debug [:reconnect]
View
155 spec/mysql_spec.rb
@@ -32,135 +32,76 @@
it "should execute sql" do
EventMachine.run {
conn = EventMachine::MySQL.new(:host => 'localhost')
- query = conn.execute("select 1")
+ query = conn.query("select 1")
query.callback { |res|
- p res
+ res.fetch_row.first.should == "1"
EventMachine.stop
}
-
- # EventMachine.stop
}
end
it "should accept block as query callback" do
EventMachine.run {
conn = EventMachine::MySQL.new(:host => 'localhost')
- conn.execute("select 1") { |res|
- p res
+ conn.query("select 1") { |res|
+ res.fetch_row.first.should == "1"
EventMachine.stop
}
}
end
- # it "should reconnect when disconnected"
-
- # it "run select queries and return results"
- # it "queue up queries and execute them in order"
- # it "have raw mode which yields the mysql object"
- # it "allow custom error callbacks for each query"
-end
-
-
-
-__END__
-
-EM.describe EventedMysql, 'individual connections' do
-
- should 'create a new connection' do
- @mysql = EventedMysql.connect :host => '127.0.0.1',
- :port => 3306,
- :database => 'test',
- :logging => false
-
- @mysql.class.should == EventedMysql
- done
- end
-
- should 'connect to another host if the first one is not accepting connection' do
- @mysql = EventedMysql.connect({:host => 'unconnected.host',
- :port => 3306,
- :database => 'test',
- :logging => false},
- { :host => '127.0.0.1',
- :port => 3306,
- :database => 'test',
- :logging => false })
-
- @mysql.class.should == EventedMysql
- done
-
- end
-
-
- should 'execute sql' do
- start = Time.now
-
- @mysql.execute('select sleep(0.2)'){
- (Time.now-start).should.be.close 0.2, 0.1
- done
- }
- end
- should 'reconnect when disconnected' do
- @mysql.close
- @mysql.execute('select 1+2'){
- :connected.should == :connected
- done
- }
- end
-
- # to test, run:
- # mysqladmin5 -u root kill `mysqladmin5 -u root processlist | grep "select sleep(5)+1" | cut -d'|' -f2`
- #
- # should 're-run query if disconnected during query' do
- # @mysql.execute('select sleep(5)+1', :select){ |res|
- # res.first['sleep(5)+1'].should == '1'
- # done
- # }
- # end
-
- should 'run select queries and return results' do
- @mysql.execute('select 1+2', :select){ |res|
- res.size.should == 1
- res.first['1+2'].should == '3'
- done
+ it "allow custom error callbacks for each query" do
+ EventMachine.run {
+ conn = EventMachine::MySQL.new(:host => 'localhost')
+ query = conn.query("select 1 from")
+ query.errback { |res|
+ res.class.should == Mysql::Error
+ EventMachine.stop
+ }
}
end
- should 'queue up queries and execute them in order' do
- @mysql.execute('select 1+2', :select)
- @mysql.execute('select 2+3', :select)
- @mysql.execute('select 3+4', :select){ |res|
- res.first['3+4'].should == '7'
- done
- }
- end
+ it "queue up queries and execute them in order" do
+ EventMachine.run {
+ conn = EventMachine::MySQL.new(:host => 'localhost')
- should 'continue processing queries after hitting an error' do
- @mysql.settings.update :on_error => proc{|e|}
+ results = []
+ conn.query("select 1") {|res| results.push res.fetch_row.first.to_i}
+ conn.query("select 2") {|res| results.push res.fetch_row.first.to_i}
+ conn.query("select 3") {|res| results.push res.fetch_row.first.to_i}
- @mysql.execute('select 1+ from table'){}
- @mysql.execute('select 1+1 as num', :select){ |res|
- res[0]['num'].should == '2'
- done
+ EventMachine.add_timer(0.05) {
+ results.should == [1,2,3]
+ EventMachine.stop
+ }
}
end
+
+ it "should continue processing queries after hitting an error" do
+ EventMachine.run {
+ conn = EventMachine::MySQL.new(:host => 'localhost')
- should 'have raw mode which yields the mysql object' do
- @mysql.execute('select 1+2 as num', :raw){ |mysql|
- mysql.should.is_a? Mysql
- mysql.result.all_hashes.should == [{'num' => '3'}]
- done
+ conn.query("select 1+ from table")
+ conn.query("select 1+1") { |res|
+ res.fetch_row.first.to_i.should == 2
+ EventMachine.stop
+ }
}
end
- should 'allow custom error callbacks for each query' do
- @mysql.settings.update :on_error => proc{ should.flunk('default errback invoked') }
-
- @mysql.execute('select 1+ from table', :select, proc{
- should.flunk('callback invoked')
- }, proc{ |e|
- done
- })
- end
-
-end
+ # it "should reconnect when disconnected" do
+ # # to test, run:
+ # # mysqladmin5 -u root kill `mysqladmin -u root processlist | grep "select sleep(5)" | cut -d'|' -f2`
+ #
+ # EventMachine.run {
+ # conn = EventMachine::MySQL.new(:host => 'localhost')
+ #
+ # query = conn.query("select sleep(5)")
+ # query.callback {|res|
+ # res.fetch_row.first.to_i.should == 0
+ # EventMachine.stop
+ # }
+ # }
+ # end
+
+end
View
59 spec/old-test.rb
@@ -1,59 +0,0 @@
-require 'lib/em/mysql'
-
-# EM.kqueue
-# EM.epoll
-EM.run{
- EM.start_server '127.0.0.1', 12345 do |c|
- def c.receive_data data
- p 'sending http response'
- send_data "hello"
- close_connection_after_writing
- end
- end
-
- SQL = EventedMysql
- def SQL(query, &blk) SQL.select(query, &blk) end
-
- if false
-
- SQL.settings.update :logging => true,
- :database => 'test',
- :connections => 1
-
- SQL.execute('select 1+2')
-
- EM.add_timer(1){
- 3.times do SQL.select('select sleep(0.5)+1'){|r| p(r) } end
- }
-
- elsif false
-
- SQL.settings.update :logging => true,
- :database => 'test',
- :connections => 10
-
- EM.add_timer(2.5){ SQL.all('use test') }
-
- else
-
- SQL.settings.update :logging => true,
- :database => 'test',
- :connections => 10,
- :timeout => 1
-
- n = 0
-
- SQL.execute('drop table if exists testingabc'){
- SQL.execute('create table testingabc (a int, b int, c int)'){
- EM.add_periodic_timer(0.2) do
- cur_num = n+=1
- SQL.execute("insert into testingabc values (1,2,#{cur_num})"){
- SQL("select * from testingabc where c = #{cur_num} limit 1"){ |res| puts;puts }
- }
- end
- }
- }
-
- end
-
-}

0 comments on commit 0eab483

Please sign in to comment.
Something went wrong with that request. Please try again.