Permalink
Browse files

Port the specs over and fix the typo bugs, enable Travis CI, bump to …

…1.0.1.
  • Loading branch information...
1 parent d6b922b commit d59902da348c9d40aac26f9ed11dac72e7b51c98 @seancribbs seancribbs committed Jun 1, 2012
Showing with 341 additions and 9 deletions.
  1. +13 −0 .travis.yml
  2. +2 −0 Gemfile
  3. +4 −4 README.md
  4. +3 −1 innertube.gemspec
  5. +3 −3 lib/innertube.rb
  6. +1 −1 lib/innertube/version.rb
  7. +298 −0 spec/innertube_spec.rb
  8. +17 −0 spec/spec_helper.rb
View
@@ -0,0 +1,13 @@
+language: ruby
+script: bundle exec rspec --color
+rvm:
+ - 1.8.7
+ - 1.9.2
+ - 1.9.3
+ - jruby-18mode
+ - jruby-19mode
+ - jruby-head
+ - rbx-18mode
+ - rbx-19mode
+notifications:
+ email: false
View
@@ -1 +1,3 @@
source "http://rubygems.org"
+
+gemspec
View
@@ -16,18 +16,18 @@ licensed under the Apache 2.0 License.
pool = Innertube::Pool.new(proc { Connection.new },
proc {|c| c.disconnect })
-# Grab a connection from the pool, returns the same value
+# Grab a connection from the pool, returns the same value
# as the block
-pool.take {|conn| conn.ping } # => true
+pool.take {|conn| conn.ping } # => true
-# Raise the BadResource exception if the resource is no
+# Raise the BadResource exception if the resource is no
# longer good
pool.take do |conn|
raise Innertube::Pool::BadResource unless conn.connected?
conn.ping
end
-# Innertube helps your code be re-entrant! Take more resources
+# Innertube helps your code be re-entrant! Take more resources
# while you have one checked out.
pool.take do |conn|
conn.stream_tweets do |tweet|
View
@@ -10,7 +10,9 @@ Gem::Specification.new do |gem|
gem.homepage = "http://github.com/basho/innertube"
gem.authors = ["Sean Cribbs", "Kyle Kingsbury"]
- # Files
+ gem.add_development_dependency 'rspec', '~> 2.10.0'
+
+ # Files
ignores = File.read(".gitignore").split(/\r?\n/).reject{ |f| f =~ /^(#.+|\s*)$/ }.map {|f| Dir[f] }.flatten
gem.files = (Dir['**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
# gem.test_files = (Dir['spec/**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
View
@@ -28,7 +28,7 @@ def initialize(object)
# Claims this element of the pool for the current Thread.
# Do not call this manually, it is only used from inside the pool.
def lock
- self.owner = Thread.current
+ @owner = Thread.current
end
# @return [true,false] Is this element locked/claimed?
@@ -38,7 +38,7 @@ def locked?
# Releases this element of the pool from the current Thread.
def unlock
- self.owner = nil
+ @owner = nil
end
# @return [true,false] Is this element available for use?
@@ -114,7 +114,7 @@ def take(opts = {})
element = nil
opts[:filter] ||= proc {|_| true }
@lock.synchronize do
- element = pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
+ element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
unless element
# No objects were acceptable
resource = opts[:default] || @open.call
View
@@ -1,3 +1,3 @@
module Innertube
- VERSION = "1.0.0"
+ VERSION = "1.0.1"
end
View
@@ -0,0 +1,298 @@
+require 'spec_helper'
+require 'thread'
+
+describe Innertube::Pool do
+ let(:pool_members) { subject.instance_variable_get(:@pool) }
+
+ describe 'basics' do
+ subject do
+ described_class.new(lambda { [0] }, lambda { |x| })
+ end
+
+ it 'yields a new object' do
+ subject.take do |x|
+ x.should == [0]
+ end
+ end
+
+ it 'retains a single object for serial access' do
+ n = 100
+ n.times do |i|
+ subject.take do |x|
+ x.should == [i]
+ x[0] += 1
+ end
+ end
+ subject.size.should == 1
+ end
+
+ it 'should be re-entrant' do
+ n = 10
+ n.times do |i|
+ subject.take do |x|
+ x.replace [1]
+ subject.take do |y|
+ y.replace [2]
+ subject.take do |z|
+ z.replace [3]
+ subject.take do |t|
+ t.replace [4]
+ end
+ end
+ end
+ end
+ end
+ subject.instance_variable_get(:@pool).map { |e| e.object.first }.sort.should == [1,2,3,4]
+ end
+
+
+ it 'should unlock when exceptions are raised' do
+ begin
+ subject.take do |x|
+ x << 1
+ subject.take do |y|
+ x << 2
+ y << 3
+ raise
+ end
+ end
+ rescue
+ end
+ pool_members.should be_all {|e| not e.owner }
+ pool_members.map { |e| e.object }.should =~ [[0,1,2],[0,3]]
+ end
+
+ context 'when BadResource is raised' do
+ subject do
+ described_class.new(lambda do
+ m = mock('resource')
+ m.should_receive(:close)
+ m
+ end,
+ lambda do |res|
+ res.close
+ end)
+ end
+
+ it 'should delete' do
+ lambda do
+ subject.take do |x|
+ raise Innertube::Pool::BadResource
+ end
+ end.should raise_error(Innertube::Pool::BadResource)
+ subject.size.should == 0
+ end
+ end
+ end
+
+ describe 'threads' do
+ subject {
+ described_class.new(lambda { [] }, lambda { |x| })
+ }
+
+ it 'should allocate n objects for n concurrent operations' do
+ # n threads concurrently allocate and sign objects from the pool
+ n = 10
+ readyq = Queue.new
+ finishq = Queue.new
+ threads = (0...n).map do
+ Thread.new do
+ subject.take do |x|
+ readyq << 1
+ x << Thread.current
+ finishq.pop
+ end
+ end
+ end
+
+ n.times { readyq.pop }
+ n.times { finishq << 1 }
+
+ # Wait for completion
+ threads.each do |t|
+ t.join
+ end
+
+ # Should have taken exactly n objects to do this
+ subject.size.should == n
+ # And each one should be signed exactly once
+ pool_members.map do |e|
+ e.object.size.should == 1
+ e.object.first
+ end.should =~ threads
+ end
+
+ it 'take with filter and default' do
+ n = 10
+ subject = described_class.new(lambda { [] }, lambda { |x| })
+
+ # Allocate several elements of the pool
+ q = Queue.new
+ threads = (0...n).map do |i|
+ Thread.new do
+ subject.take do |a|
+ a << i
+ q << 1
+ sleep 0.02
+ end
+ end
+ end
+
+ # Wait for all threads to have acquired an element
+ n.times { q.pop }
+
+ threads.each do |t|
+ t.join
+ end
+
+ # Get and delete existing even elements
+ got = Set.new
+ (n / 2).times do
+ begin
+ subject.take(
+ :filter => lambda { |x| x.first.even? },
+ :default => [:default]
+ ) do |x|
+ got << x.first
+ raise Innertube::Pool::BadResource
+ end
+ rescue Innertube::Pool::BadResource
+ end
+ end
+ got.should == (0...n).select(&:even?).to_set
+
+ # This time, no even elements exist, so we should get the default.
+ subject.take(:filter => lambda { |x| x.first.even? },
+ :default => :default) do |x|
+ x.should == :default
+ end
+ end
+
+ it 'iterates over a snapshot of all connections, even ones in use' do
+ started = Queue.new
+ n = 30
+ threads = (0..n).map do
+ Thread.new do
+ psleep = 0.75 * rand # up to 50ms sleep
+ subject.take do |a|
+ started << 1
+ a << rand
+ sleep psleep
+ end
+ end
+ end
+
+ n.times { started.pop }
+ touched = []
+
+ subject.each do |e|
+ touched << e
+ end
+
+ threads.each do |t|
+ t.join
+ end
+
+ touched.should be_all {|item| pool_members.find {|e| e.object == item } }
+ end
+
+ it 'should clear' do
+ n = 10
+ subject = described_class.new(
+ lambda { mock('connection').tap {|m| m.should_receive(:teardown) }},
+ lambda { |b| b.teardown }
+ )
+
+ # Allocate several elements of the pool
+ q = Queue.new
+ threads = (0...n).map do |i|
+ Thread.new do
+ subject.take do |a|
+ q << 1
+ sleep 0.1
+ end
+ end
+ end
+
+ # Wait for all threads to have acquired an element
+ n.times { q.pop }
+
+ # Clear the pool while threads still have elements checked out
+ subject.clear
+ pool_members.should be_empty
+
+ # Wait for threads to complete
+ threads.each do |t|
+ t.join
+ end
+ end
+
+ it 'should delete_if' do
+ n = 10
+ subject = described_class.new(
+ lambda { [] },
+ lambda { |x| }
+ )
+
+ # Allocate several elements of the pool
+ q = Queue.new
+ threads = (0...n).map do |i|
+ Thread.new do
+ subject.take do |a|
+ a << i
+ q << 1
+ sleep 0.02
+ end
+ end
+ end
+
+ # Wait for all threads to have acquired an element
+ n.times { q.pop }
+
+ # Delete odd elements
+ subject.delete_if do |x|
+ x.first.odd?
+ end
+
+ # Verify odds are gone.
+ pool_members.all? do |x|
+ x.object.first.even?
+ end.should == true
+
+ # Wait for threads
+ threads.each do |t|
+ t.join
+ end
+ end
+
+ it 'stress test', :slow => true do
+ n = 100
+ psleep = 0.8
+ tsleep = 0.01
+ rounds = 100
+
+ threads = (0...n).map do
+ Thread.new do
+ rounds.times do |i|
+ subject.take do |a|
+ a.should == []
+ a << Thread.current
+ a.should == [Thread.current]
+
+ # Sleep and check
+ while rand < psleep
+ sleep tsleep
+ a.should == [Thread.current]
+ end
+
+ a.delete Thread.current
+ end
+ end
+ end
+ end
+ threads.each do |t|
+ t.join
+ end
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit d59902d

Please sign in to comment.