diff --git a/lib/listen.rb b/lib/listen.rb index 61901ff1..f21d48e2 100644 --- a/lib/listen.rb +++ b/lib/listen.rb @@ -3,8 +3,6 @@ require 'listen/logger' require 'listen/listener' -require 'listen/internals/thread_pool' - # Always set up logging by default first time file is required # # NOTE: If you need to clear the logger completely, do so *after* @@ -43,8 +41,6 @@ def to(*args, &block) # This is used by the `listen` binary to handle Ctrl-C # def stop - Internals::ThreadPool.stop - while (listener = @listeners.deq(true)) begin listener.stop diff --git a/lib/listen/adapter/base.rb b/lib/listen/adapter/base.rb index e33349a5..277baf13 100644 --- a/lib/listen/adapter/base.rb +++ b/lib/listen/adapter/base.rb @@ -70,7 +70,7 @@ def start @started = true calling_stack = caller.dup - Listen::Internals::ThreadPool.add do + @run_thread = Thread.new do begin @snapshots.values.each do |snapshot| _timed('Record.build()') { snapshot.record.build } @@ -96,6 +96,7 @@ def self.usable? private def _stop + @run_thread.kill.join if (@run_thread ||= nil) end def _timed(title) diff --git a/lib/listen/adapter/darwin.rb b/lib/listen/adapter/darwin.rb index cdf11346..9bee821d 100644 --- a/lib/listen/adapter/darwin.rb +++ b/lib/listen/adapter/darwin.rb @@ -1,5 +1,4 @@ require 'thread' -require 'listen/internals/thread_pool' module Listen module Adapter @@ -45,7 +44,7 @@ def _run dirs_to_watch = @callbacks.keys.map(&:to_s) _log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" } worker.watch(dirs_to_watch, { latency: options.latency }, &method(:_process_changes)) - Listen::Internals::ThreadPool.add { _run_worker(worker) } + @worker_thread = Thread.new { _run_worker(worker) } end def _process_changes(dirs) @@ -74,6 +73,11 @@ def _run_worker(worker) format_string = 'fsevent: running worker failed: %s:%s called from: %s' _log_exception format_string, caller end + + def _stop + @worker_thread.kill.join if (@worker_thread ||= nil) + super + end end end end diff --git a/lib/listen/adapter/linux.rb b/lib/listen/adapter/linux.rb index c1ddf736..fab175e8 100644 --- a/lib/listen/adapter/linux.rb +++ b/lib/listen/adapter/linux.rb @@ -35,9 +35,7 @@ def _configure(directory, &callback) end def _run - Thread.current[:listen_blocking_read_thread] = true @worker.run - Thread.current[:listen_blocking_read_thread] = false end def _process_event(dir, event) @@ -99,7 +97,15 @@ def _dir_event?(event) end def _stop - @worker && @worker.close + @worker.close if (@worker ||= nil) + + # You can't kill a thread that is doing a sysread in JRuby, so + # skip `join` and pray thread dies fast... + if RUBY_ENGINE == 'jruby' + @run_thread.kill if (@run_thread ||= nil) + else + super + end end end end diff --git a/lib/listen/event/loop.rb b/lib/listen/event/loop.rb index 008dbfb7..45a40d73 100644 --- a/lib/listen/event/loop.rb +++ b/lib/listen/event/loop.rb @@ -38,7 +38,7 @@ def processing? def setup # TODO: use a Fiber instead? q = ::Queue.new - @wait_thread = Internals::ThreadPool.add do + @wait_thread = Thread.new do _wait_for_changes(q, config) end @@ -61,7 +61,7 @@ def teardown return unless wait_thread if wait_thread.alive? _wakeup(:teardown) - wait_thread.join + wait_thread.join.kill end @wait_thread = nil end diff --git a/lib/listen/internals/thread_pool.rb b/lib/listen/internals/thread_pool.rb deleted file mode 100644 index e112d90a..00000000 --- a/lib/listen/internals/thread_pool.rb +++ /dev/null @@ -1,29 +0,0 @@ -module Listen - # @private api - module Internals - module ThreadPool - def self.add(&block) - Thread.new { block.call }.tap do |th| - (@threads ||= Queue.new) << th - end - end - - def self.stop - return unless @threads ||= nil - return if @threads.empty? # return to avoid using possibly stubbed Queue - - killed = Queue.new - # You can't kill a read on a descriptor in JRuby, so let's just - # ignore running threads (listen rb-inotify waiting for disk activity - # before closing) pray threads die faster than they are created... - limit = RUBY_ENGINE == 'jruby' ? [1] : [] - - killed << @threads.pop.kill until @threads.empty? - until killed.empty? - th = killed.pop - th.join(*limit) unless th[:listen_blocking_read_thread] - end - end - end - end -end diff --git a/spec/lib/listen/adapter/base_spec.rb b/spec/lib/listen/adapter/base_spec.rb index f9df49cd..a0a65149 100644 --- a/spec/lib/listen/adapter/base_spec.rb +++ b/spec/lib/listen/adapter/base_spec.rb @@ -41,8 +41,7 @@ def _process_event(dir, event) allow(config).to receive(:silencer).and_return(silencer) allow(config).to receive(:adapter_options).and_return(adapter_options) - allow(Listen::Internals::ThreadPool). - to receive(:add) { |&block| block.call } + allow(Thread).to receive(:new) { |&block| block.call } # Stuff that happens in configure() allow(Listen::Record).to receive(:new).with(dir1).and_return(record) diff --git a/spec/lib/listen/event/loop_spec.rb b/spec/lib/listen/event/loop_spec.rb index f0b33c2f..359b6000 100644 --- a/spec/lib/listen/event/loop_spec.rb +++ b/spec/lib/listen/event/loop_spec.rb @@ -1,7 +1,6 @@ require 'thread' require 'listen/event/config' require 'listen/event/loop' -require 'listen/internals/thread_pool' RSpec.describe Listen::Event::Loop do let(:config) { instance_double(Listen::Event::Config, 'config') } @@ -26,14 +25,14 @@ allow(Listen::Event::Processor).to receive(:new).with(config, reasons). and_return(processor) - allow(Listen::Internals::ThreadPool).to receive(:add) do |*args, &block| + allow(Thread).to receive(:new) do |*args, &block| fail 'Unstubbed call:'\ - " ThreadPool.add(#{args.map(&:inspect) * ','},&#{block.inspect})" + " Thread.new(#{args.map(&:inspect) * ','},&#{block.inspect})" end allow(config).to receive(:min_delay_between_events).and_return(1.234) - allow(Listen::Internals::ThreadPool).to receive(:add) do |*_, &block| + allow(Thread).to receive(:new) do |*_, &block| blocks[:thread_block] = block thread end @@ -184,7 +183,7 @@ describe '#teardown' do before do allow(reasons).to receive(:<<) - allow(thread).to receive(:join) + allow(thread).to receive_message_chain(:join, :kill) end it 'frees the thread' do @@ -192,7 +191,7 @@ end it 'waits for the thread to finish' do - expect(thread).to receive(:join) + expect(thread).to receive_message_chain(:join, :kill) subject.teardown end diff --git a/spec/lib/listen/listener_spec.rb b/spec/lib/listen/listener_spec.rb index f48d240b..96d8d26d 100644 --- a/spec/lib/listen/listener_spec.rb +++ b/spec/lib/listen/listener_spec.rb @@ -71,7 +71,7 @@ allow(Pathname).to receive(:new).with('dir1').and_return(dir1) allow(Pathname).to receive(:new).with('dir2').and_return(dir2) - allow(Internals::ThreadPool).to receive(:add).and_return(processing_thread) + allow(Thread).to receive(:new).and_return(processing_thread) allow(processing_thread).to receive(:alive?).and_return(true) allow(processing_thread).to receive(:wakeup) allow(processing_thread).to receive(:join) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 2d2a8fc4..326f0df1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,8 +3,6 @@ Listen.logger.level = Logger::WARN unless ENV['LISTEN_GEM_DEBUGGING'] -require 'listen/internals/thread_pool' - def ci? ENV['CI'] end @@ -47,9 +45,9 @@ def fake_path(str, options = {}) Thread.abort_on_exception = true RSpec.configuration.before(:each) do - Listen::Internals::ThreadPool.stop + Listen.stop end RSpec.configuration.after(:each) do - Listen::Internals::ThreadPool.stop + Listen.stop end