Skip to content

Commit

Permalink
Fix problem with local message report metric and test it properly
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 5, 2016
1 parent 5338f63 commit 0235938
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java
Expand Up @@ -160,7 +160,7 @@ protected <R> void sendReply(MessageImpl msg, DeliveryOptions options, Handler<A
} }
} }


boolean isLocal() { protected boolean isLocal() {
return true; return true;
} }
} }
Expand Up @@ -246,7 +246,7 @@ public boolean isFromWire() {
return fromWire; return fromWire;
} }


boolean isLocal() { protected boolean isLocal() {
return isFromWire(); return !isFromWire();
} }
} }
12 changes: 6 additions & 6 deletions src/test/java/io/vertx/test/core/MetricsTest.java
Expand Up @@ -38,7 +38,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -278,10 +277,11 @@ private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCo
assertEquals(ADDRESS1, registration.address); assertEquals(ADDRESS1, registration.address);
assertEquals(null, registration.repliedAddress); assertEquals(null, registration.repliedAddress);
assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.scheduleCount.get());
assertEquals(expectedLocalCount, registration.localScheduleCount.get());
assertEquals(1, registration.beginCount.get()); assertEquals(1, registration.beginCount.get());
assertEquals(0, registration.endCount.get()); assertEquals(0, registration.endCount.get());
assertEquals(0, registration.failureCount.get()); assertEquals(0, registration.failureCount.get());
assertEquals(expectedLocalCount, registration.localCount.get()); assertEquals(expectedLocalCount, registration.localBeginCount.get());
msg.reply("pong"); msg.reply("pong");
}).completionHandler(onSuccess(v2 -> { }).completionHandler(onSuccess(v2 -> {
to.runOnContext(v3 -> { to.runOnContext(v3 -> {
Expand Down Expand Up @@ -309,7 +309,7 @@ private void testHandlerProcessMessage(Vertx from, Vertx to, int expectedLocalCo
// This might take a little time // This might take a little time
waitUntil(() -> 1 == registration.endCount.get()); waitUntil(() -> 1 == registration.endCount.get());
assertEquals(0, registration.failureCount.get()); assertEquals(0, registration.failureCount.get());
assertEquals(expectedLocalCount, registration.localCount.get()); assertEquals(expectedLocalCount, registration.localBeginCount.get());
testComplete(); testComplete();
}); });
waitUntil(() -> registration.scheduleCount.get() == 1); waitUntil(() -> registration.scheduleCount.get() == 1);
Expand Down Expand Up @@ -361,7 +361,7 @@ public void testHandlerMetricReply() throws Exception {
assertEquals(0, registration.scheduleCount.get()); assertEquals(0, registration.scheduleCount.get());
assertEquals(0, registration.beginCount.get()); assertEquals(0, registration.beginCount.get());
assertEquals(0, registration.endCount.get()); assertEquals(0, registration.endCount.get());
assertEquals(0, registration.localCount.get()); assertEquals(0, registration.localBeginCount.get());
msg.reply("pong"); msg.reply("pong");
}).completionHandler(ar -> { }).completionHandler(ar -> {
assertTrue(ar.succeeded()); assertTrue(ar.succeeded());
Expand All @@ -375,14 +375,14 @@ public void testHandlerMetricReply() throws Exception {
assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.scheduleCount.get());
assertEquals(1, registration.beginCount.get()); assertEquals(1, registration.beginCount.get());
assertEquals(0, registration.endCount.get()); assertEquals(0, registration.endCount.get());
assertEquals(1, registration.localCount.get()); assertEquals(1, registration.localBeginCount.get());
vertx.runOnContext(v -> { vertx.runOnContext(v -> {
assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address); assertEquals(ADDRESS1, metrics.getRegistrations().get(0).address);
assertEquals(ADDRESS1, registration.repliedAddress); assertEquals(ADDRESS1, registration.repliedAddress);
assertEquals(1, registration.scheduleCount.get()); assertEquals(1, registration.scheduleCount.get());
assertEquals(1, registration.beginCount.get()); assertEquals(1, registration.beginCount.get());
assertEquals(1, registration.endCount.get()); assertEquals(1, registration.endCount.get());
assertEquals(1, registration.localCount.get()); assertEquals(1, registration.localBeginCount.get());
}); });
testComplete(); testComplete();
}); });
Expand Down
Expand Up @@ -96,13 +96,16 @@ public void handlerUnregistered(HandlerMetric handler) {
@Override @Override
public void scheduleMessage(HandlerMetric handler, boolean local) { public void scheduleMessage(HandlerMetric handler, boolean local) {
handler.scheduleCount.incrementAndGet(); handler.scheduleCount.incrementAndGet();
if (local) {
handler.localScheduleCount.incrementAndGet();
}
} }


@Override @Override
public void beginHandleMessage(HandlerMetric handler, boolean local) { public void beginHandleMessage(HandlerMetric handler, boolean local) {
handler.beginCount.incrementAndGet(); handler.beginCount.incrementAndGet();
if (local) { if (local) {
handler.localCount.incrementAndGet(); handler.localBeginCount.incrementAndGet();
} }
} }


Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/vertx/test/fakemetrics/HandlerMetric.java
Expand Up @@ -26,10 +26,11 @@ public class HandlerMetric {
public final String address; public final String address;
public final String repliedAddress; public final String repliedAddress;
public final AtomicInteger scheduleCount = new AtomicInteger(); public final AtomicInteger scheduleCount = new AtomicInteger();
public final AtomicInteger localScheduleCount = new AtomicInteger();
public final AtomicInteger beginCount = new AtomicInteger(); public final AtomicInteger beginCount = new AtomicInteger();
public final AtomicInteger endCount = new AtomicInteger(); public final AtomicInteger endCount = new AtomicInteger();
public final AtomicInteger failureCount = new AtomicInteger(); public final AtomicInteger failureCount = new AtomicInteger();
public final AtomicInteger localCount = new AtomicInteger(); public final AtomicInteger localBeginCount = new AtomicInteger();


public HandlerMetric(String address, String repliedAddress) { public HandlerMetric(String address, String repliedAddress) {
this.address = address; this.address = address;
Expand All @@ -39,6 +40,6 @@ public HandlerMetric(String address, String repliedAddress) {
@Override @Override
public String toString() { public String toString() {
return "HandlerRegistration[address=" + address + ",repliedAddress=" + repliedAddress + ",beginCount=" + beginCount.get() + return "HandlerRegistration[address=" + address + ",repliedAddress=" + repliedAddress + ",beginCount=" + beginCount.get() +
",endCount=" + endCount.get() + ",failureCount=" + failureCount + ",localCount=" + localCount.get() + "]"; ",endCount=" + endCount.get() + ",failureCount=" + failureCount + ",localCount=" + localBeginCount.get() + "]";
} }
} }

0 comments on commit 0235938

Please sign in to comment.