Skip to content

Commit

Permalink
Merge pull request #119 from celluloid/fix-timeout-semantics
Browse files Browse the repository at this point in the history
Disambiguate wakeup vs timeout (fixes #63, #66)
  • Loading branch information
tarcieri committed Dec 28, 2016
2 parents 62c1baa + 87a3471 commit b2fb065
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 24 deletions.
4 changes: 0 additions & 4 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ Lint/HandleExceptions:
Lint/Loop:
Enabled: false

# TODO: fix this
Lint/NonLocalExitFromIterator:
Enabled: false

#
# Metrics
#
Expand Down
2 changes: 1 addition & 1 deletion ext/nio4r/nio4r.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct NIO_Selector
struct ev_io wakeup;

int wakeup_reader, wakeup_writer;
int closed, selecting;
int closed, selecting, timed_out;
int ready_count;

VALUE ready_array;
Expand Down
12 changes: 10 additions & 2 deletions ext/nio4r/org/nio4r/Selector.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class Selector extends RubyObject {
private java.nio.channels.Selector selector;
private HashMap<SelectableChannel,SelectionKey> cancelledKeys;
private volatile boolean wakeupFired;

public Selector(final Ruby ruby, RubyClass rubyClass) {
super(ruby, rubyClass);
Expand All @@ -32,6 +33,8 @@ public Selector(final Ruby ruby, RubyClass rubyClass) {
@JRubyMethod
public IRubyObject initialize(ThreadContext context) {
this.cancelledKeys = new HashMap<SelectableChannel,SelectionKey>();
this.wakeupFired = false;

try {
this.selector = java.nio.channels.Selector.open();
} catch(IOException ie) {
Expand Down Expand Up @@ -172,13 +175,16 @@ public synchronized IRubyObject select(ThreadContext context, IRubyObject timeou
throw context.getRuntime().newIOError("selector is closed");
}

this.wakeupFired = false;
int ready = doSelect(runtime, context, timeout);

/* Timeout or wakeup */
if(ready <= 0)
/* Timeout */
if(ready <= 0 && !this.wakeupFired) {
return context.nil;
}

RubyArray array = null;

if(!block.isGiven()) {
array = runtime.newArray(this.selector.selectedKeys().size());
}
Expand Down Expand Up @@ -264,7 +270,9 @@ public IRubyObject wakeup(ThreadContext context) {
throw context.getRuntime().newIOError("selector is closed");
}

this.wakeupFired = true;
this.selector.wakeup();

return context.nil;
}
}
35 changes: 24 additions & 11 deletions ext/nio4r/selector.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ static VALUE NIO_Selector_allocate(VALUE klass)

selector = (struct NIO_Selector *)xmalloc(sizeof(struct NIO_Selector));
selector->ev_loop = ev_loop_new(0);

ev_init(&selector->timer, NIO_Selector_timeout_callback);
selector->timer.data = (void *)selector;

selector->wakeup_reader = fds[0];
selector->wakeup_writer = fds[1];
Expand All @@ -112,7 +114,7 @@ static VALUE NIO_Selector_allocate(VALUE klass)
selector->wakeup.data = (void *)selector;
ev_io_start(selector->ev_loop, &selector->wakeup);

selector->closed = selector->selecting = selector->ready_count = 0;
selector->closed = selector->selecting = selector->timed_out = selector->ready_count = 0;
selector->ready_array = Qnil;

return Data_Wrap_Struct(klass, NIO_Selector_mark, NIO_Selector_free, selector);
Expand Down Expand Up @@ -335,24 +337,31 @@ static VALUE NIO_Selector_select_synchronized(VALUE *args)
}

ready = NIO_Selector_run(selector, args[1]);
if(ready > 0) {
if(rb_block_given_p()) {
return INT2NUM(ready);
} else {
ready_array = selector->ready_array;

/* Timeout */
if(ready < 0) {
if(!rb_block_given_p()) {
selector->ready_array = Qnil;
return ready_array;
}

return Qnil;
}

if(rb_block_given_p()) {
return INT2NUM(ready);
} else {
ready_array = selector->ready_array;
selector->ready_array = Qnil;
return Qnil;
return ready_array;
}
}

static int NIO_Selector_run(struct NIO_Selector *selector, VALUE timeout)
{
int result;

selector->selecting = 1;
selector->timed_out = 0;

/* Implement the optional timeout (if any) as a ev_timer */
if(timeout != Qnil) {
Expand All @@ -369,7 +378,11 @@ static int NIO_Selector_run(struct NIO_Selector *selector, VALUE timeout)
result = selector->ready_count;
selector->selecting = selector->ready_count = 0;

return result;
if(!result && selector->timed_out) {
return -1;
} else {
return result;
}
}

/* Wake the selector up from another thread */
Expand Down Expand Up @@ -432,8 +445,8 @@ static VALUE NIO_Selector_is_empty(VALUE self)
/* Called whenever a timeout fires on the event loop */
static void NIO_Selector_timeout_callback(struct ev_loop *ev_loop, struct ev_timer *timer, int revents)
{
/* We don't actually need to do anything here, the mere firing of the
timer is sufficient to interrupt the selector. However, libev still wants a callback */
struct NIO_Selector *selector = (struct NIO_Selector *)timer->data;
selector->timed_out = 1;
}

/* Called whenever a wakeup request is sent to a selector */
Expand Down
7 changes: 2 additions & 5 deletions lib/nio/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,13 @@ def select(timeout = nil)
end

ready_readers, ready_writers = Kernel.select readers, writers, [], timeout
return unless ready_readers # timeout or wakeup
return unless ready_readers # timeout

ready_readers.each do |io|
if io == @wakeup
# Clear all wakeup signals we've received by reading them
# Wakeups should have level triggered behavior
@wakeup.read(@wakeup.stat.size)

# TODO: return something other than nil on wakeup
return
else
monitor = @selectables[io]
monitor.readiness = :r
Expand All @@ -107,7 +104,7 @@ def select(timeout = nil)
selected_monitors.each { |m| yield m }
selected_monitors.size
else
selected_monitors
selected_monitors.to_a
end
end

Expand Down
2 changes: 1 addition & 1 deletion spec/nio/selector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@

thread = Thread.new do
started_at = Time.now
expect(subject.select).to be_nil
expect(subject.select).to eq []
Time.now - started_at
end

Expand Down

0 comments on commit b2fb065

Please sign in to comment.