From c065a9b5e40ca14a979c8a4049ac8676acd09c97 Mon Sep 17 00:00:00 2001 From: Brad Pardee Date: Thu, 8 Dec 2011 08:07:18 -0500 Subject: [PATCH] Dynamic modification of pool size, added close and remove_idle methods --- .gitignore | 3 +- Gemfile | 6 ++ History.md | 25 +++++++ History.txt | 20 ------ LICENSE => LICENSE.txt | 2 +- README.md | 58 +++++++++++++++ README.rdoc | 44 ------------ Rakefile | 42 +++++++---- VERSION | 1 - gene_pool.gemspec | 55 +++------------ lib/gene_pool.rb | 155 ++++++++++++++++++++++++++++++++++------- 11 files changed, 260 insertions(+), 151 deletions(-) create mode 100644 Gemfile create mode 100644 History.md delete mode 100644 History.txt rename LICENSE => LICENSE.txt (96%) create mode 100644 README.md delete mode 100644 README.rdoc delete mode 100644 VERSION diff --git a/.gitignore b/.gitignore index ae5fc6d..c1a73a1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ doc -pkg .idea +gene_pool-*.gem +Gemfile.lock diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..ecb3b75 --- /dev/null +++ b/Gemfile @@ -0,0 +1,6 @@ +source "http://rubygems.org" + +group :development do + gem 'rake' + gem 'rdoc' +end diff --git a/History.md b/History.md new file mode 100644 index 0000000..8fc55a8 --- /dev/null +++ b/History.md @@ -0,0 +1,25 @@ +GenePool Changelog +===================== + +1.2.0 / 2011-12-07 + + - Allow dynamic modification of pool size. + - Added close method which will prevent checking out of new connections and wait for and close all current connections. + - Added remove_idle method which will close all current connections which have been idle for the given idle_time. + +1.1.1 / 2010-11-18 + + - In with_connection_auto_retry, add check for e.message =~ /expired/ as JRuby exception won't be a + Timeout::Error at this point (http://jira.codehaus.org/browse/JRUBY-5194) + +1.1.0 / 2010-11-11 + + - Added with_connection_auto_retry to automatically retry yield block if a non-timeout exception occurs + +1.0.1 / 2010-09-12 + + - Debug logging was NOT thread-safe + +1.0.0 / 2010-09-05 + + - Initial release diff --git a/History.txt b/History.txt deleted file mode 100644 index 1b259c2..0000000 --- a/History.txt +++ /dev/null @@ -1,20 +0,0 @@ -=== 1.1.1 / 2010-11-18 - -* Bug fixes - * In with_connection_auto_retry, add check for e.message =~ /expired/ as JRuby exception won't be a - Timeout::Error at this point (http://jira.codehaus.org/browse/JRUBY-5194) - -=== 1.1.0 / 2010-11-11 - -* 1 enhancement: - * Added with_connection_auto_retry to automatically retry yield block if a non-timeout exception occurs - -=== 1.0.1 / 2010-09-12 - -* Bug Fixes - * Debug logging was NOT thread-safe - -== 1.0.0 / 2010-09-05 - -* 1 major enhancement: - * Initial release diff --git a/LICENSE b/LICENSE.txt similarity index 96% rename from LICENSE rename to LICENSE.txt index 2803398..a51d362 100644 --- a/LICENSE +++ b/LICENSE.txt @@ -1,4 +1,4 @@ -Copyright (c) 2010 Brad Pardee +Copyright (c) 2010-2011 Brad Pardee Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/README.md b/README.md new file mode 100644 index 0000000..089a8db --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +# gene_pool + +* http://github.com/bpardee/gene_pool + +## DESCRIPTION: + +Generic pooling library for connection pools. + +## FEATURES/PROBLEMS: + +* Thread-safe +* Pure ruby + +## INSTALL: + + gem install gene_pool + +## EXAMPLE USAGE: + + class MyClient + @@gene_pool = GenePool.new(:name => 'MyClient', + :pool_size => 10, + :warn_timeout => 0.25, + :logger => Rails.logger, + :close_proc => :close) do + TCPSocket.new('myserver', 4321) + end + + def send_message + @@gene_pool.with_connection do |socket| + begin + # use socket here + rescue Exception => e + # If the socket gets closed, remove it from the pool + @@gene_pool.remove(socket) + end + end + end + + # Equivalent to send_message above + def send_message_auto_remove + # On exception, close and reopen socket and perform retry + @@gene_pool.with_connection_auto_remove do |socket| + # use socket here, + end + end + + def send_message_auto_retry + # On exception, close and reopen socket and perform retry + @@gene_pool.with_connection_auto_retry do |socket| + # use socket here, + end + end + end + +## Copyright + +Copyright (c) 2010-2011 Brad Pardee. See LICENSE for details. diff --git a/README.rdoc b/README.rdoc deleted file mode 100644 index e4a8c7a..0000000 --- a/README.rdoc +++ /dev/null @@ -1,44 +0,0 @@ -= gene_pool - -* http://github.com/bpardee/gene_pool - -== DESCRIPTION: - -Generic pooling library for connection pools. - -== FEATURES/PROBLEMS: - -* Thread-safe -* Pure ruby - -== INSTALL: - - gem install gene_pool - -== EXAMPLE USAGE: - - class MyClient - @@gene_pool = GenePool.new(:name => 'MyClient', - :pool_size => 10, - :warn_timeout => 0.25, - :logger => Rails.logger) do - TCPSocket.new('myserver', 4321) - end - - def send_message - @@gene_pool.with_connection do |socket| - # use socket here - end - end - - def send_message_auto_retry - # On exception, close and reopen socket and perform retry - @@gene_pool.with_connection_auto_retry do |socket| - # use socket here, - end - end - end - -== Copyright - -Copyright (c) 2010 Brad Pardee. See LICENSE for details. diff --git a/Rakefile b/Rakefile index 8f368dd..0d962dc 100644 --- a/Rakefile +++ b/Rakefile @@ -1,16 +1,34 @@ +# encoding: UTF-8 require 'rubygems' -require 'rake' - begin - require 'jeweler' - Jeweler::Tasks.new do |gemspec| - gemspec.name = "gene_pool" - gemspec.summary = "Generic pooling library for creating a connection pool" - gemspec.description = "Generic pooling library for creating a connection pool" - gemspec.email = "bradpardee@gmail.com" - gemspec.homepage = "http://github.com/bpardee/gene_pool" - gemspec.authors = ["Brad Pardee"] - end + require 'bundler/setup' rescue LoadError - puts "Jeweler not available. Install it with: gem install jeweler" + puts 'You must `gem install bundler` and `bundle install` to run rake tasks' +end + +require 'rake' +require 'rdoc/task' +require 'rake/testtask' +require 'rake/clean' + +desc "Build gem" +task :gem do |t| + system 'gem build gene_pool.gemspec' +end + +Rake::TestTask.new(:test) do |t| + t.libs << 'lib' + t.libs << 'test' + t.pattern = 'test/**/*_test.rb' + t.verbose = false +end + +task :default => :test + +RDoc::Task.new(:rdoc) do |rdoc| + rdoc.rdoc_dir = 'rdoc' + rdoc.title = 'GenePool' + rdoc.options << '--line-numbers' << '--inline-source' + rdoc.rdoc_files.include('README.md') + rdoc.rdoc_files.include('lib/**/*.rb') end diff --git a/VERSION b/VERSION deleted file mode 100644 index 524cb55..0000000 --- a/VERSION +++ /dev/null @@ -1 +0,0 @@ -1.1.1 diff --git a/gene_pool.gemspec b/gene_pool.gemspec index 0660564..6cf6191 100644 --- a/gene_pool.gemspec +++ b/gene_pool.gemspec @@ -1,49 +1,12 @@ -# Generated by jeweler -# DO NOT EDIT THIS FILE DIRECTLY -# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command -# -*- encoding: utf-8 -*- - Gem::Specification.new do |s| - s.name = %q{gene_pool} - s.version = "1.1.1" - - s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= - s.authors = ["Brad Pardee"] - s.date = %q{2010-11-18} - s.description = %q{Generic pooling library for creating a connection pool} - s.email = %q{bradpardee@gmail.com} - s.extra_rdoc_files = [ - "LICENSE", - "README.rdoc" - ] - s.files = [ - ".gitignore", - "History.txt", - "LICENSE", - "README.rdoc", - "Rakefile", - "VERSION", - "gene_pool.gemspec", - "lib/gene_pool.rb", - "test/gene_pool_test.rb" - ] - s.homepage = %q{http://github.com/bpardee/gene_pool} - s.rdoc_options = ["--charset=UTF-8"] + s.name = "gene_pool" + s.summary = 'Generic pooling library for creating a connection pool' + s.description = 'Generic pooling library for creating a connection pool' + s.authors = ['Brad Pardee'] + s.email = ['bradpardee@gmail.com'] + s.homepage = 'http://github.com/bpardee/gene_pool' + s.files = Dir["{examples,lib}/**/*"] + %w(LICENSE.txt Rakefile Gemfile History.md README.md) + s.test_files = ["test/gene_pool_test.rb"] + s.version = '1.2.0' s.require_paths = ["lib"] - s.rubygems_version = %q{1.3.6} - s.summary = %q{Generic pooling library for creating a connection pool} - s.test_files = [ - "test/gene_pool_test.rb" - ] - - if s.respond_to? :specification_version then - current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION - s.specification_version = 3 - - if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then - else - end - else - end end - diff --git a/lib/gene_pool.rb b/lib/gene_pool.rb index ba13a90..e9bf9b9 100644 --- a/lib/gene_pool.rb +++ b/lib/gene_pool.rb @@ -1,21 +1,33 @@ +require 'logger' + # Generic connection pool class class GenePool - attr_reader :name, :pool_size, :warn_timeout, :logger + attr_accessor :name, :pool_size, :warn_timeout, :logger + # Creates a gene_pool. The passed block will be used to initialize a single instance of + # the item being pooled (i.e., socket connection or whatever) + # options - + # name - The name used in logging messages + # pool_size - The maximum number of instances that will be created (Defaults to 1). + # warn_timeout - Displays an error message if a checkout takes longer that the given time (used to give hints to increase the pool size) + # logger - The logger used for log messages, defaults to STDERR. + # close_proc - The process or method used to close a pooled instance when it is removed. + # Defaults to :close. Set to nil for no-op or a symbol for a method or a proc that takes an argument for the instance. def initialize(options={}, &connect_block) @connect_block = connect_block @name = options[:name] || 'GenePool' @pool_size = options[:pool_size] || 1 @warn_timeout = options[:warn_timeout] || 5.0 - @logger = options[:logger] + @logger = options[:logger] || Logger.new(STDERR) + @close_proc = options[:close_proc] || :close # Mutex for synchronizing pool access @mutex = Mutex.new # Condition variable for waiting for an available connection - @queue = ConditionVariable.new + @condition = ConditionVariable.new @connections = [] @checked_out = [] @@ -24,6 +36,26 @@ def initialize(options={}, &connect_block) @with_map = {} end + def size + @mutex.synchronize do + return @connections.size + end + end + + def pool_size=(size) + @mutex.synchronize do + return if @pool_size == size + @pool_size = size + if @pool_size < @connections.size + old_connections = (@connections - @checked_out).last(@connections.size - @pool_size) + old_connections.each do |connection| + remove_and_close(connection) + @logger.info "#{@name}: Connection #{connection}(#{connection.object_id}) has been removed due to pool size reduction" + end + end + end + end + # Check out a connection from the pool, creating it if necessary. def checkout start_time = Time.now @@ -31,6 +63,7 @@ def checkout reserved_connection_placeholder = Thread.current begin @mutex.synchronize do + raise "Can't perform checkout, #{@name} has been closed" if @pool_size == 0 until connection do if @checked_out.size < @connections.size connection = (@connections - @checked_out).first @@ -40,16 +73,16 @@ def checkout connection = reserved_connection_placeholder @connections << connection @checked_out << connection - @logger.debug "#{@name}: Created connection ##{@connections.size} #{connection}:#{connection.object_id} for #{name}" if @logger && @logger.debug? + @logger.debug {"#{@name}: Created connection ##{@connections.size} #{connection}(#{connection.object_id}) for #{name}"} else - @logger.info "#{@name}: Waiting for an available connection, all #{@pool_size} connections are checked out." if @logger - @queue.wait(@mutex) + @logger.info "#{@name}: Waiting for an available connection, all #{@pool_size} connections are checked out." + @condition.wait(@mutex) end end end ensure delta = Time.now - start_time - if @logger && delta > @warn_timeout + if delta > @warn_timeout @logger.warn "#{@name}: It took #{delta} seconds to obtain a connection. Consider raising the pool size which is " + "currently set to #{@pool_size}." end @@ -58,7 +91,7 @@ def checkout connection = renew(reserved_connection_placeholder) end - @logger.debug "#{@name}: Checkout connection #{connection.object_id} self=#{dump}" if @logger && @logger.debug? + @logger.debug {"#{@name}: Checkout connection #{connection}(#{connection.object_id}) self=#{self}"} return connection end @@ -66,12 +99,24 @@ def checkout def checkin(connection) @mutex.synchronize do @checked_out.delete(connection) - @queue.signal + if @pool_size < @connections.size + remove_and_close(connection) + @logger.info "#{@name}: Checkin connection #{connection}(#{connection.object_id}) has been removed due to pool size reduction" + else + connection._last_used = Time.now + @condition.signal + end end - @logger.debug "#{@name}: Checkin connection #{connection.object_id} self=#{dump}" if @logger && @logger.debug? + @logger.debug {"#{@name}: Checkin connection #{connection}(#{connection.object_id}) self=#{self}"} end # Create a scope for checking out a connection + # The client should handle cleanup on exception which should be something similar to the following: + # rescue Exception => e + # @gene_pool.remove(connection) + # raise + # end + # Note that with_connection_auto_remove automatically does this def with_connection connection = checkout @mutex.synchronize do @@ -87,16 +132,26 @@ def with_connection checkin(connection) end end - + + # Create a scope for checking out a connection while automatically removing this connection on exception + def with_connection_auto_remove + with_connection do |connection| + begin + yield connection + rescue Exception => e + remove(connection) + raise + end + end + end + + # Create a scope for checking out a connection while automatically retrying on exception - def with_connection_auto_retry(close_on_error = true) + def with_connection_auto_retry with_connection do |connection| begin yield connection rescue Exception => e - if close_on_error - connection.close rescue nil - end if e.kind_of?(Timeout::Error) || e.message =~ /expired/ remove(connection) raise @@ -105,9 +160,6 @@ def with_connection_auto_retry(close_on_error = true) begin yield connection rescue Exception => e - if close_on_error - connection.close rescue nil - end remove(connection) raise end @@ -120,9 +172,10 @@ def remove(connection) @mutex.synchronize do @connections.delete(connection) @checked_out.delete(connection) - @queue.signal + @condition.signal end - @logger.debug "#{@name}: Removed connection #{connection.object_id} self=#{dump}" if @logger && @logger.debug? + close_connection(connection) + @logger.debug {"#{@name}: Removed connection #{connection}(#{connection.object_id}) self=#{self}"} end # If a connection needs to be renewed for some reason, reassign it here @@ -134,29 +187,56 @@ def renew(old_connection) remove old_connection raise end + class << new_connection + attr_accessor :_last_used + end @mutex.synchronize do index = @checked_out.index(old_connection) raise Error.new("Can't reassign non-checked out connection for #{@name}") unless index + close_connection(old_connection) @checked_out[index] = new_connection @connections[@connections.index(old_connection)] = new_connection # If this is part of a with_connection block, then track our new connection with_key = @with_map.index(old_connection) @with_map[with_key] = new_connection if with_key end - @logger.debug "#{@name}: Renewed connection old=#{old_connection.object_id} new=#{new_connection}:#{new_connection.object_id}" if @logger && @logger.debug? + @logger.debug {"#{@name}: Renewed connection old=#{old_connection.object_id} new=#{new_connection}(#{new_connection.object_id})"} return new_connection end - # Perform the given block for each connection, i.e., closing each connection. + # Perform the given block for each connection. Note that close should be used for safely closing all connections def each + # TBD: Should this be removed? @mutex.synchronize do @connections.each { |connection| yield connection } end end - - private - - def dump + + def close(timeout=10) + self.pool_size = 0 + start = Time.now + while (Time.now - start) < timeout + sleep 1 + @mutex.synchronize do + return if @connections.empty? + @logger.info "#{@name}: Waiting to close, #{@connections.size} connections are still in use" + end + end + @logger.warn "#{@name}: Timed out while waiting to close, #{@connections.size} connections are still in use" + end + + def remove_idle(idle_time=60) + @mutex.synchronize do + (@connections - @checked_out).each do |idle_connection| + if (Time.now - idle_connection._last_used) >= idle_time + remove_and_close(idle_connection) + @logger.debug {"#{@name}: Removed idle connection=#{idle_connection}(#{idle_connection.object_id})"} + end + end + end + end + + def to_s conn = chk = with = nil @mutex.synchronize do conn = @connections.map{|c| c.object_id}.join(',') @@ -165,4 +245,27 @@ def dump end "connections=#{conn} checked_out=#{chk} with_map=#{with}" end + + ####### + private + ####### + + def close_connection(connection) + return unless @close_proc + # Thread is used as a reserved_connection_placeholder so don't close the connection if it's actually a thread + return if connection.kind_of?(Thread) + if @close_proc.kind_of?(Symbol) + connection.send(@close_proc) + else + @close_proc.call(connection) + end + rescue Exception => e + @logger.warn "Exception trying to close #{connection}(#{connection.object_id}): #{e.message}" + end + + # Clients should have obtained the mutex before calling this! + def remove_and_close(connection) + @connections.delete(connection) + close_connection(connection) + end end