From 0887f8ad8da9ca87f7afa436ac4e212e56aec3fe Mon Sep 17 00:00:00 2001 From: Andrew rudenko Date: Wed, 7 Dec 2011 14:32:31 +0400 Subject: [PATCH] Fiber aware ActiveRecord which works not only in FiberPool --- .../connection_adapters/em_mysql2_adapter.rb | 29 +++- lib/active_record/patches.rb | 136 ------------------ lib/em-synchrony/activerecord.rb | 80 ++++++++++- lib/em-synchrony/connection_pool.rb | 6 +- spec/activerecord_spec.rb | 85 +++++++++-- 5 files changed, 180 insertions(+), 156 deletions(-) delete mode 100644 lib/active_record/patches.rb diff --git a/lib/active_record/connection_adapters/em_mysql2_adapter.rb b/lib/active_record/connection_adapters/em_mysql2_adapter.rb index 0f2c67f..272a256 100644 --- a/lib/active_record/connection_adapters/em_mysql2_adapter.rb +++ b/lib/active_record/connection_adapters/em_mysql2_adapter.rb @@ -2,17 +2,34 @@ # AR adapter for using a fibered mysql2 connection with EM # This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware. -# Just update your database.yml's adapter to be 'em_mysql2' - -require 'active_record/connection_adapters/abstract_adapter' -require 'active_record/connection_adapters/mysql2_adapter' +# Just update your database.yml's adapter to be 'em_mysql2', set :pool to 1 and :real_pool +# to real connection pool size. module ActiveRecord class Base def self.em_mysql2_connection(config) - client = Mysql2::EM::Client.new(config.symbolize_keys) + client = EM::Synchrony::ActiveRecord::ConnectionPool.new(size: config[:real_pool]) do + conn = EM::Synchrony::ActiveRecord::Mysql2Client.new(config.symbolize_keys) + conn.open_transactions = 0 + conn.acquired = 0 + # From Mysql2Adapter#configure_connection + conn.query_options.merge!(:as => :array) + + # By default, MySQL 'where id is null' selects the last inserted id. + # Turn this off. http://dev.rubyonrails.org/ticket/6778 + variable_assignments = ['SQL_AUTO_IS_NULL=0'] + encoding = config[:encoding] + variable_assignments << "NAMES '#{encoding}'" if encoding + + wait_timeout = config[:wait_timeout] + wait_timeout = 2592000 unless wait_timeout.is_a?(Fixnum) + variable_assignments << "@@wait_timeout = #{wait_timeout}" + + conn.query("SET #{variable_assignments.join(', ')}") + conn + end options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0] - ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config) + EM::Synchrony::ActiveRecord::Adapter.new(client, logger, options, config) end end end diff --git a/lib/active_record/patches.rb b/lib/active_record/patches.rb deleted file mode 100644 index d5eaf95..0000000 --- a/lib/active_record/patches.rb +++ /dev/null @@ -1,136 +0,0 @@ -module ActiveRecord - module ConnectionAdapters - - def self.fiber_pools - @fiber_pools ||= [] - end - def self.register_fiber_pool(fp) - fiber_pools << fp - end - - class FiberedMonitor - class Queue - def initialize - @queue = [] - end - - def wait(timeout) - t = timeout || 5 - fiber = Fiber.current - x = EM::Timer.new(t) do - @queue.delete(fiber) - fiber.resume(false) - end - @queue << fiber - Fiber.yield.tap do - x.cancel - end - end - - def signal - fiber = @queue.pop - fiber.resume(true) if fiber - end - end - - def synchronize - yield - end - - def new_cond - Queue.new - end - end - - # ActiveRecord's connection pool is based on threads. Since we are working - # with EM and a single thread, multiple fiber design, we need to provide - # our own connection pool that keys off of Fiber.current so that different - # fibers running in the same thread don't try to use the same connection. - class ConnectionPool - def initialize(spec) - @spec = spec - - # The cache of reserved connections mapped to threads - @reserved_connections = {} - - # The mutex used to synchronize pool access - @connection_mutex = FiberedMonitor.new - @queue = @connection_mutex.new_cond - - # default 5 second timeout unless on ruby 1.9 - @timeout = spec.config[:wait_timeout] || 5 - - # default max pool size to 5 - @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 - - @connections = [] - @checked_out = [] - @automatic_reconnect = true - @tables = {} - - @columns = Hash.new do |h, table_name| - h[table_name] = with_connection do |conn| - - # Fetch a list of columns - conn.columns(table_name, "#{table_name} Columns").tap do |columns| - - # set primary key information - columns.each do |column| - column.primary = column.name == primary_keys[table_name] - end - end - end - end - - @columns_hash = Hash.new do |h, table_name| - h[table_name] = Hash[columns[table_name].map { |col| - [col.name, col] - }] - end - - @column_defaults = Hash.new do |h, table_name| - h[table_name] = Hash[columns[table_name].map { |col| - [col.name, col.default] - }] - end - - @primary_keys = Hash.new do |h, table_name| - h[table_name] = with_connection do |conn| - table_exists?(table_name) ? conn.primary_key(table_name) : 'id' - end - end - end - - def clear_stale_cached_connections! - cache = @reserved_connections - keys = Set.new(cache.keys) - - ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool| - pool.busy_fibers.each_pair do |object_id, fiber| - keys.delete(object_id) - end - end - - keys.each do |key| - next unless cache.has_key?(key) - checkin cache[key] - cache.delete(key) - end - end - - private - - def current_connection_id #:nodoc: - Fiber.current.object_id - end - - def checkout_and_verify(c) - @checked_out << c - c.run_callbacks :checkout - c.verify! - c - end - end - - end -end diff --git a/lib/em-synchrony/activerecord.rb b/lib/em-synchrony/activerecord.rb index 2735de0..6ff55a9 100644 --- a/lib/em-synchrony/activerecord.rb +++ b/lib/em-synchrony/activerecord.rb @@ -1,3 +1,81 @@ require 'active_record' require 'active_record/connection_adapters/abstract/connection_pool' -require 'active_record/patches' +require 'active_record/connection_adapters/abstract_adapter' +require 'active_record/connection_adapters/mysql2_adapter' +require 'em-synchrony/thread' + +module ActiveRecord + module ConnectionAdapters + class ConnectionPool + def connection + _fibered_mutex.synchronize do + @reserved_connections[current_connection_id] ||= checkout + end + end + + def _fibered_mutex + @fibered_mutex ||= EM::Synchrony::Thread::Mutex.new + end + end + end +end + +module EM::Synchrony + module ActiveRecord + class Mysql2Client < Mysql2::EM::Client + attr_accessor :open_transactions + attr_accessor :acquired + end + + class Adapter < ::ActiveRecord::ConnectionAdapters::Mysql2Adapter + def configure_connection + nil + end + + def transaction(*args, &blk) + @connection.execute(false) do |conn| + super + end + end + + def real_connection + @connection.connection + end + + def open_transactions + real_connection.open_transactions + end + + def increment_open_transactions + real_connection.open_transactions += 1 + end + + def decrement_open_transactions + real_connection.open_transactions -= 1 + end + end + + class ConnectionPool < EM::Synchrony::ConnectionPool + + # consider connection acquired + def execute(async) + f = Fiber.current + begin + conn = acquire(f) + conn.acquired += 1 + yield conn + ensure + conn.acquired -= 1 + release(f) if !async && conn.acquired == 0 + end + end + + # via method_missing affected_rows will be recognized as async method + def affected_rows(*args, &blk) + execute(false) do |conn| + conn.send(:affected_rows, *args, &blk) + end + end + end + end +end \ No newline at end of file diff --git a/lib/em-synchrony/connection_pool.rb b/lib/em-synchrony/connection_pool.rb index 698e3e1..c84907d 100644 --- a/lib/em-synchrony/connection_pool.rb +++ b/lib/em-synchrony/connection_pool.rb @@ -26,13 +26,17 @@ def execute(async) end end + def connection + acquire(Fiber.current) + end + private # Acquire a lock on a connection and assign it to executing fiber # - if connection is available, pass it back to the calling block # - if pool is full, yield the current fiber until connection is available def acquire(fiber) - + return @reserved[fiber.object_id] if @reserved[fiber.object_id] if conn = @available.pop @reserved[fiber.object_id] = conn conn diff --git a/spec/activerecord_spec.rb b/spec/activerecord_spec.rb index 2ad9680..350b0e7 100644 --- a/spec/activerecord_spec.rb +++ b/spec/activerecord_spec.rb @@ -1,9 +1,14 @@ require "spec/helper/all" require "em-synchrony/activerecord" +require "em-synchrony/fiber_iterator" # create database widgets; # use widgets; -# create table widgets (idx INT); +# create table widgets ( +# id INT NOT NULL AUTO_INCREMENT, +# title varchar(255), +# PRIMARY KEY (`id`) +# ); class Widget < ActiveRecord::Base; end; @@ -11,28 +16,31 @@ class Widget < ActiveRecord::Base; end; DELAY = 0.25 QUERY = "SELECT sleep(#{DELAY})" - it "should establish AR connection" do - EventMachine.synchrony do + + def establish_connection ActiveRecord::Base.establish_connection( :adapter => 'em_mysql2', :database => 'widgets', - :username => 'root' + :username => 'root', + :pool => 10, + :real_pool => 10 ) + Widget.delete_all + end - result = Widget.find_by_sql(QUERY) - result.size.should == 1 + it "should establish AR connection" do + EventMachine.synchrony do + establish_connection + result = Widget.find_by_sql(QUERY) + result.size.should eql(1) EventMachine.stop end end it "should fire sequential, synchronous requests within single fiber" do EventMachine.synchrony do - ActiveRecord::Base.establish_connection( - :adapter => 'em_mysql2', - :database => 'widgets', - :username => 'root' - ) + establish_connection start = now res = [] @@ -41,10 +49,63 @@ class Widget < ActiveRecord::Base; end; res.push Widget.find_by_sql(QUERY) (now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size) - res.size.should == 2 + res.size.should eql(2) EventMachine.stop end end + it "should fire 100 requests in fibers" do + EM.synchrony do + establish_connection + EM::Synchrony::FiberIterator.new(1..100, 40).each do |i| + widget = Widget.create title: 'hi' + widget.update_attributes title: 'hello' + end + EM.stop + end + end + + it "should create widget" do + EM.synchrony do + establish_connection + Widget.create + Widget.create + Widget.count.should eql(2) + EM.stop + end + end + + it "should update widget" do + EM.synchrony do + establish_connection + ActiveRecord::Base.connection.execute("TRUNCATE TABLE widgets;") + widget = Widget.create title: 'hi' + widget.update_attributes title: 'hello' + Widget.find(widget.id).title.should eql('hello') + EM.stop + end + end + + describe "transactions" do + it "should work properly" do + EM.synchrony do + establish_connection + EM::Synchrony::FiberIterator.new(1..50, 30).each do |i| + widget = Widget.create title: "hi#{i}" + ActiveRecord::Base.transaction do + widget.update_attributes title: "hello" + end + ActiveRecord::Base.transaction do + raise ActiveRecord::Rollback + end + end + Widget.all.each do |widget| + widget.title.should eq('hello') + end + EM.stop + end + end + end + end \ No newline at end of file