Skip to content

Commit

Permalink
Fiber aware ActiveRecord which works not only in FiberPool
Browse files Browse the repository at this point in the history
  • Loading branch information
prepor committed Dec 7, 2011
1 parent e1198ff commit 0887f8a
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 156 deletions.
29 changes: 23 additions & 6 deletions lib/active_record/connection_adapters/em_mysql2_adapter.rb
Expand Up @@ -2,17 +2,34 @@

# AR adapter for using a fibered mysql2 connection with EM
# This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware.
# Just update your database.yml's adapter to be 'em_mysql2'

require 'active_record/connection_adapters/abstract_adapter'
require 'active_record/connection_adapters/mysql2_adapter'
# Just update your database.yml's adapter to be 'em_mysql2', set :pool to 1 and :real_pool
# to real connection pool size.

module ActiveRecord
class Base
def self.em_mysql2_connection(config)
client = Mysql2::EM::Client.new(config.symbolize_keys)
client = EM::Synchrony::ActiveRecord::ConnectionPool.new(size: config[:real_pool]) do
conn = EM::Synchrony::ActiveRecord::Mysql2Client.new(config.symbolize_keys)
conn.open_transactions = 0
conn.acquired = 0
# From Mysql2Adapter#configure_connection
conn.query_options.merge!(:as => :array)

# By default, MySQL 'where id is null' selects the last inserted id.
# Turn this off. http://dev.rubyonrails.org/ticket/6778
variable_assignments = ['SQL_AUTO_IS_NULL=0']
encoding = config[:encoding]
variable_assignments << "NAMES '#{encoding}'" if encoding

wait_timeout = config[:wait_timeout]
wait_timeout = 2592000 unless wait_timeout.is_a?(Fixnum)
variable_assignments << "@@wait_timeout = #{wait_timeout}"

conn.query("SET #{variable_assignments.join(', ')}")
conn
end
options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0]
ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config)
EM::Synchrony::ActiveRecord::Adapter.new(client, logger, options, config)
end
end
end
136 changes: 0 additions & 136 deletions lib/active_record/patches.rb

This file was deleted.

80 changes: 79 additions & 1 deletion lib/em-synchrony/activerecord.rb
@@ -1,3 +1,81 @@
require 'active_record'
require 'active_record/connection_adapters/abstract/connection_pool'
require 'active_record/patches'
require 'active_record/connection_adapters/abstract_adapter'
require 'active_record/connection_adapters/mysql2_adapter'
require 'em-synchrony/thread'

module ActiveRecord
module ConnectionAdapters
class ConnectionPool
def connection
_fibered_mutex.synchronize do
@reserved_connections[current_connection_id] ||= checkout
end
end

def _fibered_mutex
@fibered_mutex ||= EM::Synchrony::Thread::Mutex.new
end
end
end
end

module EM::Synchrony
module ActiveRecord
class Mysql2Client < Mysql2::EM::Client
attr_accessor :open_transactions
attr_accessor :acquired
end

class Adapter < ::ActiveRecord::ConnectionAdapters::Mysql2Adapter
def configure_connection
nil
end

def transaction(*args, &blk)
@connection.execute(false) do |conn|
super
end
end

def real_connection
@connection.connection
end

def open_transactions
real_connection.open_transactions
end

def increment_open_transactions
real_connection.open_transactions += 1
end

def decrement_open_transactions
real_connection.open_transactions -= 1
end
end

class ConnectionPool < EM::Synchrony::ConnectionPool

# consider connection acquired
def execute(async)
f = Fiber.current
begin
conn = acquire(f)
conn.acquired += 1
yield conn
ensure
conn.acquired -= 1
release(f) if !async && conn.acquired == 0
end
end

# via method_missing affected_rows will be recognized as async method
def affected_rows(*args, &blk)
execute(false) do |conn|
conn.send(:affected_rows, *args, &blk)
end
end
end
end
end
6 changes: 5 additions & 1 deletion lib/em-synchrony/connection_pool.rb
Expand Up @@ -26,13 +26,17 @@ def execute(async)
end
end

def connection
acquire(Fiber.current)
end

private

# Acquire a lock on a connection and assign it to executing fiber
# - if connection is available, pass it back to the calling block
# - if pool is full, yield the current fiber until connection is available
def acquire(fiber)

return @reserved[fiber.object_id] if @reserved[fiber.object_id]
if conn = @available.pop
@reserved[fiber.object_id] = conn
conn
Expand Down

0 comments on commit 0887f8a

Please sign in to comment.