Skip to content

Commit

Permalink
Merge pull request #471 from guard/reduce_multiple_fsevent_processes
Browse files Browse the repository at this point in the history
Use one `fsevent_watch` process per listener instead of one per dir
  • Loading branch information
ioquatix committed Dec 5, 2019
2 parents 0cb1597 + caf46a0 commit e5a14cf
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 164 deletions.
62 changes: 24 additions & 38 deletions lib/listen/adapter/darwin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,59 +35,45 @@ def self.usable?

private

# NOTE: each directory gets a DIFFERENT callback!
def _configure(dir, &callback)
require 'rb-fsevent'
opts = { latency: options.latency }

@workers ||= ::Queue.new
@workers << FSEvent.new.tap do |worker|
_log :debug, "fsevent: watching: #{dir.to_s.inspect}"
worker.watch(dir.to_s, opts, &callback)
end
@callbacks[dir] = callback
end

def _run
first = @workers.pop

# NOTE: _run is called within a thread, so run every other
# worker in it's own thread
_run_workers_in_background(_to_array(@workers))
_run_worker(first)
require 'rb-fsevent'
worker = FSEvent.new
dirs_to_watch = @callbacks.keys.map(&:to_s)
_log(:info) { "fsevent: watching: #{dirs_to_watch.inspect}" }
worker.watch(dirs_to_watch, { latency: options.latency }, &method(:_process_changes))
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end

def _process_event(dir, event)
_log :debug, "fsevent: processing event: #{event.inspect}"
event.each do |path|
new_path = Pathname.new(path.sub(%r{\/$}, ''))
_log :debug, "fsevent: #{new_path}"
# TODO: does this preserve symlinks?
rel_path = new_path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
def _process_changes(dirs)
dirs.each do |dir|
dir = Pathname.new(dir.sub(%r{\/$}, ''))

@callbacks.each do |watched_dir, callback|
if watched_dir.eql?(dir) || Listen::Directory.ascendant_of?(watched_dir, dir)
callback.call(dir)
end
end
end
end

def _process_event(dir, path)
_log(:debug) { "fsevent: processing path: #{path.inspect}" }
# TODO: does this preserve symlinks?
rel_path = path.relative_path_from(dir).to_s
_queue_change(:dir, dir, rel_path, recursive: true)
end

def _run_worker(worker)
_log :debug, "fsevent: running worker: #{worker.inspect}"
_log(:debug) { "fsevent: running worker: #{worker.inspect}" }
worker.run
rescue
format_string = 'fsevent: running worker failed: %s:%s called from: %s'
_log_exception format_string, caller
end

def _run_workers_in_background(workers)
workers.each do |worker|
# NOTE: while passing local variables to the block below is not
# thread safe, using 'worker' from the enumerator above is ok
Listen::Internals::ThreadPool.add { _run_worker(worker) }
end
end

def _to_array(queue)
workers = []
workers << queue.pop until queue.empty?
workers
end
end
end
end
6 changes: 6 additions & 0 deletions lib/listen/directory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ def self.scan(snapshot, rel_path, options)
raise
end

def self.ascendant_of?(base, other)
other.ascend do |ascendant|
break true if base == ascendant
end
end

def self._async_changes(snapshot, path, previous, options)
fail "Not a Pathname: #{path.inspect}" unless path.respond_to?(:children)
previous.each do |entry, data|
Expand Down
126 changes: 0 additions & 126 deletions spec/lib/listen/adapter/darwin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,130 +69,4 @@
it { should eq 1234 }
end
end

describe 'multiple dirs' do
let(:dir1) { fake_path('/foo/dir1', cleanpath: fake_path('/foo/dir1')) }
let(:dir2) { fake_path('/foo/dir2', cleanpath: fake_path('/foo/dir1')) }
let(:dir3) { fake_path('/foo/dir3', cleanpath: fake_path('/foo/dir1')) }

before do
allow(config).to receive(:queue).and_return(queue)
allow(config).to receive(:silencer).and_return(silencer)
end

let(:foo1) { double('fsevent1') }
let(:foo2) { double('fsevent2') }
let(:foo3) { double('fsevent3') }

before do
allow(FSEvent).to receive(:new).and_return(*expectations.values, nil)
expectations.each do |dir, obj|
allow(obj).to receive(:watch).with(dir.to_s, latency: 0.1)
end
end

describe 'configuration' do
before do
subject.configure
end

context 'with 1 directory' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

let(:expectations) { { '/foo/dir1': foo1 } }

it 'configures directory' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
end
end

context 'with 2 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) { { dir1: foo1, dir2: foo2 } }

it 'configures directories' do
expect(foo1).to have_received(:watch).with('dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('dir2', latency: 0.1)
end
end

context 'with 3 directories' do
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }
let(:expectations) do
{
'/foo/dir1': foo1,
'/foo/dir2': foo2,
'/foo/dir3': foo3
}
end

it 'configures directories' do
expect(foo1).to have_received(:watch).with('/foo/dir1', latency: 0.1)
expect(foo2).to have_received(:watch).with('/foo/dir2', latency: 0.1)
expect(foo3).to have_received(:watch).with('/foo/dir3', latency: 0.1)
end
end
end

describe 'running threads' do
let(:running) { [] }
let(:directories) { expectations.keys.map { |p| Pathname(p.to_s) } }

before do
started = ::Queue.new
threads = ::Queue.new
left = ::Queue.new

# NOTE: Travis has a hard time creating threads on OSX
thread_start_overhead = 3
max_test_time = 3 * thread_start_overhead
block_time = max_test_time + thread_start_overhead

expectations.each do |name, _|
left << name
end

expectations.each do |_, obj|
allow(obj).to receive(:run) do
current_name = left.pop
threads << Thread.current
started << current_name
sleep block_time
end
end

Timeout.timeout(max_test_time) do
subject.start
sleep 0.1 until started.size == expectations.size
end

running << started.pop until started.empty?

killed = ::Queue.new
killed << threads.pop.kill until threads.empty?
killed.pop.join until killed.empty?
end

context 'with 1 directory' do
let(:expectations) { { dir1: foo1 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 2 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end

context 'with 3 directories' do
let(:expectations) { { dir1: foo1, dir2: foo2, dir3: foo3 } }
it 'runs all the workers without blocking' do
expect(running.sort).to eq(expectations.keys)
end
end
end
end
end

0 comments on commit e5a14cf

Please sign in to comment.