Permalink
Browse files

support arbitrary iteration methods with threadify

  • Loading branch information...
ahoward committed Jun 9, 2009
1 parent 86d43db commit 5fe0ad6c4e019c4f1f394d210fd11e837da2b917
Showing with 138 additions and 58 deletions.
  1. +60 −29 README
  2. +38 −0 README.erb
  3. +14 −6 lib/threadify.rb
  4. +10 −15 sample/a.rb
  5. +14 −6 sample/b.rb
  6. +2 −2 threadify.gemspec
View
89 README
@@ -1,50 +1,56 @@
NAME
- threadify
+ threadify.rb
+
+SYNOPSIS
+ enumerable = %w( a b c d )
+ enumerable.threadify(2){ 'process this block using two worker threads' }
+ enumerable.threadify(:each_slice, 4){ 'process each slice of 4 in a thread' }
DESCRIPTION
+ threadify.rb makes it stupid easy to process a bunch of data using 'n'
+ worker threads
INSTALL
gem install threadify
+URI
+ http://rubyforge.org/projects/codeforpeople
+ http://github.com/ahoward/threadify/
+
SAMPLES
<========< sample/a.rb >========>
~ > cat sample/a.rb
- require 'open-uri'
- require 'yaml'
-
- require 'rubygems'
require 'threadify'
+ require 'open-uri'
+ require 'yaml'
uris =
%w(
+ http://codeforpeople.com
+ http://drawohara.com
+ http://twitter.com/drawohara
+ http://github.com/ahoward/threadify/
http://google.com
- http://yahoo.com
http://rubyforge.org
http://ruby-lang.org
- http://kcrw.org
- http://drawohara.com
- http://codeforpeople.com
+ http://hypem.com
)
+ curl = lambda{|url| open(url){|socket| socket.read}}
time 'without threadify' do
- uris.each do |uri|
- body = open(uri){|pipe| pipe.read}
- end
+ uris.each{|uri| curl[uri]}
end
time 'with threadify' do
- uris.threadify do |uri|
- body = open(uri){|pipe| pipe.read}
- end
+ uris.threadify(uris.size){|uri| curl[uri]}
end
-
BEGIN {
def time label
a = Time.now.to_f
@@ -58,33 +64,41 @@ SAMPLES
~ > ruby sample/a.rb
---
- without threadify: 7.70478916168213
+ without threadify: 12.6283850669861
---
- with threadify: 1.69362306594849
+ with threadify: 1.93842315673828
<========< sample/b.rb >========>
~ > cat sample/b.rb
- require 'yaml'
-
- require 'rubygems'
require 'threadify'
- size = Integer(ARGV.shift || (2 ** 15))
+ require 'yaml'
+
+ #size = Integer(ARGV.shift || (2 ** 20))
+ size = 64
haystack = Array.new(size){|i| i}
needle = 2 * (size / 3)
a, b = 4, 2
time 'without threadify' do
- a = haystack.each{|value| break value if value == needle}
+ a =
+ haystack.each do |value|
+ break value if value == needle
+ sleep(rand) # mimic work
+ end
end
time 'with threadify' do
- b = haystack.threadify(16){|value| threadify! value if value == needle}
+ b =
+ haystack.threadify(:each_slice, size / 8) do |slice|
+ slice.each{|value| threadify! value if value == needle}
+ sleep(rand) # mimic work
+ end
end
raise if a != b
@@ -104,12 +118,29 @@ SAMPLES
~ > ruby sample/b.rb
---
- without threadify: 0.00685286521911621
+ without threadify: 22.8657438755035
---
- with threadify: 0.546236038208008
+ with threadify: 0.967911005020142
---
- :needle: 21844
- :a: 21844
- :b: 21844
+ :needle: 42
+ :a: 42
+ :b: 42
+
+
+
+
+HISTORY
+ 1.1.0
+ - added ability to specify arbitrary iterator (not only each)
+ [0,1,2,3].threadify(:each_slice, 2){|ary| ary}
+ - update samples
+ - auto include enumerator
+ 0.0.3
+ - added ability to short-circuit the parallel processing, a.k.a to 'break'
+ from threadify
+ 0.0.2
+ - don't use thread.exit, just let the thread die naturally
+ - add version to Threadify module
+ - comments ;-)
View
@@ -0,0 +1,38 @@
+NAME
+ threadify.rb
+
+SYNOPSIS
+ enumerable = %w( a b c d )
+ enumerable.threadify(2){ 'process this block using two worker threads' }
+ enumerable.threadify(:each_slice, 4){ 'process each slice of 4 in a thread' }
+
+DESCRIPTION
+ threadify.rb makes it stupid easy to process a bunch of data using 'n'
+ worker threads
+
+INSTALL
+ gem install threadify
+
+URI
+ http://rubyforge.org/projects/codeforpeople
+ http://github.com/ahoward/threadify/
+
+SAMPLES
+ <%= samples %>
+
+
+HISTORY
+ 1.1.0
+ - added ability to specify arbitrary iterator (not only each)
+ [0,1,2,3].threadify(:each_slice, 2){|ary| ary}
+ - update samples
+ - auto include enumerator
+
+ 0.0.3
+ - added ability to short-circuit the parallel processing, a.k.a to 'break'
+ from threadify
+
+ 0.0.2
+ - don't use thread.exit, just let the thread die naturally
+ - add version to Threadify module
+ - comments ;-)
View
@@ -1,26 +1,34 @@
module Threadify
- VERSION = '1.0.0'
+ Threadify::VERSION = '1.1.0' unless defined?(Threadify::VERSION)
def Threadify.version() Threadify::VERSION end
require 'thread'
+ require 'enumerator'
@threads = 8
@abort_on_exception = true
+ @strategy = [:each]
class << self
attr_accessor :threads
attr_accessor :abort_on_exception
+ attr_accessor :strategy
end
class Error < ::StandardError; end
end
module Enumerable
- def threadify(opts = {}, &block)
+ def threadify(*args, &block)
# setup
#
- opts = {:threads => opts} if Numeric === opts
- threads = Integer(opts[:threads] || opts['threads'] || Threadify.threads)
+ opts = args.last.is_a?(Hash) ? args.pop : {}
+ opts.keys.each{|key| opts[key.to_s.to_sym] = opts.delete(key)}
+ opts[:threads] ||= (Numeric === args.first ? args.shift : Threadify.threads)
+ opts[:strategy] ||= (args.empty? ? Threadify.strategy : args)
+
+ threads = Integer(opts[:threads])
+ strategy = opts[:strategy]
done = Object.new.freeze
nothing = done
jobs = Array.new(threads).map{ [] }
@@ -29,7 +37,7 @@ def threadify(opts = {}, &block)
# produce jobs
#
i = 0
- each{|*args| jobs[i % threads].push([args, i]); i += 1}
+ send(*strategy){|*args| jobs[i % threads].push([args, i]); i += 1}
threads.times{|i| jobs[i].push(done)}
# setup consumer list
@@ -76,8 +84,8 @@ def threadify(opts = {}, &block)
unless nothing == thrown
- args, i = job
thrownq.push [i, thrown]
+ args, i = job
end
end
end
View
@@ -1,36 +1,31 @@
-require 'open-uri'
-require 'yaml'
-
-require 'rubygems'
require 'threadify'
+require 'open-uri'
+require 'yaml'
uris =
%w(
+ http://codeforpeople.com
+ http://drawohara.com
+ http://twitter.com/drawohara
+ http://github.com/ahoward/threadify/
http://google.com
- http://yahoo.com
http://rubyforge.org
http://ruby-lang.org
- http://kcrw.org
- http://drawohara.com
- http://codeforpeople.com
+ http://hypem.com
)
+curl = lambda{|url| open(url){|socket| socket.read}}
time 'without threadify' do
- uris.each do |uri|
- body = open(uri){|pipe| pipe.read}
- end
+ uris.each{|uri| curl[uri]}
end
time 'with threadify' do
- uris.threadify do |uri|
- body = open(uri){|pipe| pipe.read}
- end
+ uris.threadify(uris.size){|uri| curl[uri]}
end
-
BEGIN {
def time label
a = Time.now.to_f
View
@@ -1,21 +1,29 @@
-require 'yaml'
-
-require 'rubygems'
require 'threadify'
-size = Integer(ARGV.shift || (2 ** 15))
+require 'yaml'
+
+#size = Integer(ARGV.shift || (2 ** 20))
+size = 64
haystack = Array.new(size){|i| i}
needle = 2 * (size / 3)
a, b = 4, 2
time 'without threadify' do
- a = haystack.each{|value| break value if value == needle}
+ a =
+ haystack.each do |value|
+ break value if value == needle
+ sleep(rand) # mimic work
+ end
end
time 'with threadify' do
- b = haystack.threadify(16){|value| threadify! value if value == needle}
+ b =
+ haystack.threadify(:each_slice, size / 8) do |slice|
+ slice.each{|value| threadify! value if value == needle}
+ sleep(rand) # mimic work
+ end
end
raise if a != b
View
@@ -3,11 +3,11 @@
Gem::Specification::new do |spec|
spec.name = "threadify"
- spec.version = "1.0.0"
+ spec.version = "1.1.0"
spec.platform = Gem::Platform::RUBY
spec.summary = "threadify"
- spec.files = ["gemspec.rb", "install.rb", "lib", "lib/threadify.rb", "rakefile", "README", "README.rb", "sample", "sample/a.rb", "sample/b.rb", "threadify.gemspec"]
+ spec.files = ["lib", "lib/threadify.rb", "rakefile", "README", "README.erb", "sample", "sample/a.rb", "sample/b.rb", "threadify.gemspec"]
spec.executables = []
spec.require_path = "lib"

0 comments on commit 5fe0ad6

Please sign in to comment.