Skip to content

Commit

Permalink
Code syntax improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaudgg committed Apr 9, 2013
1 parent 39bdda5 commit 016b2e2
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 133 deletions.
77 changes: 42 additions & 35 deletions lib/listen/adapter.rb
Expand Up @@ -5,7 +5,8 @@


module Listen module Listen
class Adapter class Adapter
attr_accessor :directories, :latency, :paused attr_accessor :directories, :callback, :stopped, :paused,
:mutex, :changed_directories, :turnstile, :latency


# The list of existing optimized adapters. # The list of existing optimized adapters.
OPTIMIZED_ADAPTERS = %w[Darwin Linux BSD Windows] OPTIMIZED_ADAPTERS = %w[Darwin Linux BSD Windows]
Expand Down Expand Up @@ -38,8 +39,8 @@ class Adapter
# @option options [String, Boolean] polling_fallback_message to change polling fallback message or remove it # @option options [String, Boolean] polling_fallback_message to change polling fallback message or remove it
# @option options [Float] latency the delay between checking for changes in seconds # @option options [Float] latency the delay between checking for changes in seconds
# #
# @yield [changed_dirs, options] callback the callback called when a change happens # @yield [changed_directories, options] callback the callback called when a change happens
# @yieldparam [Array<String>] changed_dirs the changed directories # @yieldparam [Array<String>] changed_directories the changed directories
# @yieldparam [Hash] options callback options (like recursive: true) # @yieldparam [Hash] options callback options (like recursive: true)
# #
# @return [Listen::Adapter] the chosen adapter # @return [Listen::Adapter] the chosen adapter
Expand All @@ -65,38 +66,39 @@ def self.select_and_initialize(directories, options = {}, &callback)
Adapters::Polling.new(directories, options, &callback) Adapters::Polling.new(directories, options, &callback)
end end



# Initializes the adapter. # Initializes the adapter.
# #
# @param [String, Array<String>] directories the directories to watch # @param [String, Array<String>] directories the directories to watch
# @param [Hash] options the adapter options # @param [Hash] options the adapter options
# @option options [Float] latency the delay between checking for changes in seconds # @option options [Float] latency the delay between checking for changes in seconds
# @option options [Boolean] report_changes whether or not to automatically report changes (run the callback) # @option options [Boolean] report_changes whether or not to automatically report changes (run the callback)
# #
# @yield [changed_dirs, options] callback Callback called when a change happens # @yield [changed_directories, options] callback Callback called when a change happens
# @yieldparam [Array<String>] changed_dirs the changed directories # @yieldparam [Array<String>] changed_directories the changed directories
# @yieldparam [Hash] options callback options (like recursive: true) # @yieldparam [Hash] options callback options (like recursive: true)
# #
# @return [Listen::Adapter] the adapter # @return [Listen::Adapter] the adapter
# #
def initialize(directories, options = {}, &callback) def initialize(directories, options = {}, &callback)
@directories = Array(directories) @directories = Array(directories)
@callback = callback @callback = callback
@stopped = true @stopped = true
@paused = false @paused = false
@mutex = Mutex.new @mutex = Mutex.new
@changed_dirs = Set.new @changed_directories = Set.new
@turnstile = Turnstile.new @turnstile = Turnstile.new
@latency ||= options[:latency] || DEFAULT_LATENCY @latency ||= options[:latency] || DEFAULT_LATENCY
@report_changes = options[:report_changes].nil? ? true : options[:report_changes] @report_changes = options.fetch(:report_changes, true)
end end


# Starts the adapter. # Starts the adapter.
# #
# @param [Boolean] blocking whether or not to block the current thread after starting # @param [Boolean] blocking whether or not to block the current thread after starting
# #
def start(blocking = true) def start(blocking = true)
@mutex.synchronize do mutex.synchronize do
return unless @stopped return unless stopped
@stopped = false @stopped = false
end end
end end
Expand All @@ -105,7 +107,7 @@ def start(blocking = true)
# #
def stop def stop
@stopped = true @stopped = true
@turnstile.signal # ensure no thread is blocked turnstile.signal # ensure no thread is blocked
end end


# Pauses the adapter. # Pauses the adapter.
Expand All @@ -125,22 +127,22 @@ def unpause
# @return [Boolean] whether the adapter is started or not # @return [Boolean] whether the adapter is started or not
# #
def started? def started?
!@stopped !stopped
end end


# Returns whether the adapter is paused or not. # Returns whether the adapter is paused or not.
# #
# @return [Boolean] whether the adapter is paused or not # @return [Boolean] whether the adapter is paused or not
# #
def paused? def paused?
@paused paused
end end


# Blocks the main thread until the poll thread # Blocks the main thread until the poll thread
# runs the callback. # runs the callback.
# #
def wait_for_callback def wait_for_callback
@turnstile.wait unless @paused turnstile.wait unless paused
end end


# Blocks the main thread until N changes are # Blocks the main thread until N changes are
Expand All @@ -150,12 +152,12 @@ def wait_for_changes(threshold = 0)
changes = 0 changes = 0


loop do loop do
@mutex.synchronize { changes = @changed_dirs.size } mutex.synchronize { changes = changed_directories.size }


return if @paused || @stopped return if paused || stopped
return if changes >= threshold return if changes >= threshold


sleep(@latency) sleep(latency)
end end
end end


Expand Down Expand Up @@ -192,9 +194,10 @@ def self.usable_and_works?(directories, options = {})
# @return [Boolean] whether the adapter works or not # @return [Boolean] whether the adapter works or not
# #
def self.works?(directory, options = {}) def self.works?(directory, options = {})
work, test_file = false, "#{directory}/.listen_test" work = false
callback = lambda { |*| work = true } test_file = "#{directory}/.listen_test"
adapter = self.new(directory, options, &callback) callback = lambda { |*| work = true }
adapter = self.new(directory, options, &callback)
adapter.start(false) adapter.start(false)


FileUtils.touch(test_file) FileUtils.touch(test_file)
Expand All @@ -214,14 +217,18 @@ def self.works?(directory, options = {})
def report_changes def report_changes
changed_dirs = nil changed_dirs = nil


@mutex.synchronize do mutex.synchronize do
return if @changed_dirs.empty? return if @changed_directories.empty?
changed_dirs = @changed_dirs.to_a changed_dirs = @changed_directories.to_a
@changed_dirs.clear @changed_directories.clear
end end


@callback.call(changed_dirs, {}) callback.call(changed_dirs, {})
@turnstile.signal turnstile.signal
end

def report_changes?
@report_changes
end end


private private
Expand All @@ -243,9 +250,9 @@ def self.warn_polling_fallback(warning, options)
# Polls changed directories and reports them back # Polls changed directories and reports them back
# when there are changes. # when there are changes.
# #
def poll_changed_dirs def poll_changed_directories
until @stopped until stopped
sleep(@latency) sleep(latency)
report_changes report_changes
end end
end end
Expand Down
35 changes: 18 additions & 17 deletions lib/listen/adapters/bsd.rb
Expand Up @@ -16,13 +16,15 @@ class BSD < Adapter
# #
EVENTS = [:delete, :write, :extend, :attrib, :link, :rename, :revoke] EVENTS = [:delete, :write, :extend, :attrib, :link, :rename, :revoke]


attr_accessor :worker, :worker_thread, :poll_thread

# Initializes the Adapter. # Initializes the Adapter.
# #
# @see Listen::Adapter#initialize # @see Listen::Adapter#initialize
# #
def initialize(directories, options = {}, &callback) def initialize(directories, options = {}, &callback)
super super
@kqueue = init_kqueue @worker = init_worker
end end


# Starts the adapter. # Starts the adapter.
Expand All @@ -32,28 +34,27 @@ def initialize(directories, options = {}, &callback)
def start(blocking = true) def start(blocking = true)
super super


@kqueue_thread = Thread.new do @worker_thread = Thread.new do
until @stopped until stopped
@kqueue.poll worker.poll
sleep(@latency) sleep(latency)
end end
end end
@poll_thread = Thread.new { poll_changed_dirs } if @report_changes @poll_thread = Thread.new { poll_changed_directories } if report_changes?

worker_thread.join if blocking
@kqueue_thread.join if blocking
end end


# Stops the adapter. # Stops the adapter.
# #
def stop def stop
@mutex.synchronize do mutex.synchronize do
return if @stopped return if stopped
super super
end end


@kqueue.stop worker.stop
Thread.kill(@kqueue_thread) if @kqueue_thread Thread.kill(worker_thread) if worker_thread
@poll_thread.join if @poll_thread poll_thread.join if poll_thread
end end


# Checks if the adapter is usable on BSD. # Checks if the adapter is usable on BSD.
Expand All @@ -72,15 +73,15 @@ def self.usable?
# #
# @return [INotify::Notifier] initialized kqueue # @return [INotify::Notifier] initialized kqueue
# #
def init_kqueue def init_worker
require 'find' require 'find'


callback = lambda do |event| callback = lambda do |event|
path = event.watcher.path path = event.watcher.path
@mutex.synchronize do mutex.synchronize do
# kqueue watches everything, but Listen only needs the # kqueue watches everything, but Listen only needs the
# directory where stuffs happens. # directory where stuffs happens.
@changed_dirs << (File.directory?(path) ? path : File.dirname(path)) @changed_directories << (File.directory?(path) ? path : File.dirname(path))


# If it is a directory, and it has a write flag, it means a # If it is a directory, and it has a write flag, it means a
# file has been added so find out which and deal with it. # file has been added so find out which and deal with it.
Expand All @@ -98,7 +99,7 @@ def init_kqueue
end end


KQueue::Queue.new.tap do |queue| KQueue::Queue.new.tap do |queue|
@directories.each do |directory| directories.each do |directory|
Find.find(directory) do |path| Find.find(directory) do |path|
queue.watch_file(path, *EVENTS, &callback) queue.watch_file(path, *EVENTS, &callback)
end end
Expand Down
26 changes: 14 additions & 12 deletions lib/listen/adapters/darwin.rb
Expand Up @@ -11,6 +11,8 @@ class Darwin < Adapter


LAST_SEPARATOR_REGEX = /\/$/ LAST_SEPARATOR_REGEX = /\/$/


attr_accessor :worker, :worker_thread, :poll_thread

# Initializes the Adapter. # Initializes the Adapter.
# #
# @see Listen::Adapter#initialize # @see Listen::Adapter#initialize
Expand All @@ -27,29 +29,29 @@ def initialize(directories, options = {}, &callback)
def start(blocking = true) def start(blocking = true)
super super


@worker_thread = Thread.new { @worker.run } @worker_thread = Thread.new { worker.run }


# The FSEvent worker needs some time to start up. Turnstiles can't # The FSEvent worker needs some time to start up. Turnstiles can't
# be used to wait for it as it runs in a loop. # be used to wait for it as it runs in a loop.
# TODO: Find a better way to block until the worker starts. # TODO: Find a better way to block until the worker starts.
sleep 0.1 sleep 0.1


@poll_thread = Thread.new { poll_changed_dirs } if @report_changes @poll_thread = Thread.new { poll_changed_directories } if report_changes?


@worker_thread.join if blocking worker_thread.join if blocking
end end


# Stops the adapter. # Stops the adapter.
# #
def stop def stop
@mutex.synchronize do mutex.synchronize do
return if @stopped return if stopped
super super
end end


@worker.stop worker.stop
@worker_thread.join if @worker_thread Thread.kill(worker_thread) if worker_thread
@poll_thread.join if @poll_thread poll_thread.join if poll_thread
end end


# Checks if the adapter is usable on Mac OSX. # Checks if the adapter is usable on Mac OSX.
Expand All @@ -70,11 +72,11 @@ def self.usable?
# #
def init_worker def init_worker
FSEvent.new.tap do |worker| FSEvent.new.tap do |worker|
worker.watch(@directories.dup, latency: @latency) do |changes| worker.watch(directories.dup, latency: latency) do |changes|
next if @paused next if paused


@mutex.synchronize do mutex.synchronize do
changes.each { |path| @changed_dirs << path.sub(LAST_SEPARATOR_REGEX, '') } changes.each { |path| @changed_directories << path.sub(LAST_SEPARATOR_REGEX, '') }
end end
end end
end end
Expand Down
28 changes: 14 additions & 14 deletions lib/listen/adapters/linux.rb
Expand Up @@ -25,6 +25,8 @@ class Linux < Adapter
for information on how to solve this issue. for information on how to solve this issue.
EOS EOS


attr_accessor :worker, :worker_thread, :poll_thread

# Initializes the Adapter. # Initializes the Adapter.
# #
# @see Listen::Adapter#initialize # @see Listen::Adapter#initialize
Expand All @@ -43,23 +45,23 @@ def initialize(directories, options = {}, &callback)
def start(blocking = true) def start(blocking = true)
super super


@worker_thread = Thread.new { @worker.run } @worker_thread = Thread.new { worker.run }
@poll_thread = Thread.new { poll_changed_dirs } if @report_changes @poll_thread = Thread.new { poll_changed_directories } if report_changes?


@worker_thread.join if blocking worker_thread.join if blocking
end end


# Stops the adapter. # Stops the adapter.
# #
def stop def stop
@mutex.synchronize do mutex.synchronize do
return if @stopped return if stopped
super super
end end


@worker.stop worker.stop
Thread.kill(@worker_thread) if @worker_thread Thread.kill(worker_thread) if worker_thread
@poll_thread.join if @poll_thread poll_thread.join if poll_thread
end end


# Checks if the adapter is usable on Linux. # Checks if the adapter is usable on Linux.
Expand All @@ -80,7 +82,7 @@ def self.usable?
# #
def init_worker def init_worker
callback = lambda do |event| callback = lambda do |event|
if @paused || ( if paused || (
# Event on root directory # Event on root directory
event.name == "" event.name == ""
) || ( ) || (
Expand All @@ -94,15 +96,13 @@ def init_worker
next next
end end


@mutex.synchronize do mutex.synchronize do
@changed_dirs << File.dirname(event.absolute_name) @changed_directories << File.dirname(event.absolute_name)
end end
end end


INotify::Notifier.new.tap do |worker| INotify::Notifier.new.tap do |worker|
@directories.each do |directory| directories.each { |dir| worker.watch(dir, *EVENTS, &callback) }
worker.watch(directory, *EVENTS, &callback)
end
end end
end end


Expand Down

0 comments on commit 016b2e2

Please sign in to comment.