Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Commit

Permalink
Refactor Listener
Browse files Browse the repository at this point in the history
Offer a stronger contract for listeners, where the listener's block is only
called once after being closed.
  • Loading branch information
Kowshik Prakasam and Pieter Noordhuis committed Mar 2, 2013
1 parent 9e383a7 commit a7f4ff5
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions em-posix-spawn/lib/em/posix/spawn/child.rb
Expand Up @@ -350,24 +350,37 @@ def initialize(name, &block)
@name = name
@block = block
@offset = 0
@closed = false
end

# Sends only the update. Also ensures that duplicate calls
# are suppressed.
# Sends the part of the buffer that has not yet been sent.
def call(buffer)
to_be_sent = buffer.slice(@offset..-1)
to_be_sent ||= ""
@offset = buffer.length
@block.call(self, to_be_sent)
return if @block.nil?

to_call = @block
to_call.call(self, slice_from_buffer(buffer))
end

def close
@closed = true
# Sends the part of the buffer that has not yet been sent,
# after closing the listener. After this, the listener
# will not receive any more calls.
def close(buffer = "")
return if @block.nil?

to_call, @block = @block, nil
to_call.call(self, slice_from_buffer(buffer))
end

def closed?
@closed
@block.nil?
end

private

def slice_from_buffer(buffer)
to_be_sent = buffer.slice(@offset..-1)
to_be_sent ||= ""
@offset = buffer.length
to_be_sent
end
end

Expand All @@ -388,8 +401,7 @@ def after_read(&block)
# receives the entire buffer if it attaches to the process only
# after its completion.
EM.next_tick do
listener.close
listener.call(@buffer)
listener.close(@buffer)
end
elsif !@buffer.empty?
# If this stream's buffer is non-empty, pass it to the listener
Expand All @@ -412,11 +424,10 @@ def notify_readable
rescue Errno::EAGAIN, Errno::EINTR
rescue EOFError
@after_read.each do |listener|
listener.close
# Ensure that the listener receives the entire buffer if it
# attaches to the process only just before the stream is
# closed.
listener.call(@buffer)
listener.close(@buffer)
end
close
set_deferred_success
Expand Down

0 comments on commit a7f4ff5

Please sign in to comment.