Skip to content

Commit

Permalink
use ThreadSafe::Cache for threaded Session
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMcKin committed Feb 6, 2014
1 parent 4bd8246 commit b1cad23
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,8 +1,8 @@
language: ruby
rvm:
- 2.1.0
- 2.0.0
- 1.9.3
- 1.9.2
- jruby-19mode
- rbx-19mode
- ruby-head
Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Expand Up @@ -3,7 +3,7 @@ source "https://rubygems.org"
# Specify your gem's dependencies in http_hot_tub.gemspec
gemspec
gem 'rake'

gem 'thread_safe'
group :development do
platform :ruby do
gem 'coveralls', :require => false
Expand Down
1 change: 1 addition & 0 deletions HISTORY.md
Expand Up @@ -7,3 +7,4 @@ HEAD
- Rename unsafe methods with leading "_"
- Add register to pool to track all connections
- EM.add_shutdown_hook for EM connections
- Use ThreadSafe::Cache for threaded sessions
1 change: 1 addition & 0 deletions lib/hot_tub.rb
@@ -1,4 +1,5 @@
require 'thread'
require 'thread_safe'
require 'logger'
require "hot_tub/version"
require "hot_tub/known_clients"
Expand Down
32 changes: 25 additions & 7 deletions lib/hot_tub/session.rb
@@ -1,5 +1,24 @@
require 'uri'
module HotTub
class EmCache < Hash
def initialize
@mutex = EM::Synchrony::Thread::Mutex.new
super
end

def [](key)
@mutex.synchronize do
super
end
end

def []=(key,val)
@mutex.synchronize do
super
end
end
end

class Session
include HotTub::KnownClients
# A HotTub::Session is a synchronized hash used to separate pools/clients by their domain.
Expand Down Expand Up @@ -35,8 +54,7 @@ def initialize(options={},&client_block)
raise ArgumentError, "HotTub::Sessions requre a block on initialization that accepts a single argument" unless block_given?
@options = options || {}
@client_block = client_block
@sessions = Hash.new
@mutex = (em_client? ? EM::Synchrony::Thread::Mutex.new : Mutex.new)
@sessions = (em_client? ? EmCache.new : ThreadSafe::Cache.new)
HotTub.hot_at_exit( em_client? ) {close_all}
end

Expand All @@ -45,14 +63,14 @@ def initialize(options={},&client_block)
def sessions(url)
key = to_key(url)
return @sessions[key] if @sessions[key]
@mutex.synchronize do
#@mutex.synchronize do
if @options[:with_pool]
@sessions[key] = HotTub::Pool.new(@options) { @client_block.call(url) }
else
@sessions[key] = @client_block.call(url) if @sessions[key].nil?
end
@sessions[key]
end
#end
end

def run(url,&block)
Expand All @@ -63,7 +81,7 @@ def run(url,&block)

# Calls close on all pools/clients in sessions
def close_all
@sessions.each do |key,clnt|
@sessions.each_pair do |key,clnt|
if clnt.is_a?(HotTub::Pool)
clnt.close_all
else
Expand All @@ -73,9 +91,9 @@ def close_all
HotTub.logger.error "There was an error close one of your HotTub::Session clients: #{e}"
end
end
@mutex.synchronize do
#@mutex.synchronize do
@sessions[key] = nil
end
#end
end
end

Expand Down
16 changes: 9 additions & 7 deletions spec/session_spec.rb
Expand Up @@ -49,8 +49,8 @@
with_pool_options = HotTub::Session.new { |url| HotTub::Pool.new(:size => 13) { MocClient.new(url) } }
with_pool_options.sessions(@url)
sessions = with_pool_options.instance_variable_get(:@sessions)
sessions.length.should eql(1)
sessions.first[1].should be_a(HotTub::Pool)
sessions.size.should eql(1)
sessions.each_value {|v| v.should be_a( HotTub::Pool)}
end
end

Expand All @@ -59,8 +59,8 @@
no_pool = HotTub::Session.new { |url| Excon.new(url) }
no_pool.sessions(@url)
sessions = no_pool.instance_variable_get(:@sessions)
sessions.length.should eql(1)
sessions.first[1].should be_a(Excon::Connection)
sessions.size.should eql(1)
sessions.each_value {|v| v.should be_a(Excon::Connection)}
end
end

Expand Down Expand Up @@ -175,14 +175,15 @@
context 'fibers' do
it "should work" do
EM.synchrony do
url = HotTub::Server.url
sessions = HotTub::Session.new(:with_pool => true) {|url| EM::HttpRequest.new(url)}
failed = false
fibers = []
lambda {
10.times.each do
fibers << Fiber.new do
sessions.run(@url) {|connection|
s = connection.head(:keepalive => true).response_header.status
sessions.run(url) {|connection|
s = connection.head(:keepalive => true).response_header.status
failed = true unless s == 200}
end
end
Expand All @@ -202,8 +203,9 @@
end
}.should_not raise_error
sessions.instance_variable_get(:@sessions).keys.length.should eql(1)
(sessions.sessions(@url).instance_variable_get(:@pool).length >= 5).should be_true #make sure work got done
(sessions.sessions(url).instance_variable_get(:@pool).length >= 5).should be_true #make sure work got done
failed.should be_false # Make sure our requests worked
sessions.close_all
EM.stop
end
end
Expand Down

0 comments on commit b1cad23

Please sign in to comment.