Skip to content

Commit

Permalink
Cancelling a timeoutstream should not call endHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 11, 2015
1 parent f2a06ed commit bbb7864
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 36 deletions.
11 changes: 4 additions & 7 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Expand Up @@ -773,7 +773,9 @@ public TimeoutStream exceptionHandler(Handler<Throwable> handler) {

@Override
public void cancel() {
handler(null);
if (id != null) {
VertxImpl.this.cancelTimer(id);
}
}

@Override
Expand All @@ -785,12 +787,7 @@ public synchronized TimeoutStream handler(Handler<Long> handler) {
this.handler = handler;
id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
} else {
if (id != null) {
VertxImpl.this.cancelTimer(id);
if (endHandler != null) {
runOnContext(endHandler);
}
}
cancel();
}
return this;
}
Expand Down
50 changes: 21 additions & 29 deletions src/test/java/io/vertx/test/core/TimerTest.java
Expand Up @@ -91,6 +91,7 @@ private void periodic(long delay) throws Exception {
final AtomicLong id = new AtomicLong(-1);
id.set(vertx.setPeriodic(delay, new Handler<Long>() {
int count;

public void handle(Long timerID) {
assertEquals(id.get(), timerID.longValue());
count++;
Expand Down Expand Up @@ -163,26 +164,33 @@ public void testTimerStreamExceptionDuringHandle() throws Exception {
@Test
public void testTimerStreamCallingWithNullHandlerCancelsTheTimer() throws Exception {
ReadStream<Long> timer = vertx.timerStream(10);
AtomicInteger count = new AtomicInteger();
timer.handler(l -> {
fail();
});
timer.endHandler(v -> {
testComplete();
if (count.incrementAndGet() == 1) {
timer.handler(null);
vertx.setTimer(200, id -> {
assertEquals(1, count.get());
testComplete();
});
} else {
fail();
}
});
timer.handler(null);
await();
}

@Test
public void testTimerStreamCancellation() throws Exception {
TimeoutStream timer = vertx.timerStream(10);
AtomicBoolean called = new AtomicBoolean();
timer.handler(l -> {
fail();
called.set(true);
});
timer.endHandler(v -> {
timer.cancel();
vertx.setTimer(500, id -> {
assertFalse(called.get());
testComplete();
});
timer.cancel();
await();
}

Expand Down Expand Up @@ -220,7 +228,7 @@ public void testTimerPause() throws Exception {

@Test
public void testPeriodicStreamHandler() throws Exception {
ReadStream<Long> timer = vertx.periodicStream(10);
TimeoutStream timer = vertx.periodicStream(10);
AtomicInteger count = new AtomicInteger();
timer.handler(l -> {
int value = count.incrementAndGet();
Expand All @@ -230,14 +238,15 @@ public void testPeriodicStreamHandler() throws Exception {
case 1:
throw new RuntimeException();
case 2:
timer.handler(null);
timer.cancel();
testComplete();
break;
default:
fail();
}
});
timer.endHandler(v -> {
testComplete();
fail();
});
await();
}
Expand Down Expand Up @@ -274,7 +283,7 @@ public void testPeriodicPauseResume() throws Exception {
}

@Test
public void testTimeoutStreamEndCallbackAsynchronously1() {
public void testTimeoutStreamEndCallbackAsynchronously() {
TimeoutStream stream = vertx.timerStream(10);
ThreadLocal<Object> stack = new ThreadLocal<>();
stack.set(true);
Expand All @@ -283,24 +292,7 @@ public void testTimeoutStreamEndCallbackAsynchronously1() {
assertNull(stack.get());
testComplete();
});
stream.handler(id -> {});
await();
}

// This test does not pass
@Test
public void testTimeoutStreamEndCallbackAsynchronously2() {
TimeoutStream stream = vertx.periodicStream(10);
ThreadLocal<Object> stack = new ThreadLocal<>();
stream.endHandler(v -> {
assertTrue(Vertx.currentContext().isEventLoopContext());
assertNull(stack.get());
testComplete();
});
stream.handler(id -> {
stack.set(true);
stream.cancel();
stack.set(null);
});
await();
}
Expand Down

0 comments on commit bbb7864

Please sign in to comment.