Permalink
Browse files

better mysql support for handling server has gone away, pg still to b…

…e tested
  • Loading branch information...
humanzz committed Oct 23, 2008
1 parent aecad81 commit a413092f869e421baddf95a97de86c4eb09c01dd
@@ -13,15 +13,17 @@ def adapter_name
def insert_sql(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil) #:nodoc:
begin_db_transaction
super sql, name
- id_value || @connection.insert_id
+ id = id_value || @connection.insert_id
commit_db_transaction
+ id
end
def update_sql(sql, name = nil) #:nodoc:
begin_db_transaction
super
- @connection.affected_rows
+ rows = @connection.affected_rows
commit_db_transaction
+ rows
end
def begin_db_transaction
@@ -51,7 +53,7 @@ def connect
# Turn this off. http://dev.rubyonrails.org/ticket/6778
conn.query("SET SQL_AUTO_IS_NULL=0")
end
- conn.register_with_event_loop(:em)
+# conn.register_with_event_loop(:em)
conn
end
end
View
@@ -73,6 +73,18 @@ def yield *args
require 'never_block/pool/fibered_connection_pool'
module NeverBlock
+
+ # Checks if we should be working in a non-blocking mode
+ def self.neverblocking?
+ Fiber.respond_to?(:current) && Fiber.current[:neverblock]
+ end
+
+ def self.event_loop_available?
+ defined?(EM) && EM.reactor_running?
+ end
+
+ # The given block will run its queries either in blocking or non-blocking
+ # mode based on the first parameter
def self.neverblock(nb = true, &block)
status = Fiber.current[:neverblock]
Fiber.current[:neverblock] = nb
@@ -1,38 +1,33 @@
module NeverBlock
module DB
module FiberedDBConnection
+
# Attaches the connection socket to an event loop.
# Currently only supports EM, but Rev support will be
# completed soon.
- def register_with_event_loop(loop)
- @fd = socket
- @io = IO.new(socket)
- if loop == :em
- if EM.reactor_running?
- @em_connection = EM::attach(@io,EMConnectionHandler,self)
- else
- raise "EventMachine reactor not running"
- end
+ def register_with_event_loop
+ if EM.reactor_running?
+ @em_connection = EM::attach(socket,EMConnectionHandler,self)
else
- raise "Could not register with the event loop"
+ raise "EventMachine reactor not running"
end
- @loop = loop
end
# Unattaches the connection socket from the event loop
def unregister_from_event_loop
- if @loop == :em
- if @em_connection
- @em_connection.detach
- @em_connection = nil
- true
- else
- false
- end
+ if @em_connection
+ @em_connection.detach
+ @em_connection = nil
+ true
else
- raise NotImplementedError.new("unregister_from_event_loop not implemented for #{@loop}")
+ false
end
end
+
+ # Closes the connection using event loop
+ def event_loop_connection_close
+ @em_connection.close_connection if @em_connection
+ end
# The callback, this is called whenever
# there is data available at the socket
@@ -42,8 +37,6 @@ def resume_command
f = @fiber
@fiber = nil
f.resume
- else
- unregister_from_event_loop
end
end
@@ -14,50 +14,51 @@ module DB
class FiberedMysqlConnection < Mysql
include FiberedDBConnection
-
+
+ # Initializes the connection and remembers the connection params
+ def initialize(*args)
+ @connection_params = args
+ super(*@connection_params)
+ end
+
+ # Does a normal real_connect if arguments are passed. If no arguments are
+ # passed it uses the ones it remembers
+ def real_connect(*args)
+ @connection_params = args unless args.empty?
+ super(*@connection_params)
+ end
+
+ alias_method :connect, :real_connect
+
# Assuming the use of NeverBlock fiber extensions and that the exec is run in
# the context of a fiber. One that have the value :neverblock set to true.
# All neverblock IO classes check this value, setting it to false will force
# the execution in a blocking way.
def query(sql)
- begin
- if Fiber.respond_to? :current and Fiber.current[:neverblock]
+ if NB.event_loop_available? && NB.neverblocking?
+ begin
send_query sql
@fiber = Fiber.current
- Fiber.yield
- get_result
- else
- super(sql)
+ Fiber.yield register_with_event_loop
+ get_result
+ rescue Exception => e
+ if error = ['not connected', 'gone away', 'Lost connection'].detect{|msg| e.message.include? msg}
+ event_loop_connection_close
+ unregister_from_event_loop
+ connect
+ end
+ raise e
+ ensure
+ unregister_from_event_loop
end
- rescue Exception => e
- if error = ['not connected', 'gone away', 'Lost connection'].detect{|msg| e.message.include? msg}
- stop
- connect
- end
- raise e
- end
- end
-
- alias :exec :query
-
- # stop the connection and deattach from the event loop
- def stop
- unregister_from_event_loop
+ else
+ super(sql)
+ end
end
- # reconnect and attach to the event loop
- def connect
- super
- register_with_event_loop(@loop)
- end
+ alias_method :exec, :query
- # unregisters from the event loop and closes the connection
- def close
- unregister_from_event_loop
- super
- end
-
- end #FiberedMySQLConnection
+ end #FiberedMySQLConnection
end #DB
@@ -21,11 +21,13 @@ class FiberedPostgresConnection < PGconn
# All neverblock IO classes check this value, setting it to false will force
# the execution in a blocking way.
def exec(sql)
- begin
- if Fiber.respond_to? :current and Fiber.current[:neverblock]
+ # TODO Still not "killing the query process"-proof
+ # In some cases, the query is simply sent but the fiber never yields
+ if NB.event_loop_available? && NB.neverblocking?
+ begin
send_query sql
- @fiber = Fiber.current
- Fiber.yield
+ @fiber = Fiber.current
+ Fiber.yield register_with_event_loop
while is_busy
consume_input
Fiber.yield if is_busy
@@ -35,26 +37,23 @@ def exec(sql)
res = self.get_result
data << res unless res.nil?
end
- data.last
- else
- super(sql)
+ data.last
+ rescue Exception => e
+ if error = ['not connected', 'gone away', 'Lost connection','no connection'].detect{|msg| e.message.include? msg}
+ #event_loop_connection_close
+ unregister_from_event_loop
+ reset
+ end
+ raise e
+ ensure
+ unregister_from_event_loop
end
- rescue Exception => e
- reset if e.message.include? "not connected"
- raise e
- end
+ else
+ super(sql)
+ end
end
- alias :query :exec
-
- # reset the connection
- # and reattach to the
- # event loop
- def reset
- unregister_from_event_loop
- super
- register_with_event_loop(@loop)
- end
+ alias_method :query, :exec
end #FiberedPostgresConnection
@@ -46,7 +46,7 @@ def commit_db_transaction
end
end
- #close all connections and remove them from the event loop
+ #closes all connections
def close
@pool.all_connections do |conn|
conn.close
@@ -1,12 +1,3 @@
-# we need Fiber.current
-# so we must require
-# fiber
-# Author:: Mohammad A. Ali (mailto:oldmoe@gmail.com)
-# Copyright:: Copyright (c) 2008 eSpace, Inc.
-# License:: Distributes under the same terms as Ruby
-
-#require 'fiber'
-
module NeverBlock
module Pool
@@ -40,15 +31,17 @@ class FiberPool
# every time. Once a fiber is done with its block, it attempts to fetch
# another one from the queue
def initialize(count = 50)
- @fibers,@queue = [],[]
+ @fibers,@busy_fibers,@queue = [],{},[]
count.times do |i|
fiber = Fiber.new do |block|
loop do
block.call
unless @queue.empty?
block = @queue.shift
else
- block = Fiber.yield @fibers << Fiber.current
+ @busy_fibers.delete(Fiber.current.object_id)
+ @fibers << Fiber.current
+ block = Fiber.yield
end
end
end
@@ -61,6 +54,7 @@ def initialize(count = 50)
# in a queue
def spawn(evented = true, &block)
if fiber = @fibers.shift
+ @busy_fibers[fiber.object_id] = fiber
fiber[:neverblock] = evented
fiber.resume(block)
else
View
@@ -1,4 +1,5 @@
$:.unshift File.expand_path(File.dirname(__FILE__))
-
+require 'neverblock'
+require 'never_block/db/fibered_db_connection'
+require 'never_block/db/pooled_db_connection'
require 'never_block/db/fibered_postgres_connection'
-require 'never_block/db/pooled_fibered_postgres_connection'

0 comments on commit a413092

Please sign in to comment.