Skip to content
Browse files

vendor parallel so install voodoo can stop

  • Loading branch information...
1 parent 30aa23c commit 89d09b6a25ef9b52e34a0065b911a78f7cf96621 @grosser committed
Showing with 276 additions and 23 deletions.
  1. +1 −2 Gemfile
  2. +0 −2 Gemfile.lock
  3. +0 −13 Rakefile
  4. +2 −5 lib/rubygems/dependent.rb
  5. +272 −0 lib/rubygems/dependent_parallel.rb
  6. +1 −1 spec/dependent_spec.rb
View
3 Gemfile
@@ -4,5 +4,4 @@ group :dev do
gem 'rake'
gem 'rspec', '~>2'
gem 'jeweler'
- gem 'parallel', '>= 0.5.1'
-end
+end
View
2 Gemfile.lock
@@ -9,7 +9,6 @@ GEM
git (>= 1.2.5)
rubyforge (>= 2.0.0)
json_pure (1.4.6)
- parallel (0.5.1)
rake (0.8.7)
rspec (2.0.1)
rspec-core (~> 2.0.1)
@@ -29,6 +28,5 @@ PLATFORMS
DEPENDENCIES
jeweler
- parallel (>= 0.5.1)
rake
rspec (~> 2)
View
13 Rakefile
@@ -12,19 +12,6 @@ begin
gem.email = "grosser.michael@gmail.com"
gem.homepage = "http://github.com/grosser/#{gem.name}"
gem.authors = ["Michael Grosser"]
-
-
- # if parallel was a dependency, every requirement of rubygems would
- # load parallel, which causes overhead and problems
- gem.post_install_message = <<-POST_INSTALL_MESSAGE
-#{'*'*50}
-
-Since parallel cannot be a dependency, please install by hand:
-
-gem install parallel
-
-#{'*'*50}
-POST_INSTALL_MESSAGE
end
Jeweler::GemcutterTasks.new
View
7 lib/rubygems/dependent.rb
@@ -1,9 +1,6 @@
-require 'parallel'
+require 'rubygems/dependent_parallel'
require 'rubygems/spec_fetcher'
-# older parallel versions can produce strange bugs
-puts "update parallel gem" if Parallel::VERSION < '0.5.1'
-
module Gem
class Dependent
VERSION = File.read( File.join(File.dirname(__FILE__),'..','..','VERSION') ).strip
@@ -34,7 +31,7 @@ def self.find(gem, options={})
def self.fetch_all_dependencies(specs_and_sources, options)
parallel = (options[:parallel] || 15)
- Parallel.map(specs_and_sources, :in_processes => parallel) do |spec, source|
+ Gem::Dependent::Parallel.map(specs_and_sources, :in_processes => parallel) do |spec, source|
yield if block_given?
name = spec.first
dependencies = fetch_dependencies(spec, source)
View
272 lib/rubygems/dependent_parallel.rb
@@ -0,0 +1,272 @@
+# if parallel was a dependency, every requirement of rubygems would
+# load parallel, which causes overhead and problems
+# copied from https://github.com/grosser/parallel/blob/master/lib/parallel.rb
+
+require 'thread' # to get Thread.exclusive
+require 'base64'
+require 'rbconfig'
+
+module Gem;end
+class Gem::Dependent;end
+class Gem::Dependent::Parallel
+ VERSION = '0.5.8'
+
+ def self.in_threads(options={:count => 2})
+ count, options = extract_count_from_options(options)
+
+ out = []
+ threads = []
+
+ count.times do |i|
+ threads[i] = Thread.new do
+ out[i] = yield(i)
+ end
+ end
+
+ wait_for_threads(threads)
+
+ out
+ end
+
+ def self.in_processes(options = {}, &block)
+ count, options = extract_count_from_options(options)
+ count ||= processor_count
+ map(0...count, options.merge(:in_processes => count), &block)
+ end
+
+ def self.each(array, options={}, &block)
+ map(array, options.merge(:preserve_results => false), &block)
+ array
+ end
+
+ def self.each_with_index(array, options={}, &block)
+ each(array, options.merge(:with_index => true), &block)
+ end
+
+ def self.map(array, options = {}, &block)
+ array = array.to_a # turn Range and other Enumerable-s into an Array
+
+ if options[:in_threads]
+ method = :in_threads
+ size = options[method]
+ else
+ method = :in_processes
+ size = options[method] || processor_count
+ end
+ size = [array.size, size].min
+
+ return work_direct(array, options, &block) if size == 0
+
+ if method == :in_threads
+ work_in_threads(array, options.merge(:count => size), &block)
+ else
+ work_in_processes(array, options.merge(:count => size), &block)
+ end
+ end
+
+ def self.map_with_index(array, options={}, &block)
+ map(array, options.merge(:with_index => true), &block)
+ end
+
+ def self.processor_count
+ case RbConfig::CONFIG['host_os']
+ when /darwin9/
+ `hwprefs cpu_count`.to_i
+ when /darwin/
+ (hwprefs_available? ? `hwprefs thread_count` : `sysctl -n hw.ncpu`).to_i
+ when /linux/
+ `grep -c processor /proc/cpuinfo`.to_i
+ when /freebsd/
+ `sysctl -n hw.ncpu`.to_i
+ when /mswin|mingw/
+ require 'win32ole'
+ wmi = WIN32OLE.connect("winmgmts://")
+ cpu = wmi.ExecQuery("select NumberOfLogicalProcessors from Win32_Processor")
+ cpu.to_enum.first.NumberOfLogicalProcessors
+ end
+ end
+
+ private
+
+ def self.work_direct(array, options)
+ results = []
+ array.each_with_index do |e,i|
+ results << (options[:with_index] ? yield(e,i) : yield(e))
+ end
+ results
+ end
+
+ def self.hwprefs_available?
+ `which hwprefs` != ''
+ end
+
+ def self.work_in_threads(items, options, &block)
+ results = []
+ current = -1
+ exception = nil
+
+ in_threads(options[:count]) do
+ # as long as there are more items, work on one of them
+ loop do
+ break if exception
+
+ index = Thread.exclusive{ current+=1 }
+ break if index >= items.size
+
+ begin
+ results[index] = call_with_index(items, index, options, &block)
+ rescue Exception => e
+ exception = e
+ break
+ end
+ end
+ end
+
+ raise exception if exception
+
+ results
+ end
+
+ def self.work_in_processes(items, options, &blk)
+ current_index = -1
+ results = []
+ pids = []
+ exception = nil
+
+ kill_on_ctrl_c(pids)
+
+ in_threads(options[:count]) do |i|
+ x = i
+ worker = worker(items, options, &blk)
+ pids[i] = worker[:pid]
+
+ begin
+ loop do
+ break if exception
+ index = Thread.exclusive{ current_index += 1 }
+ break if index >= items.size
+
+ write_to_pipe(worker[:write], index)
+ output = decode(worker[:read].gets.chomp)
+
+ if ExceptionWrapper === output
+ exception = output.exception
+ else
+ results[index] = output
+ end
+ end
+ ensure
+ worker[:read].close
+ worker[:write].close
+
+ # if it goes zombie, rather wait here to be able to debug
+ wait_for_process worker[:pid]
+ end
+ end
+
+ raise exception if exception
+
+ results
+ end
+
+ def self.worker(items, options, &block)
+ # use less memory on REE
+ GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
+
+ child_read, parent_write = IO.pipe
+ parent_read, child_write = IO.pipe
+
+ pid = Process.fork do
+ begin
+ parent_write.close
+ parent_read.close
+
+ process_incoming_jobs(child_read, child_write, items, options, &block)
+ ensure
+ child_read.close
+ child_write.close
+ end
+ end
+
+ child_read.close
+ child_write.close
+
+ {:read => parent_read, :write => parent_write, :pid => pid}
+ end
+
+ def self.process_incoming_jobs(read, write, items, options, &block)
+ while input = read.gets and input != "\n"
+ index = decode(input.chomp)
+ begin
+ result = call_with_index(items, index, options, &block)
+ result = nil if options[:preserve_results] == false
+ rescue Exception => e
+ result = ExceptionWrapper.new(e)
+ end
+ write_to_pipe(write, result)
+ end
+ end
+
+ def self.write_to_pipe(pipe, item)
+ pipe.write(encode(item))
+ end
+
+ def self.wait_for_threads(threads)
+ threads.compact.each do |t|
+ begin
+ t.join
+ rescue Interrupt
+ # thread died, do not stop other threads
+ end
+ end
+ end
+
+ def self.wait_for_process(pid)
+ begin
+ Process.wait(pid)
+ rescue Interrupt
+ # process died
+ end
+ end
+
+ def self.encode(obj)
+ Base64.encode64(Marshal.dump(obj)).split("\n").join + "\n"
+ end
+
+ def self.decode(str)
+ Marshal.load(Base64.decode64(str))
+ end
+
+ # options is either a Integer or a Hash with :count
+ def self.extract_count_from_options(options)
+ if options.is_a?(Hash)
+ count = options[:count]
+ else
+ count = options
+ options = {}
+ end
+ [count, options]
+ end
+
+ # kill all these processes (children) if user presses Ctrl+c
+ def self.kill_on_ctrl_c(pids)
+ Signal.trap :SIGINT do
+ $stderr.puts 'Parallel execution interrupted, exiting ...'
+ pids.each { |pid| Process.kill(:KILL, pid) if pid }
+ exit 1 # Quit with 'failed' signal
+ end
+ end
+
+ def self.call_with_index(array, index, options, &block)
+ args = [array[index]]
+ args << index if options[:with_index]
+ block.call(*args)
+ end
+
+ class ExceptionWrapper
+ attr_reader :exception
+ def initialize(exception)
+ @exception = exception
+ end
+ end
+end
View
2 spec/dependent_spec.rb
@@ -53,7 +53,7 @@ def stub_source(gem_source = nil)
it "obeys parallel option" do
stub_source
- Parallel.should_receive(:map).with(anything, :in_processes => 3).and_return []
+ Gem::Dependent::Parallel.should_receive(:map).with(anything, :in_processes => 3).and_return []
Gem::Dependent.find('hoe', :parallel => 3)
end

0 comments on commit 89d09b6

Please sign in to comment.
Something went wrong with that request. Please try again.