Skip to content

Commit

Permalink
Merge 4e926b4 into ba5059c
Browse files Browse the repository at this point in the history
  • Loading branch information
ColinDKelley committed Aug 12, 2020
2 parents ba5059c + 4e926b4 commit d4c2bff
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 358 deletions.
9 changes: 5 additions & 4 deletions lib/listen/adapter/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ def start

def stop
_stop
end

def self.usable?
const_get('OS_REGEXP') =~ RbConfig::CONFIG['target_os']
config.queue.close # this causes queue.pop to return `nil` to the front-end
end

private
Expand Down Expand Up @@ -130,6 +127,10 @@ def _log_exception(msg, caller_stack)
end

class << self
def usable?
const_get('OS_REGEXP') =~ RbConfig::CONFIG['target_os']
end

private

def _log(*args, &block)
Expand Down
24 changes: 6 additions & 18 deletions lib/listen/event/config.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
module Listen
module Event
class Config
attr_reader :listener
attr_reader :event_queue
attr_reader :min_delay_between_events

def initialize(
listener,
event_queue,
Expand All @@ -15,8 +19,8 @@ def initialize(
@block = block
end

def sleep(*args)
Kernel.sleep(*args)
def sleep(seconds)
Kernel.sleep(seconds)
end

def call(*args)
Expand All @@ -27,29 +31,13 @@ def timestamp
Time.now.to_f
end

attr_reader :event_queue

def callable?
@block
end

def optimize_changes(changes)
@queue_optimizer.smoosh_changes(changes)
end

attr_reader :min_delay_between_events

def stopped?
listener.state == :stopped
end

def paused?
listener.state == :paused
end

private

attr_reader :listener
end
end
end
89 changes: 39 additions & 50 deletions lib/listen/event/loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,97 +6,86 @@
module Listen
module Event
class Loop
include Listen::FSM

class Error < RuntimeError
class NotStarted < Error
end
class ThreadFailedToStart < Error; end
class AlreadyStarted < Error; end
end

start_state :pre_start
state :pre_start
state :starting
state :started
state :stopped

def initialize(config)
@config = config
@wait_thread = nil
@state = :paused
@reasons = ::Queue.new
super()
end

def wakeup_on_event
return if stopped?
return unless processing?
return unless wait_thread.alive?
_wakeup(:event)
if started? && @wait_thread&.alive?
_wakeup(:event)
end
end

def paused?
wait_thread && state == :paused
def started?
state == :started
end

def processing?
return false if stopped?
return false if paused?
state == :processing
end
MAX_STARTUP_SECONDS = 5.0

def start
transition! :starting do
state == :pre_start or raise Error::AlreadyStarted
end

def setup
# TODO: use a Fiber instead?
q = ::Queue.new
@wait_thread = Internals::ThreadPool.add do
_wait_for_changes(q, config)
_process_changes
end

Listen::Logger.debug('Waiting for processing to start...')
Timeout.timeout(5) { q.pop }
end
Listen::Logger.debug("Waiting for processing to start...")

wait_for_state(:started, MAX_STARTUP_SECONDS) or raise Error::ThreadFailedToStart, "thread didn't start in #{MAX_STARTUP_SECONDS} seconds (in state: #{state.inspect})"

def resume
fail Error::NotStarted if stopped?
return unless wait_thread
_wakeup(:resume)
Listen::Logger.debug('Processing started.')
end

def pause
# TODO: works?
# fail NotImplementedError
end

def teardown
return unless wait_thread
if wait_thread.alive?
_wakeup(:teardown)
wait_thread.join
def stop
return if stopped?
transition! :stopped

if @wait_thread.alive?
@wait_thread.join
end
@wait_thread = nil
end

def stopped?
!wait_thread
state == :stopped
end

private

attr_reader :config
attr_reader :wait_thread
def _process_changes
processor = Event::Processor.new(@config, @reasons)

attr_accessor :state
transition! :started

def _wait_for_changes(ready_queue, config)
processor = Event::Processor.new(config, @reasons)
processor.loop_for(@config.min_delay_between_events)

_wait_until_resumed(ready_queue)
processor.loop_for(config.min_delay_between_events)
rescue StandardError => ex
_nice_error(ex)
end

def _sleep(*args)
Kernel.sleep(*args)
end

def _wait_until_resumed(ready_queue)
self.state = :paused
ready_queue << :ready
sleep
self.state = :processing
end

def _nice_error(ex)
indent = "\n -- "
msg = format(
Expand All @@ -110,7 +99,7 @@ def _nice_error(ex)

def _wakeup(reason)
@reasons << reason
wait_thread.wakeup
@wait_thread.wakeup
end
end
end
Expand Down
44 changes: 22 additions & 22 deletions lib/listen/event/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Event
class Processor
def initialize(config, reasons)
@config = config
@listener = config.listener
@reasons = reasons
_reset_no_unprocessed_events
end
Expand All @@ -13,10 +14,11 @@ def loop_for(latency)
@latency = latency

loop do
_wait_until_events
event = _wait_until_events
_check_stopped
_wait_until_events_calm_down
_wait_until_no_longer_paused
_process_changes
_process_changes(event)
end
rescue Stopped
Listen::Logger.debug('Processing stopped')
Expand All @@ -38,33 +40,31 @@ def _wait_until_events_calm_down

# give events a bit of time to accumulate so they can be
# compressed/optimized
_sleep(:waiting_until_latency, diff)
_sleep(diff)
end
end

def _wait_until_no_longer_paused
# TODO: may not be a good idea?
_sleep(:waiting_for_unpause) while config.paused?
@listener.wait_for_state(*(Listener.states.keys - [:paused]))
end

def _check_stopped
return unless config.stopped?
return unless @listener.stopped?

_flush_wakeup_reasons
raise Stopped
end

def _sleep(_local_reason, *args)
def _sleep(seconds)
_check_stopped
sleep_duration = config.sleep(*args)
config.sleep(seconds)
_check_stopped

_flush_wakeup_reasons do |reason|
next unless reason == :event
_remember_time_of_first_unprocessed_event unless config.paused?
if reason == :event && !@listener.paused?
_remember_time_of_first_unprocessed_event
end
end

sleep_duration
end

def _remember_time_of_first_unprocessed_event
Expand All @@ -79,16 +79,17 @@ def _deadline
@first_unprocessed_event_time + @latency
end

# blocks until event is popped
# returns the event or `nil` when the event_queue is closed
def _wait_until_events
# TODO: long sleep may not be a good idea?
_sleep(:waiting_for_events) while config.event_queue.empty?
@first_unprocessed_event_time ||= _timestamp
config.event_queue.pop.tap do |_event|
@first_unprocessed_event_time ||= _timestamp
end
end

def _flush_wakeup_reasons
reasons = @reasons
until reasons.empty?
reason = reasons.pop
until @reasons.empty?
reason = @reasons.pop
yield reason if block_given?
end
end
Expand All @@ -98,14 +99,13 @@ def _timestamp
end

# for easier testing without sleep loop
def _process_changes
def _process_changes(event)
_reset_no_unprocessed_events

changes = []
changes = [event]
changes << config.event_queue.pop until config.event_queue.empty?

callable = config.callable?
return unless callable
return unless config.callable?

hash = config.optimize_changes(changes)
result = [hash[:modified], hash[:added], hash[:removed]]
Expand Down
7 changes: 2 additions & 5 deletions lib/listen/event/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ def relative?
end
end

def initialize(config, &block)
def initialize(config)
@event_queue = ::Queue.new
@block = block
@config = config
end

Expand All @@ -31,17 +30,15 @@ def <<(args)

dir = _safe_relative_from_cwd(dir)
event_queue.public_send(:<<, [type, change, dir, path, options])

block.call(args) if block
end

delegate empty?: :event_queue
delegate pop: :event_queue
delegate close: :event_queue

private

attr_reader :event_queue
attr_reader :block
attr_reader :config

def _safe_relative_from_cwd(dir)
Expand Down

0 comments on commit d4c2bff

Please sign in to comment.