Skip to content

Commit

Permalink
better error handling for connections
Browse files Browse the repository at this point in the history
  • Loading branch information
oldmoe committed Sep 11, 2008
1 parent aba3b18 commit 537a9bb
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 45 deletions.
49 changes: 35 additions & 14 deletions lib/never_block/db/fibered_mysql_connection.rb
@@ -1,4 +1,4 @@
require 'mysql' require 'mysqlplus'


module NeverBlock module NeverBlock


Expand All @@ -18,27 +18,48 @@ class FiberedMysqlConnection < Mysql
# Creates a new mysql connection, sets it # Creates a new mysql connection, sets it
# to nonblocking and wraps the descriptor in an IO # to nonblocking and wraps the descriptor in an IO
# object. # object.
def real_connect(*args) def self.real_connect(*args)
super(*args) me = super(*args)
@fd = socket me.init_descriptor
@io = IO.new(socket) me
end end
#alias :real_connect :initialize #alias :real_connect :initialize
#alias :connect :initialize #alias :connect :initialize

def init_descriptor
@fd = socket
@io = IO.new(socket)
end


# Assuming the use of NeverBlock fiber extensions and that the exec is run in # Assuming the use of NeverBlock fiber extensions and that the exec is run in
# the context of a fiber. One that have the value :neverblock set to true. # the context of a fiber. One that have the value :neverblock set to true.
# All neverblock IO classes check this value, setting it to false will force # All neverblock IO classes check this value, setting it to false will force
# the execution in a blocking way. # the execution in a blocking way.
def query(sql) def query(sql)
if Fiber.respond_to? :current and Fiber.current[:neverblock] begin
send_query sql if Fiber.respond_to? :current and Fiber.current[:neverblock]
@fiber = Fiber.current send_query sql
Fiber.yield @fiber = Fiber.current
else Fiber.yield
super(sql) get_result
else
super(sql)
end
rescue Exception => e
reconnect if e.msg.include? "not connected"
raise e
end end
end end

# reset the connection
# and reattach to the
# event loop
def reconnect
unregister_from_event_loop
super
init_descriptor
register_with_event_loop(@loop)
end


# Attaches the connection socket to an event loop. # Attaches the connection socket to an event loop.
# Currently only supports EM, but Rev support will be # Currently only supports EM, but Rev support will be
Expand Down Expand Up @@ -73,8 +94,8 @@ def unregister_from_event_loop


# The callback, this is called whenever # The callback, this is called whenever
# there is data available at the socket # there is data available at the socket
def process_command def resume_command
@fiber.resume get_result @fiber.resume
end end


end #FiberedPostgresConnection end #FiberedPostgresConnection
Expand All @@ -86,7 +107,7 @@ def initialize connection
@connection = connection @connection = connection
end end
def notify_readable def notify_readable
@connection.process_command @connection.resume_command
end end
end end


Expand Down
65 changes: 41 additions & 24 deletions lib/never_block/db/fibered_postgres_connection.rb
Expand Up @@ -20,24 +20,52 @@ class FiberedPostgresConnection < PGconn
# object. # object.
def initialize(*args) def initialize(*args)
super(*args) super(*args)
@fd = socket init_descriptor
@io = IO.new(socket)
#setnonblocking(true) #setnonblocking(true)
end end


def init_descriptor
@fd = socket
@io = IO.new(socket)
end
# Assuming the use of NeverBlock fiber extensions and that the exec is run in # Assuming the use of NeverBlock fiber extensions and that the exec is run in
# the context of a fiber. One that have the value :neverblock set to true. # the context of a fiber. One that have the value :neverblock set to true.
# All neverblock IO classes check this value, setting it to false will force # All neverblock IO classes check this value, setting it to false will force
# the execution in a blocking way. # the execution in a blocking way.
def exec(sql) def exec(sql)
if Fiber.respond_to? :current and Fiber.current[:neverblock] begin
self.send_query sql if Fiber.respond_to? :current and Fiber.current[:neverblock]
@fiber = Fiber.current send_query sql
Fiber.yield @fiber = Fiber.current
else Fiber.yield
super(sql) while is_busy
consume_input
Fiber.yield if is_busy
end
res, data = 0, []
while res != nil
res = self.get_result
data << res unless res.nil?
end
data.last
else
super(sql)
end
rescue Exception => e
reset if e.msg.include? "not connected"
raise e
end end
end end

# reset the connection
# and reattach to the
# event loop
def reset
unregister_from_event_loop
super
init_descriptor
register_with_event_loop(@loop)
end


# Attaches the connection socket to an event loop. # Attaches the connection socket to an event loop.
# Currently only supports EM, but Rev support will be # Currently only supports EM, but Rev support will be
Expand Down Expand Up @@ -73,20 +101,9 @@ def unregister_from_event_loop


# The callback, this is called whenever # The callback, this is called whenever
# there is data available at the socket # there is data available at the socket
def process_command def resume_command
# make sure all commands are sent #let the fiber continue its work
# before attempting to read @fiber.resume
#return unless self.flush
self.consume_input
unless is_busy
res, data = 0, []
while res != nil
res = self.get_result
data << res unless res.nil?
end
#let the fiber continue its work
@fiber.resume(data.last)
end
end end


end #FiberedPostgresConnection end #FiberedPostgresConnection
Expand All @@ -98,7 +115,7 @@ def initialize connection
@connection = connection @connection = connection
end end
def notify_readable def notify_readable
@connection.process_command @connection.resume_command
end end
end end


Expand Down
16 changes: 10 additions & 6 deletions test/test_mysql.rb
Expand Up @@ -2,22 +2,26 @@
require 'neverblock' require 'neverblock'
require 'neverblock-mysql' require 'neverblock-mysql'


@count = 10 class Mysql
attr_accessor :fiber
end

@count = 100
@connections = {} @connections = {}
@fpool = NB::Pool::FiberPool.new(@count) @fpool = NB::Pool::FiberPool.new(@count)
@cpool = NB::Pool::FiberedConnectionPool.new(size:@count, eager:true) do @cpool = NB::Pool::FiberedConnectionPool.new(size:@count, eager:true) do
c = Mysql.real_connect('localhost','root',nil) c = NB::DB::FiberedMysqlConnection.real_connect('localhost','root',nil)
@connections[IO.new(c.socket)] = c @connections[c.io] = c
c c
end end


@break = false @break = false
@done = 0 @done = 0
@t = Time.now @t = Time.now
@count.times do @count.times do
@fpool.spawn(false) do @fpool.spawn do
@cpool.hold do |conn| @cpool.hold do |conn|
conn.query('select sleep(1)').each{|r| r} conn.query('select sleep(1) as sleep').each{|r|p r}
@done = @done + 1 @done = @done + 1
puts "done in #{Time.now - @t}" if @done == @count puts "done in #{Time.now - @t}" if @done == @count
end end
Expand All @@ -27,6 +31,6 @@
loop do loop do
res = select(@sockets,nil,nil,nil) res = select(@sockets,nil,nil,nil)
if res if res
res.first.each{|c|@connections[c].process_command} res.first.each{|s|@connections[s].resume_command}
end end
end end
2 changes: 1 addition & 1 deletion test/test_pg.rb
Expand Up @@ -97,6 +97,6 @@ def stop_loop
run_evented run_evented
loop do loop do
res = select($sockets,nil,nil,nil) res = select($sockets,nil,nil,nil)
res.first.each{ |s|$connections[s].process_command } if res res.first.each{ |s|$connections[s].resume_command } if res
break if $done break if $done
end end

0 comments on commit 537a9bb

Please sign in to comment.