From 7d4469443ca046762770894dc97fe5162a65b4ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Wed, 17 Sep 2014 12:58:39 +0200 Subject: [PATCH 01/18] use absolute constant references consistently in future.rb --- lib/futuroscope/future.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/futuroscope/future.rb b/lib/futuroscope/future.rb index 601815f..be574d3 100644 --- a/lib/futuroscope/future.rb +++ b/lib/futuroscope/future.rb @@ -8,7 +8,7 @@ module Futuroscope # the future. That is, will block when the result is not ready until it is, # and will return it instantly if the thread's execution already finished. # - class Future < Delegator + class Future < ::Delegator extend ::Forwardable # Initializes a future with a block and starts its execution. @@ -30,7 +30,7 @@ def initialize(pool = ::Futuroscope.default_pool, &block) @queue = ::SizedQueue.new(1) @pool = pool @block = block - @mutex = Mutex.new + @mutex = ::Mutex.new @pool.queue self end @@ -41,7 +41,7 @@ def run_future @queue.push(exception: e) end - # Semipublic: Returns the future's value. Will wait for the future to be + # Semipublic: Returns the future's value. Will wait for the future to be # completed or return its value otherwise. Can be called multiple times. # # Returns the Future's block execution result. @@ -70,7 +70,7 @@ def marshal_load value def resolved_future_value_or_raise resolved = resolved_future_value - Kernel.raise resolved[:exception] if resolved[:exception] + ::Kernel.raise resolved[:exception] if resolved[:exception] resolved end From e022587130928dab76498ceeb6bd214a74ec7d64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 00:03:40 +0200 Subject: [PATCH 02/18] add deadlock detection and smart up/down spinning --- lib/futuroscope.rb | 5 +- lib/futuroscope/deadlock_error.rb | 1 + lib/futuroscope/future.rb | 54 ++++++++++---- lib/futuroscope/pool.rb | 119 +++++++++++++++++++++++------- lib/futuroscope/worker.rb | 24 +++--- spec/futuroscope/worker_spec.rb | 9 --- 6 files changed, 147 insertions(+), 65 deletions(-) create mode 100644 lib/futuroscope/deadlock_error.rb diff --git a/lib/futuroscope.rb b/lib/futuroscope.rb index 8065bc6..73661dd 100644 --- a/lib/futuroscope.rb +++ b/lib/futuroscope.rb @@ -1,7 +1,8 @@ -require "futuroscope/version" -require "futuroscope/pool" +require "futuroscope/deadlock_error" require "futuroscope/future" require "futuroscope/map" +require "futuroscope/pool" +require "futuroscope/version" module Futuroscope # Gets the default futuroscope's pool. diff --git a/lib/futuroscope/deadlock_error.rb b/lib/futuroscope/deadlock_error.rb new file mode 100644 index 0000000..e9f72f6 --- /dev/null +++ b/lib/futuroscope/deadlock_error.rb @@ -0,0 +1 @@ +Futuroscope::DeadlockError = Class.new StandardError diff --git a/lib/futuroscope/future.rb b/lib/futuroscope/future.rb index be574d3..583480e 100644 --- a/lib/futuroscope/future.rb +++ b/lib/futuroscope/future.rb @@ -11,6 +11,12 @@ module Futuroscope class Future < ::Delegator extend ::Forwardable + attr_reader :worker_thread + + def worker_thread=(thread) + @mutex.synchronize { @worker_thread = thread } + end + # Initializes a future with a block and starts its execution. # # Examples: @@ -27,18 +33,29 @@ class Future < ::Delegator # # Returns a Future def initialize(pool = ::Futuroscope.default_pool, &block) - @queue = ::SizedQueue.new(1) + @worker_finished = ConditionVariable.new @pool = pool @block = block @mutex = ::Mutex.new - @pool.queue self + @worker_thread = nil + @pool.push self end # Semipublic: Forces this future to be run. - def run_future - @queue.push(value: @block.call) - rescue ::Exception => e - @queue.push(exception: e) + def resolve! + @mutex.synchronize do + begin + Thread.handle_interrupt(DeadlockError => :on_blocking) do + @resolved_future = { value: @block.call } + end + rescue ::Exception => e + @resolved_future = { exception: e } + ensure + @pool.done_with self + @worker_thread = nil + @worker_finished.broadcast + end + end end # Semipublic: Returns the future's value. Will wait for the future to be @@ -61,6 +78,10 @@ def marshal_load value @resolved_future = value end + def resolved? + instance_variable_defined? :@resolved_future + end + def_delegators :__getobj__, :class, :kind_of?, :is_a?, :nil? alias_method :future_value, :__getobj__ @@ -68,16 +89,23 @@ def marshal_load value private def resolved_future_value_or_raise - resolved = resolved_future_value - - ::Kernel.raise resolved[:exception] if resolved[:exception] - resolved + resolved_future.tap do |resolved| + ::Kernel.raise resolved[:exception] if resolved.has_key?(:exception) + end end - def resolved_future_value - @resolved_future || @mutex.synchronize do - @resolved_future ||= @queue.pop + def resolved_future + unless resolved? + @pool.depend self + wait_until_resolved end + @resolved_future + end + + def wait_until_resolved + @mutex.synchronize do + @worker_finished.wait(@mutex) unless resolved? + end unless resolved? end end end diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 2c29775..ef23126 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -16,40 +16,54 @@ class Pool def initialize(range = 8..16) @min_workers = range.min @max_workers = range.max - @queue = Queue.new - @workers = Set.new - @mutex = Mutex.new + @dependencies = {} + @priorities = {} + @future_needs_worker = ConditionVariable.new + @workers = ::Set.new + @mutex = ::Mutex.new warm_up_workers end - # Public: Enqueues a new Future into the pool. + # Public: Pushes a Future into the worklist with low priority. # - # future - The Future to enqueue. - def queue(future) + # future - The Future to push. + def push(future) @mutex.synchronize do - spin_worker if can_spin_extra_workers? - @queue.push future + spin_worker if need_extra_worker? + @priorities[future.__id__] = 0 + @future_needs_worker.signal end end - # Public: Pops a new job from the pool. It will return nil if there's + # Public: Pops a new job from the pool. It will return nil if there's # enough workers in the pool to take care of it. # # Returns a Future def pop @mutex.synchronize do - return nil if @queue.empty? && more_workers_than_needed? + kill_worker = more_workers_than_needed? && @priorities.empty? + await_future(kill_worker ? 5 : nil) end - return @queue.pop end - # Private: Notifies that a worker just died so it can be removed from the - # pool. + # Public: Indicates that the current thread is waiting for a Future. # - # worker - A Worker - def worker_died(worker) + # dependee - The Future being waited for. + def depend(future) @mutex.synchronize do - @workers.delete(worker) + @dependencies[Thread.current] = future + handle_deadlocks + dependent_future_id = current_thread_future_id + incr = 1 + (dependent_future_id.nil? ? 0 : @priorities[dependent_future_id]) + increment_priority(future, incr) + end + end + + # Semipublic: Called by a worker to indicate that it finished resolving a future. + def done_with(future) + @mutex.synchronize do + @priorities.delete_if { |future_id, priority| future_id == future.__id__ } + @dependencies.delete_if { |dependent, dependee| dependee.__id__ == future.__id__ } end end @@ -62,32 +76,85 @@ def min_workers=(count) def warm_up_workers @mutex.synchronize do - while(@workers.length < @min_workers) do + while workers.length < @min_workers do spin_worker end end end - def can_spin_extra_workers? - @workers.length < @max_workers && span_chance + def need_extra_worker? + workers.length < max_workers && @priorities.length > workers.count(&:free) end - def span_chance - [true, false].sample + def more_workers_than_needed? + workers.length > min_workers && @priorities.length < workers.count(&:free) end - def more_workers_than_needed? - @workers.length > @min_workers + def finalize + workers.each { |worker| worker.thread.kill } end + # The below methods should only be called with @mutex already acquired. + # These are only extracted for readability purposes. + + def spin_worker worker = Worker.new(self) - @workers << worker + workers << worker worker.run end - def finalize - @workers.each(&:stop) + def find_cycle + chain = [Thread.current] + loop do + last_thread = chain.last + return nil unless @dependencies.has_key?(last_thread) + next_future = @dependencies[last_thread] + next_thread = next_future.worker_thread + return nil if next_thread.nil? + return chain if next_thread == chain.first + chain << next_thread + end + end + + def increment_priority(future, increment) + return nil if NilClass === future + @priorities[future.__id__] += increment + increment_priority(@dependencies[future.worker_thread], increment) + end + + def current_thread_future_id + @priorities.keys.find { |id| ObjectSpace._id2ref(id).worker_thread == Thread.current } + end + + def await_future(timeout) + until @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + @future_needs_worker.wait(@mutex, timeout) + unless timeout.nil? || @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + workers.delete_if { |worker| worker.thread == Thread.current } + return nil + end + end + future_id = @priorities.select { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + .max_by { |future_id, priority| priority }.first + future = ObjectSpace._id2ref(future_id) + future.worker_thread = Thread.current + future + end + + def handle_deadlocks + Thread.handle_interrupt(DeadlockError => :immediate) do + Thread.handle_interrupt(DeadlockError => :never) do + unless (cycle = find_cycle).nil? + cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } + end + if workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers + least_priority_future = ObjectSpace._id2ref(@priorities.min_by { |future_id, priority| priority }.first) + least_priority_future.worker_thread.raise DeadlockError, "Pool size is too low, the future was aborted." + end + end + end end + end end diff --git a/lib/futuroscope/worker.rb b/lib/futuroscope/worker.rb index b0eb2f5..65cf32c 100644 --- a/lib/futuroscope/worker.rb +++ b/lib/futuroscope/worker.rb @@ -2,11 +2,14 @@ module Futuroscope # A futuroscope worker takes care of resolving a future's value. It works # together with a Pool. class Worker + attr_reader :thread, :free + # Public: Initializes a new Worker. # # pool - The worker Pool it belongs to. def initialize(pool) @pool = pool + @free = true end # Runs the worker. It keeps asking the Pool for a new job. If the pool @@ -16,23 +19,14 @@ def initialize(pool) # def run @thread = Thread.new do - while(future = @pool.pop) do - future.run_future + Thread.handle_interrupt(DeadlockError => :never) do + while future = @pool.pop do + @free = false + future.resolve! + @free = true + end end - die end end - - # Public: Stops this worker. - def stop - @thread.kill - die - end - - private - - def die - @pool.worker_died(self) - end end end diff --git a/spec/futuroscope/worker_spec.rb b/spec/futuroscope/worker_spec.rb index cade905..d7380de 100644 --- a/spec/futuroscope/worker_spec.rb +++ b/spec/futuroscope/worker_spec.rb @@ -12,14 +12,5 @@ module Futuroscope Worker.new(pool).run sleep(1) end - - it "notifies the pool when the worker died because there's no job" do - pool = [] - worker = Worker.new(pool) - - expect(pool).to receive(:worker_died).with(worker) - worker.run - sleep(1) - end end end From 2f27666ea8bc62b79bd25f10338bb542c2cb320e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 01:16:34 +0200 Subject: [PATCH 03/18] add logging and minor fixes --- lib/futuroscope.rb | 13 +++++++++++++ lib/futuroscope/future.rb | 2 +- lib/futuroscope/pool.rb | 22 +++++++++++++++++----- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/lib/futuroscope.rb b/lib/futuroscope.rb index 73661dd..7ca074e 100644 --- a/lib/futuroscope.rb +++ b/lib/futuroscope.rb @@ -21,4 +21,17 @@ def self.default_pool def self.default_pool=(pool) @default_pool = pool end + + # Gets the current loggers. Add objects to it that have the below methods defined to log on them. + # For example, instances of Ruby's core Logger will work. + def self.loggers + @loggers ||= [] + end + + # Inward facing methods, called whenever a component wants to log something to the loggers. + [:debug, :info, :warn, :error, :fatal].each do |log_method| + define_singleton_method(log_method) do |message| + loggers.each { |logger| logger.send(log_method, message) } + end + end end diff --git a/lib/futuroscope/future.rb b/lib/futuroscope/future.rb index 583480e..217debc 100644 --- a/lib/futuroscope/future.rb +++ b/lib/futuroscope/future.rb @@ -45,7 +45,7 @@ def initialize(pool = ::Futuroscope.default_pool, &block) def resolve! @mutex.synchronize do begin - Thread.handle_interrupt(DeadlockError => :on_blocking) do + Thread.handle_interrupt(DeadlockError => :immediate) do @resolved_future = { value: @block.call } end rescue ::Exception => e diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index ef23126..db082cf 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -29,8 +29,10 @@ def initialize(range = 8..16) # future - The Future to push. def push(future) @mutex.synchronize do + Futuroscope.info "PUSH: added future #{future.__id__}" spin_worker if need_extra_worker? @priorities[future.__id__] = 0 + Futuroscope.info " sending signal to wake up a thread. the priorities are: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" @future_needs_worker.signal end end @@ -48,10 +50,12 @@ def pop # Public: Indicates that the current thread is waiting for a Future. # - # dependee - The Future being waited for. + # future - The Future being waited for. def depend(future) @mutex.synchronize do + Futuroscope.info "DEPEND: thread #{Thread.current.__id__} depends on future #{future.__id__}" @dependencies[Thread.current] = future + Futuroscope.info " the current dependencies are: #{@dependencies.map { |k, v| ["thread #{k.__id__}", "future #{v.__id__}"] }.to_h}" handle_deadlocks dependent_future_id = current_thread_future_id incr = 1 + (dependent_future_id.nil? ? 0 : @priorities[dependent_future_id]) @@ -62,8 +66,9 @@ def depend(future) # Semipublic: Called by a worker to indicate that it finished resolving a future. def done_with(future) @mutex.synchronize do - @priorities.delete_if { |future_id, priority| future_id == future.__id__ } - @dependencies.delete_if { |dependent, dependee| dependee.__id__ == future.__id__ } + Futuroscope.info "DONE: thread #{Thread.current.__id__} is done with future #{future.__id__}" + @priorities.delete_if { |future_id, priority| Futuroscope.info " deleting future #{future_id}" if future_id == future.__id__; future_id == future.__id__ } + @dependencies.delete_if { |dependent, dependee| Futuroscope.info " deleting dependency from thread #{dependent.__id__} to future #{dependee.__id__}" if dependee.__id__ == future.__id__; dependee.__id__ == future.__id__ } end end @@ -102,6 +107,7 @@ def spin_worker worker = Worker.new(self) workers << worker worker.run + Futuroscope.info " spun up worker with thread #{worker.thread.__id__}" end def find_cycle @@ -119,6 +125,7 @@ def find_cycle def increment_priority(future, increment) return nil if NilClass === future + Futuroscope.info " incrementing priority for future #{future.__id__}" @priorities[future.__id__] += increment increment_priority(@dependencies[future.worker_thread], increment) end @@ -129,14 +136,19 @@ def current_thread_future_id def await_future(timeout) until @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." @future_needs_worker.wait(@mutex, timeout) + Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up, the priorities are: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" + Futuroscope.info " current future workers are: #{@priorities.map { |k, v| ["future #{(k)}", (thread = ObjectSpace._id2ref(k).worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" unless timeout.nil? || @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do" workers.delete_if { |worker| worker.thread == Thread.current } return nil end end future_id = @priorities.select { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } .max_by { |future_id, priority| priority }.first + Futuroscope.info "POP: thread #{Thread.current.__id__} will start working on future #{future_id}" future = ObjectSpace._id2ref(future_id) future.worker_thread = Thread.current future @@ -149,8 +161,8 @@ def handle_deadlocks cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } end if workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers - least_priority_future = ObjectSpace._id2ref(@priorities.min_by { |future_id, priority| priority }.first) - least_priority_future.worker_thread.raise DeadlockError, "Pool size is too low, the future was aborted." + least_priority_future_id = @priorities.sort_by(&:last).map(&:first).find { |future_id| !ObjectSpace._id2ref(future_id).worker_thread.nil? } + ObjectSpace._id2ref(least_priority_future_id).worker_thread.raise DeadlockError, "Pool size is too low, the future was aborted." end end end From fc77ddc8d5be2fa33ace2b64f2eb4ed6bf92b7a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 12:55:04 +0200 Subject: [PATCH 04/18] fix cycleless deadlocks --- lib/futuroscope/pool.rb | 45 ++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index db082cf..354f86f 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -110,19 +110,6 @@ def spin_worker Futuroscope.info " spun up worker with thread #{worker.thread.__id__}" end - def find_cycle - chain = [Thread.current] - loop do - last_thread = chain.last - return nil unless @dependencies.has_key?(last_thread) - next_future = @dependencies[last_thread] - next_thread = next_future.worker_thread - return nil if next_thread.nil? - return chain if next_thread == chain.first - chain << next_thread - end - end - def increment_priority(future, increment) return nil if NilClass === future Futuroscope.info " incrementing priority for future #{future.__id__}" @@ -158,15 +145,41 @@ def handle_deadlocks Thread.handle_interrupt(DeadlockError => :immediate) do Thread.handle_interrupt(DeadlockError => :never) do unless (cycle = find_cycle).nil? + Futuroscope.info " deadlock! cyclical dependency, sending interrupt to all threads involved" cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } end - if workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers - least_priority_future_id = @priorities.sort_by(&:last).map(&:first).find { |future_id| !ObjectSpace._id2ref(future_id).worker_thread.nil? } - ObjectSpace._id2ref(least_priority_future_id).worker_thread.raise DeadlockError, "Pool size is too low, the future was aborted." + if cycleless_deadlock? + thread_to_interrupt = least_priority_independent_thread + Futuroscope.info " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" + thread_to_interrupt.raise DeadlockError, "Pool size is too low, the future was aborted." end end end end + def find_cycle + chain = [Thread.current] + loop do + last_thread = chain.last + return nil unless @dependencies.has_key?(last_thread) + next_future = @dependencies[last_thread] + next_thread = next_future.worker_thread + return nil if next_thread.nil? + return chain if next_thread == chain.first + chain << next_thread + end + end + + def cycleless_deadlock? + workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers + end + + def least_priority_independent_thread + @priorities.sort_by(&:last).map(&:first).each do |future_id| + its_thread = ObjectSpace._id2ref(future_id).worker_thread + return its_thread if !its_thread.nil? && @dependencies[its_thread].worker_thread.nil? + end + end + end end From 072de6f7aafca6f505625cfce68b89858b23f99d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 12:55:28 +0200 Subject: [PATCH 05/18] add whitespaces --- lib/futuroscope/pool.rb | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 354f86f..c034e1b 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -24,6 +24,7 @@ def initialize(range = 8..16) warm_up_workers end + # Public: Pushes a Future into the worklist with low priority. # # future - The Future to push. @@ -37,6 +38,7 @@ def push(future) end end + # Public: Pops a new job from the pool. It will return nil if there's # enough workers in the pool to take care of it. # @@ -48,6 +50,7 @@ def pop end end + # Public: Indicates that the current thread is waiting for a Future. # # future - The Future being waited for. @@ -63,6 +66,7 @@ def depend(future) end end + # Semipublic: Called by a worker to indicate that it finished resolving a future. def done_with(future) @mutex.synchronize do @@ -72,11 +76,13 @@ def done_with(future) end end + def min_workers=(count) @min_workers = count warm_up_workers end + private def warm_up_workers @@ -87,18 +93,22 @@ def warm_up_workers end end + def need_extra_worker? workers.length < max_workers && @priorities.length > workers.count(&:free) end + def more_workers_than_needed? workers.length > min_workers && @priorities.length < workers.count(&:free) end + def finalize workers.each { |worker| worker.thread.kill } end + # The below methods should only be called with @mutex already acquired. # These are only extracted for readability purposes. @@ -110,6 +120,7 @@ def spin_worker Futuroscope.info " spun up worker with thread #{worker.thread.__id__}" end + def increment_priority(future, increment) return nil if NilClass === future Futuroscope.info " incrementing priority for future #{future.__id__}" @@ -117,10 +128,12 @@ def increment_priority(future, increment) increment_priority(@dependencies[future.worker_thread], increment) end + def current_thread_future_id @priorities.keys.find { |id| ObjectSpace._id2ref(id).worker_thread == Thread.current } end + def await_future(timeout) until @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." @@ -141,6 +154,7 @@ def await_future(timeout) future end + def handle_deadlocks Thread.handle_interrupt(DeadlockError => :immediate) do Thread.handle_interrupt(DeadlockError => :never) do @@ -157,6 +171,7 @@ def handle_deadlocks end end + def find_cycle chain = [Thread.current] loop do @@ -170,10 +185,12 @@ def find_cycle end end + def cycleless_deadlock? workers.all? { |worker| @dependencies.has_key?(worker.thread) } && workers.count == max_workers end + def least_priority_independent_thread @priorities.sort_by(&:last).map(&:first).each do |future_id| its_thread = ObjectSpace._id2ref(future_id).worker_thread From 9a61cef614b07398ba09ddfd28066baceb7608a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:09:30 +0200 Subject: [PATCH 06/18] fix log levels --- lib/futuroscope/pool.rb | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index c034e1b..fc92892 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -33,7 +33,8 @@ def push(future) Futuroscope.info "PUSH: added future #{future.__id__}" spin_worker if need_extra_worker? @priorities[future.__id__] = 0 - Futuroscope.info " sending signal to wake up a thread. the priorities are: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" + Futuroscope.info " sending signal to wake up a thread" + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" @future_needs_worker.signal end end @@ -58,7 +59,7 @@ def depend(future) @mutex.synchronize do Futuroscope.info "DEPEND: thread #{Thread.current.__id__} depends on future #{future.__id__}" @dependencies[Thread.current] = future - Futuroscope.info " the current dependencies are: #{@dependencies.map { |k, v| ["thread #{k.__id__}", "future #{v.__id__}"] }.to_h}" + Futuroscope.debug " current dependencies: #{@dependencies.map { |k, v| ["thread #{k.__id__}", "future #{v.__id__}"] }.to_h}" handle_deadlocks dependent_future_id = current_thread_future_id incr = 1 + (dependent_future_id.nil? ? 0 : @priorities[dependent_future_id]) @@ -138,8 +139,9 @@ def await_future(timeout) until @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." @future_needs_worker.wait(@mutex, timeout) - Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up, the priorities are: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" - Futuroscope.info " current future workers are: #{@priorities.map { |k, v| ["future #{(k)}", (thread = ObjectSpace._id2ref(k).worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" + Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up" + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" + Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{(k)}", (thread = ObjectSpace._id2ref(k).worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" unless timeout.nil? || @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do" workers.delete_if { |worker| worker.thread == Thread.current } @@ -159,12 +161,12 @@ def handle_deadlocks Thread.handle_interrupt(DeadlockError => :immediate) do Thread.handle_interrupt(DeadlockError => :never) do unless (cycle = find_cycle).nil? - Futuroscope.info " deadlock! cyclical dependency, sending interrupt to all threads involved" + Futuroscope.error " deadlock! cyclical dependency, sending interrupt to all threads involved" cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } end if cycleless_deadlock? thread_to_interrupt = least_priority_independent_thread - Futuroscope.info " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" + Futuroscope.error " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" thread_to_interrupt.raise DeadlockError, "Pool size is too low, the future was aborted." end end From 161eaa25caa41f8d4c18de66bf6d42b9169a0496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:15:51 +0200 Subject: [PATCH 07/18] remove unnecessary require --- lib/futuroscope/pool.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index fc92892..359c9c9 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -1,5 +1,4 @@ require 'set' -require 'thread' require 'futuroscope/worker' module Futuroscope From df61e688bfd808ef5f013cc1cde0678bd095e9bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:19:30 +0200 Subject: [PATCH 08/18] remove unneeded requires --- .rspec | 3 ++- spec/futuroscope/convenience_spec.rb | 1 - spec/futuroscope/future_spec.rb | 2 -- spec/futuroscope/map_spec.rb | 5 +---- spec/futuroscope/pool_spec.rb | 3 --- spec/futuroscope/worker_spec.rb | 6 +----- spec/futuroscope_spec.rb | 3 --- spec/spec_helper.rb | 1 - 8 files changed, 4 insertions(+), 20 deletions(-) diff --git a/.rspec b/.rspec index b782d24..ec13759 100644 --- a/.rspec +++ b/.rspec @@ -1 +1,2 @@ ---color --format documentation \ No newline at end of file +--color --format documentation +--require spec_helper diff --git a/spec/futuroscope/convenience_spec.rb b/spec/futuroscope/convenience_spec.rb index 276f0be..36b034e 100644 --- a/spec/futuroscope/convenience_spec.rb +++ b/spec/futuroscope/convenience_spec.rb @@ -1,4 +1,3 @@ -require 'spec_helper' require 'futuroscope/convenience' require 'timeout' diff --git a/spec/futuroscope/future_spec.rb b/spec/futuroscope/future_spec.rb index b2c8f93..7c15c1b 100644 --- a/spec/futuroscope/future_spec.rb +++ b/spec/futuroscope/future_spec.rb @@ -1,5 +1,3 @@ -require 'spec_helper' -require 'futuroscope/future' require 'timeout' module Futuroscope diff --git a/spec/futuroscope/map_spec.rb b/spec/futuroscope/map_spec.rb index 3ba0104..434ed78 100644 --- a/spec/futuroscope/map_spec.rb +++ b/spec/futuroscope/map_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope/map' - module Futuroscope describe Map do it "behaves like a normal map" do @@ -9,7 +6,7 @@ module Futuroscope sleep(item) "Item #{item}" end - + Timeout::timeout(4) do expect(result.first).to eq("Item 1") expect(result[1]).to eq("Item 2") diff --git a/spec/futuroscope/pool_spec.rb b/spec/futuroscope/pool_spec.rb index 7d13f16..c963a7b 100644 --- a/spec/futuroscope/pool_spec.rb +++ b/spec/futuroscope/pool_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope/pool' - module Futuroscope describe Pool do it "spins up a number of workers" do diff --git a/spec/futuroscope/worker_spec.rb b/spec/futuroscope/worker_spec.rb index d7380de..aa996b3 100644 --- a/spec/futuroscope/worker_spec.rb +++ b/spec/futuroscope/worker_spec.rb @@ -1,7 +1,3 @@ -require 'spec_helper' -require 'futuroscope/worker' -require 'futuroscope/pool' - module Futuroscope describe Worker do it "asks the pool for a new job and runs the future" do @@ -9,7 +5,7 @@ module Futuroscope pool = [future] expect(future).to receive :run_future - Worker.new(pool).run + described_class.new(pool).run sleep(1) end end diff --git a/spec/futuroscope_spec.rb b/spec/futuroscope_spec.rb index 2e3da06..15a2f3a 100644 --- a/spec/futuroscope_spec.rb +++ b/spec/futuroscope_spec.rb @@ -1,6 +1,3 @@ -require 'spec_helper' -require 'futuroscope' - describe Futuroscope do describe "default_pool" do it "returns a pool by default" do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 82b918d..55af872 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,4 +4,3 @@ end require 'rspec/collection_matchers' -require 'futuroscope' From 4c08c510c5a04b4686e31b366b0615e9b2d73eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:23:16 +0200 Subject: [PATCH 09/18] rename run_future to resolve! in spec --- spec/futuroscope/worker_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/futuroscope/worker_spec.rb b/spec/futuroscope/worker_spec.rb index aa996b3..5235594 100644 --- a/spec/futuroscope/worker_spec.rb +++ b/spec/futuroscope/worker_spec.rb @@ -3,7 +3,7 @@ module Futuroscope it "asks the pool for a new job and runs the future" do future = double(:future) pool = [future] - expect(future).to receive :run_future + expect(future).to receive :resolve! described_class.new(pool).run sleep(1) From 22fca86f19065435189ce2c594f970aee02ba481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:28:34 +0200 Subject: [PATCH 10/18] add back require --- spec/spec_helper.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 55af872..82b918d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,3 +4,4 @@ end require 'rspec/collection_matchers' +require 'futuroscope' From a16a000c9b9ff9b7676712ead7f824cdb3c1ad60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:32:21 +0200 Subject: [PATCH 11/18] remove workers from the set on finalize --- lib/futuroscope/pool.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 359c9c9..2b1002c 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -105,7 +105,10 @@ def more_workers_than_needed? def finalize - workers.each { |worker| worker.thread.kill } + workers.each do |worker| + workers.delete worker + worker.thread.kill + end end From fa997fb8d73b5503dc481c52438d40cf3f28e78b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 13:32:38 +0200 Subject: [PATCH 12/18] change worker linger time to 2 sec --- lib/futuroscope/pool.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 2b1002c..01e8b7b 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -46,7 +46,7 @@ def push(future) def pop @mutex.synchronize do kill_worker = more_workers_than_needed? && @priorities.empty? - await_future(kill_worker ? 5 : nil) + await_future(kill_worker ? 2 : nil) end end From ccc19404c9c05938f8620dc9aacc9647be1ea987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 14:20:40 +0200 Subject: [PATCH 13/18] keep references to futures to avoid GC --- lib/futuroscope/pool.rb | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 01e8b7b..0f9b560 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -21,6 +21,12 @@ def initialize(range = 8..16) @workers = ::Set.new @mutex = ::Mutex.new warm_up_workers + + # We need to keep references to the futures to prevent them from being GC'd. + # However, they can't be the keys of @priorities, because Hash will call #hash on them, which is forwarded to the + # wrapped object, causing a deadlock. Not forwarding is not an option, because then to the outside world + # futures won't be transparent: hsh[:key] will not be the same as hsh[future { :key }]. + @futures = {} end @@ -30,10 +36,11 @@ def initialize(range = 8..16) def push(future) @mutex.synchronize do Futuroscope.info "PUSH: added future #{future.__id__}" - spin_worker if need_extra_worker? @priorities[future.__id__] = 0 + @futures[future.__id__] = future + spin_worker if need_extra_worker? Futuroscope.info " sending signal to wake up a thread" - Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" @future_needs_worker.signal end end @@ -71,8 +78,14 @@ def depend(future) def done_with(future) @mutex.synchronize do Futuroscope.info "DONE: thread #{Thread.current.__id__} is done with future #{future.__id__}" - @priorities.delete_if { |future_id, priority| Futuroscope.info " deleting future #{future_id}" if future_id == future.__id__; future_id == future.__id__ } - @dependencies.delete_if { |dependent, dependee| Futuroscope.info " deleting dependency from thread #{dependent.__id__} to future #{dependee.__id__}" if dependee.__id__ == future.__id__; dependee.__id__ == future.__id__ } + Futuroscope.info " deleting future #{future_id} from the task list" + @futures.delete future.__id__ + @priorities.delete future.__id__ + dependencies_to_delete = @dependencies.select { |dependent, dependee| dependee.__id__ == future.__id__ } + dependencies_to_delete.each do |dependent, dependee| + Futuroscope.info " deleting dependency from thread #{dependent.__id__} to future #{dependee.__id__}" + @dependencies.delete dependent + end end end @@ -133,27 +146,26 @@ def increment_priority(future, increment) def current_thread_future_id - @priorities.keys.find { |id| ObjectSpace._id2ref(id).worker_thread == Thread.current } + @priorities.keys.find { |id| @futures[id].worker_thread == Thread.current } end def await_future(timeout) - until @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + until @priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } Futuroscope.info "POP: thread #{Thread.current.__id__} going to sleep until there's something to do#{timeout && " or #{timeout} seconds"}..." @future_needs_worker.wait(@mutex, timeout) Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up" - Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{(k)}", v] }.to_h}" - Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{(k)}", (thread = ObjectSpace._id2ref(k).worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" - unless timeout.nil? || @priorities.any? { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } + Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" + Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{k}", (thread = @futures[k].worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" + unless timeout.nil? || @priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do" workers.delete_if { |worker| worker.thread == Thread.current } return nil end end - future_id = @priorities.select { |future_id, priority| ObjectSpace._id2ref(future_id).worker_thread.nil? } - .max_by { |future_id, priority| priority }.first + future_id = @priorities.select { |future_id, priority| @futures[future_id].worker_thread.nil? }.max_by { |future_id, priority| priority }.first Futuroscope.info "POP: thread #{Thread.current.__id__} will start working on future #{future_id}" - future = ObjectSpace._id2ref(future_id) + future = @futures[future_id] future.worker_thread = Thread.current future end @@ -197,7 +209,7 @@ def cycleless_deadlock? def least_priority_independent_thread @priorities.sort_by(&:last).map(&:first).each do |future_id| - its_thread = ObjectSpace._id2ref(future_id).worker_thread + its_thread = @futures[future_id].worker_thread return its_thread if !its_thread.nil? && @dependencies[its_thread].worker_thread.nil? end end From 3aafd5ef5efd9ba18f8500450d3e38b19dffda37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 15:17:27 +0200 Subject: [PATCH 14/18] fix spinning down to minimum --- lib/futuroscope/pool.rb | 37 +++++++++++++++++++---------------- spec/futuroscope/pool_spec.rb | 22 ++++++--------------- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 0f9b560..33f3cef 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -51,10 +51,7 @@ def push(future) # # Returns a Future def pop - @mutex.synchronize do - kill_worker = more_workers_than_needed? && @priorities.empty? - await_future(kill_worker ? 2 : nil) - end + @mutex.synchronize { await_future(more_workers_than_needed? ? 2 : nil) } end @@ -78,7 +75,7 @@ def depend(future) def done_with(future) @mutex.synchronize do Futuroscope.info "DONE: thread #{Thread.current.__id__} is done with future #{future.__id__}" - Futuroscope.info " deleting future #{future_id} from the task list" + Futuroscope.info " deleting future #{future.__id__} from the task list" @futures.delete future.__id__ @priorities.delete future.__id__ dependencies_to_delete = @dependencies.select { |dependent, dependee| dependee.__id__ == future.__id__ } @@ -107,16 +104,6 @@ def warm_up_workers end - def need_extra_worker? - workers.length < max_workers && @priorities.length > workers.count(&:free) - end - - - def more_workers_than_needed? - workers.length > min_workers && @priorities.length < workers.count(&:free) - end - - def finalize workers.each do |worker| workers.delete worker @@ -157,11 +144,12 @@ def await_future(timeout) Futuroscope.info "POP: ... thread #{Thread.current.__id__} woke up" Futuroscope.debug " current priorities: #{@priorities.map { |k, v| ["future #{k}", v] }.to_h}" Futuroscope.debug " current future workers: #{@priorities.map { |k, v| ["future #{k}", (thread = @futures[k].worker_thread; thread.nil? ? nil : "thread #{thread.__id__}")] }.to_h}" - unless timeout.nil? || @priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } - Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do" + if more_workers_than_needed? && !@priorities.any? { |future_id, priority| @futures[future_id].worker_thread.nil? } + Futuroscope.info " thread #{Thread.current.__id__} is dying because there's nothing to do and there are more threads than the minimum" workers.delete_if { |worker| worker.thread == Thread.current } return nil end + timeout = nil end future_id = @priorities.select { |future_id, priority| @futures[future_id].worker_thread.nil? }.max_by { |future_id, priority| priority }.first Futuroscope.info "POP: thread #{Thread.current.__id__} will start working on future #{future_id}" @@ -214,5 +202,20 @@ def least_priority_independent_thread end end + + def need_extra_worker? + workers.count < max_workers && futures_needing_worker.count > workers.count(&:free) + end + + + def more_workers_than_needed? + workers.count > min_workers && futures_needing_worker.count < workers.count(&:free) + end + + + def futures_needing_worker + @futures.values.select { |future| future.worker_thread.nil? } + end + end end diff --git a/spec/futuroscope/pool_spec.rb b/spec/futuroscope/pool_spec.rb index c963a7b..e2bda4a 100644 --- a/spec/futuroscope/pool_spec.rb +++ b/spec/futuroscope/pool_spec.rb @@ -8,13 +8,13 @@ module Futuroscope expect(pool.workers).to have(3).workers end - describe "queue" do + describe "push" do it "enqueues a job and runs it" do pool = Pool.new - future = double(:future) + future = Struct.new(:worker_thread).new(nil) - expect(future).to receive :run_future - pool.queue future + expect(future).to receive :resolve! + pool.push future sleep(0.1) end end @@ -22,15 +22,14 @@ module Futuroscope describe "worker control" do it "adds more workers when needed and returns to the default amount" do pool = Pool.new(2..8) - allow(pool).to receive(:span_chance).and_return true - 10.times do |future| + 10.times do Future.new(pool){ sleep(1) } end sleep(0.5) expect(pool.workers).to have(8).workers - sleep(1.5) + sleep(3) expect(pool.workers).to have(2).workers end @@ -63,14 +62,5 @@ module Futuroscope expect(pool.workers).to have(0).workers end end - - describe "#span_chance" do - it "returns true or false randomly" do - pool = Pool.new - chance = pool.send(:span_chance) - - expect([true, false]).to include(chance) - end - end end end From 62fb3f180495fa60368911ffcea350082ef40b97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 15:38:16 +0200 Subject: [PATCH 15/18] adjust accessor strategy for min_workers --- lib/futuroscope/pool.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 33f3cef..3ac1b3a 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -6,8 +6,8 @@ module Futuroscope # certain benefits. Moreover, we warm up the threads beforehand so we don't # have to spin them up each time a future is created. class Pool - attr_reader :workers - attr_accessor :min_workers, :max_workers + attr_reader :workers, :min_workers + attr_accessor :max_workers # Public: Initializes a new Pool. # @@ -97,7 +97,7 @@ def min_workers=(count) def warm_up_workers @mutex.synchronize do - while workers.length < @min_workers do + while workers.length < min_workers do spin_worker end end From 20cf980d238fe3ea594f8adbbcb4b9f30ecec1c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 15:39:00 +0200 Subject: [PATCH 16/18] add specs for deadlock situations --- spec/futuroscope/pool_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/futuroscope/pool_spec.rb b/spec/futuroscope/pool_spec.rb index e2bda4a..c0d24f4 100644 --- a/spec/futuroscope/pool_spec.rb +++ b/spec/futuroscope/pool_spec.rb @@ -19,6 +19,24 @@ module Futuroscope end end + describe "depend" do + it "detects cyclical dependencies" do + pool = Pool.new(2..2) + f2 = nil + f1 = Future.new(pool) { f2 = Future.new(pool) { f1.future_value }; f2.future_value } + + expect { f1.future_value }.to raise_error Futuroscope::DeadlockError, /Cyclical dependency detected/ + expect { f2.future_value }.to raise_error Futuroscope::DeadlockError, /Cyclical dependency detected/ + end + + it "detects non-cyclical deadlocks (when the pool is full and all futures are waiting for another future)" do + pool = Pool.new(1..1) + f = Future.new(pool) { Future.new(pool) { 1 } + 1 } + + expect { f.future_value }.to raise_error Futuroscope::DeadlockError, /Pool size is too low/ + end + end + describe "worker control" do it "adds more workers when needed and returns to the default amount" do pool = Pool.new(2..8) From e323b220a63e0e68e615650e41fcf0474611d425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 15:48:06 +0200 Subject: [PATCH 17/18] add logging spec --- spec/futuroscope_spec.rb | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/spec/futuroscope_spec.rb b/spec/futuroscope_spec.rb index 15a2f3a..78e1534 100644 --- a/spec/futuroscope_spec.rb +++ b/spec/futuroscope_spec.rb @@ -12,4 +12,21 @@ expect(Futuroscope.default_pool).to equal(pool) end end + + describe "logging" do + it "logs messages to all the given loggers" do + logger1 = double "Logger 1" + logger2 = double "Logger 2" + Futuroscope.loggers << logger1 << logger2 + + expect(logger1).to receive(:info).at_least(33).times + expect(logger2).to receive(:info).at_least(33).times + + expect(logger1).to receive(:debug).at_least(7).times + expect(logger2).to receive(:debug).at_least(7).times + + Futuroscope::Future.new { Futuroscope::Future.new { 1 } + 1 } + sleep(0.1) + end + end end From 3326db380d502fbb9122f0c6d790d221ea8f1bf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferenczy=20P=C3=A9ter?= Date: Mon, 29 Sep 2014 16:49:59 +0200 Subject: [PATCH 18/18] only raise for one type of deadlock --- lib/futuroscope/pool.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/futuroscope/pool.rb b/lib/futuroscope/pool.rb index 3ac1b3a..9b92435 100644 --- a/lib/futuroscope/pool.rb +++ b/lib/futuroscope/pool.rb @@ -162,11 +162,10 @@ def await_future(timeout) def handle_deadlocks Thread.handle_interrupt(DeadlockError => :immediate) do Thread.handle_interrupt(DeadlockError => :never) do - unless (cycle = find_cycle).nil? + if !(cycle = find_cycle).nil? Futuroscope.error " deadlock! cyclical dependency, sending interrupt to all threads involved" cycle.each { |thread| thread.raise DeadlockError, "Cyclical dependency detected, the future was aborted." } - end - if cycleless_deadlock? + elsif cycleless_deadlock? thread_to_interrupt = least_priority_independent_thread Futuroscope.error " deadlock! ran out of workers, sending interrupt to thread #{thread_to_interrupt.__id__}" thread_to_interrupt.raise DeadlockError, "Pool size is too low, the future was aborted."