Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit. Thanks @aphyr @freelancing-god.

  • Loading branch information...
commit d6b922b31cb6c308d72cbe79d4da9ff34c1c24bd 0 parents
@seancribbs seancribbs authored
26 .gitignore
@@ -0,0 +1,26 @@
+## MAC OS
+.DS_Store
+
+## TEXTMATE
+*.tmproj
+tmtags
+
+## EMACS
+*~
+\#*
+.\#*
+
+## VIM
+*.swp
+
+## PROJECT::GENERAL
+coverage
+rdoc
+pkg
+
+## PROJECT::SPECIFIC
+.bundle
+Gemfile.lock
+**/bin
+*.rbc
+.rvmrc
1  Gemfile
@@ -0,0 +1 @@
+source "http://rubygems.org"
16 LICENSE
@@ -0,0 +1,16 @@
+Copyright 2011-2012 Sean Cribbs, Kyle Kingsbury and Basho Technologies, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+All of the files in this project are under the project-wide license
+unless they are otherwise marked.
55 README.md
@@ -0,0 +1,55 @@
+# Innertube
+
+Innertube is a thread-safe, re-entrant resource pool, extracted from
+the [Riak Ruby Client](/basho/riak-ruby-client), where it was used to
+pool connections to [Riak](/basho/riak). It is free to use and modify,
+licensed under the Apache 2.0 License.
+
+## Example
+
+```ruby
+# -------------------------------------------------------
+# Basics
+# -------------------------------------------------------
+
+# Create a pool with open/close callables
+pool = Innertube::Pool.new(proc { Connection.new },
+ proc {|c| c.disconnect })
+
+# Grab a connection from the pool, returns the same value
+# as the block
+pool.take {|conn| conn.ping } # => true
+
+# Raise the BadResource exception if the resource is no
+# longer good
+pool.take do |conn|
+ raise Innertube::Pool::BadResource unless conn.connected?
+ conn.ping
+end
+
+# Innertube helps your code be re-entrant! Take more resources
+# while you have one checked out.
+pool.take do |conn|
+ conn.stream_tweets do |tweet|
+ pool.take {|conn2| conn2.increment :tweets }
+ end
+end
+
+# -------------------------------------------------------
+# Iterations: These are slow because they have guarantees
+# about visiting all current elements of the pool.
+# -------------------------------------------------------
+
+# Do something with every connection in the pool
+pool.each {|conn| puts conn.get_stats }
+
+# Expunge some expired connections from the pool
+pool.delete_if {|conn| conn.idle_time > 5 }
+```
+
+## Credits
+
+The pool was originally implemented by [Kyle Kingsbury](/aphyr) and
+extracted by [Sean Cribbs](/seancribbs), when bugged about it by
+[Pat Allan](/freelancing-god) at
+[EuRuKo 2012](http://www.euruko2012.org/).
18 innertube.gemspec
@@ -0,0 +1,18 @@
+$:.push File.expand_path("../lib", __FILE__)
+require 'innertube/version'
+
+Gem::Specification.new do |gem|
+ gem.name = 'innertube'
+ gem.version = Innertube::VERSION
+ gem.summary = "A thread-safe resource pool, originally borne in riak-client (Ripple)."
+ gem.description = "Because everyone needs their own pool library."
+ gem.email = [ "sean@basho.com", "aphyr@aphyr.com" ]
+ gem.homepage = "http://github.com/basho/innertube"
+ gem.authors = ["Sean Cribbs", "Kyle Kingsbury"]
+
+ # Files
+ ignores = File.read(".gitignore").split(/\r?\n/).reject{ |f| f =~ /^(#.+|\s*)$/ }.map {|f| Dir[f] }.flatten
+ gem.files = (Dir['**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
+ # gem.test_files = (Dir['spec/**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
+ gem.require_paths = ['lib']
+end
188 lib/innertube.rb
@@ -0,0 +1,188 @@
+require 'thread'
+
+# Innertube is a re-entrant thread-safe resource pool that was
+# extracted from the Riak Ruby Client
+# (https://github.com/basho/riak-ruby-client).
+# @see Pool
+module Innertube
+ # A re-entrant thread-safe resource pool that generates new resources on
+ # demand.
+ # @private
+ class Pool
+ # Raised when a taken element should be deleted from the pool.
+ class BadResource < RuntimeError; end
+
+ # An element of the pool. Comprises an object with an owning
+ # thread. Not usually needed by user code, and should not be
+ # modified outside the {Pool}'s lock.
+ class Element
+ attr_reader :object, :owner
+
+ # Creates a pool element
+ # @param [Object] object the resource to wrap into the pool element
+ def initialize(object)
+ @object = object
+ @owner = nil
+ end
+
+ # Claims this element of the pool for the current Thread.
+ # Do not call this manually, it is only used from inside the pool.
+ def lock
+ self.owner = Thread.current
+ end
+
+ # @return [true,false] Is this element locked/claimed?
+ def locked?
+ !unlocked?
+ end
+
+ # Releases this element of the pool from the current Thread.
+ def unlock
+ self.owner = nil
+ end
+
+ # @return [true,false] Is this element available for use?
+ def unlocked?
+ owner.nil?
+ end
+ end
+
+ # Creates a new resource pool.
+ # @param [Proc, #call] open a callable which allocates a new object for the
+ # pool
+ # @param [Proc, #call] close a callable which is called with an
+ # object before it is freed.
+ def initialize(open, close)
+ @open = open
+ @close = close
+ @lock = Mutex.new
+ @iterator = Mutex.new
+ @element_released = ConditionVariable.new
+ @pool = Set.new
+ end
+
+ # On each element of the pool, calls close(element) and removes it.
+ # @private
+ def clear
+ each_element do |e|
+ delete_element e
+ end
+ end
+ alias :close :clear
+
+ # Deletes an element of the pool. Calls the close callback on its object.
+ # Not intended for external use.
+ # @param [Element] e the element to remove from the pool
+ def delete_element(e)
+ @close.call(e.object)
+ @lock.synchronize do
+ @pool.delete e
+ end
+ end
+ private :delete_element
+
+ # Locks each element in turn and closes/deletes elements for which the
+ # object passes the block.
+ # @yield [object] a block that should determine whether an element
+ # should be deleted from the pool
+ # @yieldparam [Object] object the resource
+ def delete_if
+ raise ArgumentError, "block required" unless block_given?
+
+ each_element do |e|
+ if yield e.object
+ delete_element e
+ end
+ end
+ end
+
+ # Acquire an element of the pool. Yields the object. If all
+ # elements are claimed, it will create another one.
+ # @yield [resource] a block that will perform some action with the
+ # element of the pool
+ # @yieldparam [Object] resource a resource managed by the pool.
+ # Locked for the duration of the block
+ # @param [Proc, #call] :filter a callable which receives objects and has
+ # the opportunity to reject each in turn.
+ # @param [Object] :default if no resources are available, use this object
+ # instead of calling #open.
+ # @private
+ def take(opts = {})
+ raise ArgumentError, "block required" unless block_given?
+
+ result = nil
+ element = nil
+ opts[:filter] ||= proc {|_| true }
+ @lock.synchronize do
+ element = pool.find { |e| e.unlocked? && opts[:filter].call(e.object) }
+ unless element
+ # No objects were acceptable
+ resource = opts[:default] || @open.call
+ element = Element.new(resource)
+ @pool << element
+ end
+ element.lock
+ end
+ begin
+ result = yield element.object
+ rescue BadResource
+ delete_element element
+ raise
+ ensure
+ # Unlock
+ if element
+ element.unlock
+ @element_released.signal
+ end
+ end
+ result
+ end
+ alias >> take
+
+ # Iterate over a snapshot of the pool. Yielded objects are locked
+ # for the duration of the block. This may block the current thread
+ # until elements in the snapshot are released by other threads.
+ # @yield [element] a block that will do something with each
+ # element in the pool
+ # @yieldparam [Element] element the current element in the
+ # iteration
+ def each_element
+ targets = @pool.to_a
+ unlocked = []
+
+ @iterator.synchronize do
+ until targets.empty?
+ @lock.synchronize do
+ unlocked, targets = targets.partition {|e| e.unlocked? }
+ unlocked.each {|e| e.lock }
+ end
+
+ unlocked.each do |e|
+ begin
+ yield e
+ ensure
+ e.unlock
+ end
+ end
+ @element_released.wait(@iterator) unless targets.empty?
+ end
+ end
+ end
+
+ # As each_element, but yields objects, not wrapper elements.
+ # @yield [resource] a block that will do something with each
+ # resource in the pool
+ # @yieldparam [Object] resource the current resource in the
+ # iteration
+ def each
+ each_element do |e|
+ yield e.object
+ end
+ end
+
+ # @return [Integer] the number of the resources in the pool
+ def size
+ @lock.synchronize { @pool.size }
+ end
+ end
+end
3  lib/innertube/version.rb
@@ -0,0 +1,3 @@
+module Innertube
+ VERSION = "1.0.0"
+end
Please sign in to comment.
Something went wrong with that request. Please try again.