Skip to content

Commit

Permalink
Detect forks
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMcKin committed May 16, 2016
1 parent 070f98f commit 64d0d6b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 138 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -8,4 +8,4 @@ rvm:
- 2.0.0-p648
- ruby-head
- rbx-2
- jruby-head
- jruby
3 changes: 2 additions & 1 deletion HISTORY.md
Expand Up @@ -5,11 +5,12 @@ Head
=======
- None yet.

1.0.1
1.1.0
=======
- Close orphan clients outside of synchonize
- Freeze alarm message
- Detect dead resources and reap
- Detect fork

1.0.0
=======
Expand Down
29 changes: 1 addition & 28 deletions README.md
Expand Up @@ -140,35 +140,8 @@ a lambda that accepts the client as an argument or symbol representing a method

## Forking

HotTub's `#reset!` methods close all idle connections, prevents connections in use from returning
to the pool and attempts to close orphaned connections as they attempt to return.
HotTub::Pool automatically detects forks and drains the pool, so no additional "after fork" code is required.

# Puma
on_worker_boot do

# If you let HotTub manage all your connections
HotTub.reset!

# If you have your own HotTub::Sessions
MY_SESSIONS.reset!

# If you have any one-off pools
MY_POOL.reset!

end

# Unicorn
before_fork do |server, worker|

# If you let HotTub manage all your connections
HotTub.reset!

# If you have your own HotTub::Sessions
MY_SESSIONS.reset!

# If you have any one-off pools
MY_POOL.reset!
end


## Contributing to HotTub
Expand Down
48 changes: 16 additions & 32 deletions lib/hot_tub/pool.rb
Expand Up @@ -87,10 +87,13 @@ class Pool
# [:reap_timeout]
# Default is 600 seconds. An integer that represents the timeout for reaping the pool in seconds.
#
# [:detect_fork]
# Set to false to disable fork detection
#
def initialize(opts={},&client_block)
raise ArgumentError, 'a block that initializes a new client is required' unless block_given?
@name = (opts[:name] || self.class.name)
@size = (opts[:size] || 5) # in seconds
@size = (opts[:size] || 5)
@wait_timeout = (opts[:wait_timeout] || 10) # in seconds
@reap_timeout = (opts[:reap_timeout] || 600) # the interval to reap connections in seconds
@max_size = (opts[:max_size] || 0) # maximum size of pool when non-blocking, 0 means no limit
Expand All @@ -116,12 +119,15 @@ def initialize(opts={},&client_block)

@never_block = (@max_size == 0)

@pid = Process.pid unless opts[:detect_fork] == false

at_exit {shutdown!} unless @sessions_key
end

# Preform an operations with a client/connection.
# Requires a block that receives the client.
def run
drain! if forked?
clnt = pop
yield clnt
ensure
Expand Down Expand Up @@ -158,48 +164,22 @@ def drain!
ensure
@_out.clear
@_pool.clear
@pid = Process.pid
@cond.broadcast
end
end
nil
end
alias :close! :drain!

# Reset the pool.
# or if shutdown allow threads to quickly finish their work
# Clients from the previous pool will not return to pool.
def reset!
HotTub.logger.info "[HotTub] Resetting pool #{@name}!" if HotTub.logger
@mutex.synchronize do
begin
while clnt = @_pool.pop
close_client(clnt)
end
ensure
@_out.clear
@_pool.clear
@cond.broadcast
end
end
nil
end
alias :reset! :drain!

# Kills the reaper and drains the pool.
def shutdown!
HotTub.logger.info "[HotTub] Shutting down pool #{@name}!" if HotTub.logger
@shutdown = true
kill_reaper if @reaper
@mutex.synchronize do
begin
while clnt = @_pool.pop
close_client(clnt)
end
ensure
@_out.clear
@_pool.clear
@cond.broadcast
end
end
drain!
@shutdown = false
nil
end

Expand Down Expand Up @@ -250,6 +230,10 @@ def max_size=max_size
@max_size = max_size
end

def forked?
(@pid && (@pid != Process.pid))
end

private

ALARM_MESSAGE = "Could not fetch a free client in time. Consider increasing your pool size.".freeze
Expand Down Expand Up @@ -289,7 +273,7 @@ def push(clnt)

# Safely pull client from pool, adding if allowed
# If a client is not available, check for dead
# resources and schedule reap if nesseccary
# resources and schedule reap if nesseccary
def pop
alarm = (Time.now + @wait_timeout)
clnt = nil
Expand Down
155 changes: 79 additions & 76 deletions spec/hot_tub/integration/net_http_spec.rb
@@ -1,104 +1,107 @@
require 'spec_helper'


describe HotTub do
context "blocking (size equals max_size)" do
let(:pool) do
HotTub::Pool.new(:size => 4, :max_size => 4) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end
unless HotTub.jruby?
context "blocking (size equals max_size)" do
let(:pool) do
HotTub::Pool.new(:size => 4, :max_size => 4) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end

let(:threads) { [] }
let(:threads) { [] }

before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
end
end
end

it { expect(pool.current_size).to eql(4) }
it { expect(pool.current_size).to eql(4) }

it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
end
it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
end

it "should shutdown" do
pool.shutdown!
expect(pool.current_size).to eql(0)
it "should shutdown" do
pool.shutdown!
expect(pool.current_size).to eql(0)
end
end
end

context "with larger max" do
let(:pool) do
HotTub::Pool.new(:size => 4, :max_size => 8) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end
context "with larger max" do
let(:pool) do
HotTub::Pool.new(:size => 4, :max_size => 8) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end

let(:threads) { [] }
let(:threads) { [] }

before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
end
end
end

it { expect(pool.current_size).to be >= 4 }
it { expect(pool.current_size).to be <= 8 }
it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
it { expect(pool.current_size).to be >= 4 }
it { expect(pool.current_size).to be <= 8 }
it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
end
end
end

context "sized without max" do
let(:pool) do
HotTub::Pool.new(:size => 4) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end
context "sized without max" do
let(:pool) do
HotTub::Pool.new(:size => 4) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
http.start
http
}
end

let(:threads) { [] }
let(:threads) { [] }

before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
before(:each) do
20.times do
net_http_thread_work(pool, 10, threads)
end
end
end

it { expect(pool.current_size).to be > 4 }
it { expect(pool.current_size).to be > 4 }

it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
it "should work" do
results = threads.collect{ |t| t[:status]}
expect(results.length).to eql(200)
expect(results.uniq).to eql(['200'])
end
end
end
end

def net_http_thread_work(pool,thread_count=0, threads=[])
thread_count.times.each do
threads << Thread.new do
uri = URI.parse(HotTub::Server.url)
pool.run{|connection| Thread.current[:status] = connection.get(uri.path).code }
def net_http_thread_work(pool,thread_count=0, threads=[])
thread_count.times.each do
threads << Thread.new do
uri = URI.parse(HotTub::Server.url)
pool.run{|connection| Thread.current[:status] = connection.get(uri.path).code }
end
end
threads.each do |t|
t.join
end
end
threads.each do |t|
t.join
end
end

0 comments on commit 64d0d6b

Please sign in to comment.