Skip to content

Commit

Permalink
Remove Listen::Internals::ThreadPool
Browse files Browse the repository at this point in the history
`Listen::Internals::ThreadPool` manages all listener threads.  Thus the
only way to kill and subsequently garbage collect listener threads is by
calling the `Listen::Internals::ThreadPool.stop` method (typically done
via `Listen.stop`).  This is a problem when individual listeners must be
stopped and abandoned for garbage collection.

This commit removes `Listen::Internals::ThreadPool` in favor of listener
instances managing their own threads.

Partially addresses #476.
  • Loading branch information
jonathanhefner committed Jul 23, 2020
1 parent 63eff89 commit f9f26c2
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 54 deletions.
4 changes: 0 additions & 4 deletions lib/listen.rb
Expand Up @@ -3,8 +3,6 @@
require 'listen/logger'
require 'listen/listener'

require 'listen/internals/thread_pool'

# Always set up logging by default first time file is required
#
# NOTE: If you need to clear the logger completely, do so *after*
Expand Down Expand Up @@ -43,8 +41,6 @@ def to(*args, &block)
# This is used by the `listen` binary to handle Ctrl-C
#
def stop
Internals::ThreadPool.stop

while (listener = @listeners.deq(true))
begin
listener.stop
Expand Down
3 changes: 2 additions & 1 deletion lib/listen/adapter/base.rb
Expand Up @@ -70,7 +70,7 @@ def start
@started = true

calling_stack = caller.dup
Listen::Internals::ThreadPool.add do
@run_thread = Thread.new do
begin
@snapshots.values.each do |snapshot|
_timed('Record.build()') { snapshot.record.build }
Expand All @@ -96,6 +96,7 @@ def self.usable?
private

def _stop
@run_thread.kill.join if (@run_thread ||= nil)
end

def _timed(title)
Expand Down
8 changes: 6 additions & 2 deletions lib/listen/adapter/darwin.rb
@@ -1,5 +1,4 @@
require 'thread'
require 'listen/internals/thread_pool'

module Listen
module Adapter
Expand Down Expand Up @@ -45,7 +44,7 @@ def _run
dirs_to_watch = @callbacks.keys.map(&:to_s)
_log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" }
worker.watch(dirs_to_watch, { latency: options.latency }, &method(:_process_changes))
Listen::Internals::ThreadPool.add { _run_worker(worker) }
@worker_thread = Thread.new { _run_worker(worker) }
end

def _process_changes(dirs)
Expand Down Expand Up @@ -74,6 +73,11 @@ def _run_worker(worker)
format_string = 'fsevent: running worker failed: %s:%s called from: %s'
_log_exception format_string, caller
end

def _stop
@worker_thread.kill.join if (@worker_thread ||= nil)
super
end
end
end
end
12 changes: 9 additions & 3 deletions lib/listen/adapter/linux.rb
Expand Up @@ -35,9 +35,7 @@ def _configure(directory, &callback)
end

def _run
Thread.current[:listen_blocking_read_thread] = true
@worker.run
Thread.current[:listen_blocking_read_thread] = false
end

def _process_event(dir, event)
Expand Down Expand Up @@ -99,7 +97,15 @@ def _dir_event?(event)
end

def _stop
@worker && @worker.close
@worker.close if (@worker ||= nil)

# You can't kill a thread that is doing a sysread in JRuby, so
# skip `join` and pray thread dies fast...
if RUBY_ENGINE == 'jruby'
@run_thread.kill if (@run_thread ||= nil)
else
super
end
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/listen/event/loop.rb
Expand Up @@ -38,7 +38,7 @@ def processing?
def setup
# TODO: use a Fiber instead?
q = ::Queue.new
@wait_thread = Internals::ThreadPool.add do
@wait_thread = Thread.new do
_wait_for_changes(q, config)
end

Expand All @@ -61,7 +61,7 @@ def teardown
return unless wait_thread
if wait_thread.alive?
_wakeup(:teardown)
wait_thread.join
wait_thread.join.kill
end
@wait_thread = nil
end
Expand Down
29 changes: 0 additions & 29 deletions lib/listen/internals/thread_pool.rb

This file was deleted.

3 changes: 1 addition & 2 deletions spec/lib/listen/adapter/base_spec.rb
Expand Up @@ -41,8 +41,7 @@ def _process_event(dir, event)
allow(config).to receive(:silencer).and_return(silencer)
allow(config).to receive(:adapter_options).and_return(adapter_options)

allow(Listen::Internals::ThreadPool).
to receive(:add) { |&block| block.call }
allow(Thread).to receive(:new) { |&block| block.call }

# Stuff that happens in configure()
allow(Listen::Record).to receive(:new).with(dir1).and_return(record)
Expand Down
11 changes: 5 additions & 6 deletions spec/lib/listen/event/loop_spec.rb
@@ -1,7 +1,6 @@
require 'thread'
require 'listen/event/config'
require 'listen/event/loop'
require 'listen/internals/thread_pool'

RSpec.describe Listen::Event::Loop do
let(:config) { instance_double(Listen::Event::Config, 'config') }
Expand All @@ -26,14 +25,14 @@
allow(Listen::Event::Processor).to receive(:new).with(config, reasons).
and_return(processor)

allow(Listen::Internals::ThreadPool).to receive(:add) do |*args, &block|
allow(Thread).to receive(:new) do |*args, &block|
fail 'Unstubbed call:'\
" ThreadPool.add(#{args.map(&:inspect) * ','},&#{block.inspect})"
" Thread.new(#{args.map(&:inspect) * ','},&#{block.inspect})"
end

allow(config).to receive(:min_delay_between_events).and_return(1.234)

allow(Listen::Internals::ThreadPool).to receive(:add) do |*_, &block|
allow(Thread).to receive(:new) do |*_, &block|
blocks[:thread_block] = block
thread
end
Expand Down Expand Up @@ -184,15 +183,15 @@
describe '#teardown' do
before do
allow(reasons).to receive(:<<)
allow(thread).to receive(:join)
allow(thread).to receive_message_chain(:join, :kill)
end

it 'frees the thread' do
subject.teardown
end

it 'waits for the thread to finish' do
expect(thread).to receive(:join)
expect(thread).to receive_message_chain(:join, :kill)
subject.teardown
end

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/listen/listener_spec.rb
Expand Up @@ -71,7 +71,7 @@
allow(Pathname).to receive(:new).with('dir1').and_return(dir1)
allow(Pathname).to receive(:new).with('dir2').and_return(dir2)

allow(Internals::ThreadPool).to receive(:add).and_return(processing_thread)
allow(Thread).to receive(:new).and_return(processing_thread)
allow(processing_thread).to receive(:alive?).and_return(true)
allow(processing_thread).to receive(:wakeup)
allow(processing_thread).to receive(:join)
Expand Down
6 changes: 2 additions & 4 deletions spec/spec_helper.rb
Expand Up @@ -3,8 +3,6 @@

Listen.logger.level = Logger::WARN unless ENV['LISTEN_GEM_DEBUGGING']

require 'listen/internals/thread_pool'

def ci?
ENV['CI']
end
Expand Down Expand Up @@ -47,9 +45,9 @@ def fake_path(str, options = {})
Thread.abort_on_exception = true

RSpec.configuration.before(:each) do
Listen::Internals::ThreadPool.stop
Listen.stop
end

RSpec.configuration.after(:each) do
Listen::Internals::ThreadPool.stop
Listen.stop
end

0 comments on commit f9f26c2

Please sign in to comment.