From 64d0d6b910a8a67fae92c3a5044370a1f8687a7d Mon Sep 17 00:00:00 2001 From: "Joshua T. Mckinney" Date: Mon, 16 May 2016 09:00:13 -0500 Subject: [PATCH] Detect forks --- .travis.yml | 2 +- HISTORY.md | 3 +- README.md | 29 +--- lib/hot_tub/pool.rb | 48 +++---- spec/hot_tub/integration/net_http_spec.rb | 155 +++++++++++----------- 5 files changed, 99 insertions(+), 138 deletions(-) diff --git a/.travis.yml b/.travis.yml index a571998..47dd675 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,4 +8,4 @@ rvm: - 2.0.0-p648 - ruby-head - rbx-2 - - jruby-head \ No newline at end of file + - jruby \ No newline at end of file diff --git a/HISTORY.md b/HISTORY.md index aafeb4a..c17c847 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,11 +5,12 @@ Head ======= - None yet. -1.0.1 +1.1.0 ======= - Close orphan clients outside of synchonize - Freeze alarm message - Detect dead resources and reap +- Detect fork 1.0.0 ======= diff --git a/README.md b/README.md index bbc1091..641ec1c 100644 --- a/README.md +++ b/README.md @@ -140,35 +140,8 @@ a lambda that accepts the client as an argument or symbol representing a method ## Forking -HotTub's `#reset!` methods close all idle connections, prevents connections in use from returning -to the pool and attempts to close orphaned connections as they attempt to return. +HotTub::Pool automatically detects forks and drains the pool, so no additional "after fork" code is required. - # Puma - on_worker_boot do - - # If you let HotTub manage all your connections - HotTub.reset! - - # If you have your own HotTub::Sessions - MY_SESSIONS.reset! - - # If you have any one-off pools - MY_POOL.reset! - - end - - # Unicorn - before_fork do |server, worker| - - # If you let HotTub manage all your connections - HotTub.reset! - - # If you have your own HotTub::Sessions - MY_SESSIONS.reset! - - # If you have any one-off pools - MY_POOL.reset! - end ## Contributing to HotTub diff --git a/lib/hot_tub/pool.rb b/lib/hot_tub/pool.rb index 2d9db25..3ca163c 100644 --- a/lib/hot_tub/pool.rb +++ b/lib/hot_tub/pool.rb @@ -87,10 +87,13 @@ class Pool # [:reap_timeout] # Default is 600 seconds. An integer that represents the timeout for reaping the pool in seconds. # + # [:detect_fork] + # Set to false to disable fork detection + # def initialize(opts={},&client_block) raise ArgumentError, 'a block that initializes a new client is required' unless block_given? @name = (opts[:name] || self.class.name) - @size = (opts[:size] || 5) # in seconds + @size = (opts[:size] || 5) @wait_timeout = (opts[:wait_timeout] || 10) # in seconds @reap_timeout = (opts[:reap_timeout] || 600) # the interval to reap connections in seconds @max_size = (opts[:max_size] || 0) # maximum size of pool when non-blocking, 0 means no limit @@ -116,12 +119,15 @@ def initialize(opts={},&client_block) @never_block = (@max_size == 0) + @pid = Process.pid unless opts[:detect_fork] == false + at_exit {shutdown!} unless @sessions_key end # Preform an operations with a client/connection. # Requires a block that receives the client. def run + drain! if forked? clnt = pop yield clnt ensure @@ -158,48 +164,22 @@ def drain! ensure @_out.clear @_pool.clear + @pid = Process.pid @cond.broadcast end end nil end alias :close! :drain! - - # Reset the pool. - # or if shutdown allow threads to quickly finish their work - # Clients from the previous pool will not return to pool. - def reset! - HotTub.logger.info "[HotTub] Resetting pool #{@name}!" if HotTub.logger - @mutex.synchronize do - begin - while clnt = @_pool.pop - close_client(clnt) - end - ensure - @_out.clear - @_pool.clear - @cond.broadcast - end - end - nil - end + alias :reset! :drain! # Kills the reaper and drains the pool. def shutdown! HotTub.logger.info "[HotTub] Shutting down pool #{@name}!" if HotTub.logger @shutdown = true kill_reaper if @reaper - @mutex.synchronize do - begin - while clnt = @_pool.pop - close_client(clnt) - end - ensure - @_out.clear - @_pool.clear - @cond.broadcast - end - end + drain! + @shutdown = false nil end @@ -250,6 +230,10 @@ def max_size=max_size @max_size = max_size end + def forked? + (@pid && (@pid != Process.pid)) + end + private ALARM_MESSAGE = "Could not fetch a free client in time. Consider increasing your pool size.".freeze @@ -289,7 +273,7 @@ def push(clnt) # Safely pull client from pool, adding if allowed # If a client is not available, check for dead - # resources and schedule reap if nesseccary + # resources and schedule reap if nesseccary def pop alarm = (Time.now + @wait_timeout) clnt = nil diff --git a/spec/hot_tub/integration/net_http_spec.rb b/spec/hot_tub/integration/net_http_spec.rb index 5e4a527..8a0127e 100644 --- a/spec/hot_tub/integration/net_http_spec.rb +++ b/spec/hot_tub/integration/net_http_spec.rb @@ -1,104 +1,107 @@ require 'spec_helper' + describe HotTub do - context "blocking (size equals max_size)" do - let(:pool) do - HotTub::Pool.new(:size => 4, :max_size => 4) { - uri = URI.parse(HotTub::Server.url) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - http.start - http - } - end + unless HotTub.jruby? + context "blocking (size equals max_size)" do + let(:pool) do + HotTub::Pool.new(:size => 4, :max_size => 4) { + uri = URI.parse(HotTub::Server.url) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = false + http.start + http + } + end - let(:threads) { [] } + let(:threads) { [] } - before(:each) do - 20.times do - net_http_thread_work(pool, 10, threads) + before(:each) do + 20.times do + net_http_thread_work(pool, 10, threads) + end end - end - it { expect(pool.current_size).to eql(4) } + it { expect(pool.current_size).to eql(4) } - it "should work" do - results = threads.collect{ |t| t[:status]} - expect(results.length).to eql(200) - expect(results.uniq).to eql(['200']) - end + it "should work" do + results = threads.collect{ |t| t[:status]} + expect(results.length).to eql(200) + expect(results.uniq).to eql(['200']) + end - it "should shutdown" do - pool.shutdown! - expect(pool.current_size).to eql(0) + it "should shutdown" do + pool.shutdown! + expect(pool.current_size).to eql(0) + end end - end - context "with larger max" do - let(:pool) do - HotTub::Pool.new(:size => 4, :max_size => 8) { - uri = URI.parse(HotTub::Server.url) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - http.start - http - } - end + context "with larger max" do + let(:pool) do + HotTub::Pool.new(:size => 4, :max_size => 8) { + uri = URI.parse(HotTub::Server.url) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = false + http.start + http + } + end - let(:threads) { [] } + let(:threads) { [] } - before(:each) do - 20.times do - net_http_thread_work(pool, 10, threads) + before(:each) do + 20.times do + net_http_thread_work(pool, 10, threads) + end end - end - it { expect(pool.current_size).to be >= 4 } - it { expect(pool.current_size).to be <= 8 } - it "should work" do - results = threads.collect{ |t| t[:status]} - expect(results.length).to eql(200) - expect(results.uniq).to eql(['200']) + it { expect(pool.current_size).to be >= 4 } + it { expect(pool.current_size).to be <= 8 } + it "should work" do + results = threads.collect{ |t| t[:status]} + expect(results.length).to eql(200) + expect(results.uniq).to eql(['200']) + end end - end - context "sized without max" do - let(:pool) do - HotTub::Pool.new(:size => 4) { - uri = URI.parse(HotTub::Server.url) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - http.start - http - } - end + context "sized without max" do + let(:pool) do + HotTub::Pool.new(:size => 4) { + uri = URI.parse(HotTub::Server.url) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = false + http.start + http + } + end - let(:threads) { [] } + let(:threads) { [] } - before(:each) do - 20.times do - net_http_thread_work(pool, 10, threads) + before(:each) do + 20.times do + net_http_thread_work(pool, 10, threads) + end end - end - it { expect(pool.current_size).to be > 4 } + it { expect(pool.current_size).to be > 4 } - it "should work" do - results = threads.collect{ |t| t[:status]} - expect(results.length).to eql(200) - expect(results.uniq).to eql(['200']) + it "should work" do + results = threads.collect{ |t| t[:status]} + expect(results.length).to eql(200) + expect(results.uniq).to eql(['200']) + end end end -end -def net_http_thread_work(pool,thread_count=0, threads=[]) - thread_count.times.each do - threads << Thread.new do - uri = URI.parse(HotTub::Server.url) - pool.run{|connection| Thread.current[:status] = connection.get(uri.path).code } + def net_http_thread_work(pool,thread_count=0, threads=[]) + thread_count.times.each do + threads << Thread.new do + uri = URI.parse(HotTub::Server.url) + pool.run{|connection| Thread.current[:status] = connection.get(uri.path).code } + end + end + threads.each do |t| + t.join end - end - threads.each do |t| - t.join end end