Skip to content

Commit

Permalink
Add base protocol test and clean up protocol test implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jan 28, 2015
1 parent b48b0d6 commit cd941ee
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 335 deletions.
Expand Up @@ -58,7 +58,9 @@ public void channelRead(ChannelHandlerContext context, Object message) {
CompletableFuture<ByteBuffer> responseFuture = responseFutures.remove(responseId); CompletableFuture<ByteBuffer> responseFuture = responseFutures.remove(responseId);
if (responseFuture != null) { if (responseFuture != null) {
int length = response.readInt(); int length = response.readInt();
ByteBuffer buffer = response.nioBuffer(response.readerIndex(), length); ByteBuffer buffer = ByteBuffer.allocateDirect(length);
response.readBytes(buffer);
buffer.flip();
responseFuture.complete(buffer); responseFuture.complete(buffer);
} }
response.release(); response.release();
Expand Down
Expand Up @@ -161,8 +161,9 @@ public void channelRead(final ChannelHandlerContext context, Object message) {
response.writeLong(requestId); response.writeLong(requestId);
response.writeInt(result.remaining()); response.writeInt(result.remaining());
response.writeBytes(result); response.writeBytes(result);
context.writeAndFlush(response); context.writeAndFlush(response).addListener(future -> {
request.release(); request.release();
});
}); });
} }
}); });
Expand Down
Expand Up @@ -15,50 +15,26 @@
*/ */
package net.kuujo.copycat.netty; package net.kuujo.copycat.netty;


import net.jodah.concurrentunit.ConcurrentTestCase; import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolClient; import net.kuujo.copycat.test.ProtocolTest;
import net.kuujo.copycat.protocol.ProtocolServer;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/** /**
* Netty TCP protocol test. * Netty TCP protocol test.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
@Test @Test
public class NettyTcpProtocolTest extends ConcurrentTestCase { public class NettyTcpProtocolTest extends ProtocolTest {

/**
* Tests sending from a client to sever and back.
*/
public void testSendReceive() throws Throwable {
NettyTcpProtocol protocol = new NettyTcpProtocol();
ProtocolServer server = protocol.createServer(new URI("tcp://localhost:5555"));
ProtocolClient client = protocol.createClient(new URI("tcp://localhost:5555"));


server.handler(buffer -> { @Override
byte[] bytes = new byte[buffer.remaining()]; protected Protocol createProtocol() {
buffer.get(bytes); return new NettyTcpProtocol();
threadAssertEquals(new String(bytes), "Hello world!"); }
return CompletableFuture.completedFuture(ByteBuffer.wrap("Hello world back!".getBytes()));
});
server.listen().thenRun(this::resume);
await(5000);

client.connect().thenRun(this::resume);
await(5000);


client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAccept(buffer -> { @Override
byte[] bytes = new byte[buffer.remaining()]; protected String createUri() {
buffer.get(bytes); return "tcp://localhost:5555";
threadAssertEquals(new String(bytes), "Hello world back!");
resume();
});
await(5000);
} }


} }
10 changes: 10 additions & 0 deletions test-tools/pom.xml
Expand Up @@ -31,6 +31,16 @@
<artifactId>copycat-core</artifactId> <artifactId>copycat-core</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8.8</version>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.3.2</version>
</dependency>
</dependencies> </dependencies>


<build> <build>
Expand Down
92 changes: 92 additions & 0 deletions test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java
@@ -0,0 +1,92 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.test;

import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolServer;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* Base protocol test.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
@Test
public abstract class ProtocolTest extends ConcurrentTestCase {

/**
* Creates a test protocol.
*/
protected abstract Protocol createProtocol();

/**
* Creates a test URI.
*/
protected abstract String createUri();

/**
* Tests sending from a client to sever and back.
*/
public void testSendReceive() throws Throwable {
Protocol protocol = createProtocol();
String uri = createUri();
ProtocolServer server = protocol.createServer(new URI(uri));
ProtocolClient client = protocol.createClient(new URI(uri));

server.handler(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
threadAssertEquals(new String(bytes), "Hello world!");
return CompletableFuture.completedFuture(ByteBuffer.wrap("Hello world back!".getBytes()));
});
server.listen().thenRunAsync(this::resume);
await(5000);

client.connect().thenRunAsync(this::resume);
await(5000);

client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
threadAssertEquals(new String(bytes), "Hello world back!");
resume();
});
await(5000);

client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
threadAssertEquals(new String(bytes), "Hello world back!");
resume();
});
await(5000);

client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
threadAssertEquals(new String(bytes), "Hello world back!");
resume();
});
await(5000);
}

}
2 changes: 1 addition & 1 deletion vertx/pom.xml
Expand Up @@ -51,7 +51,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.kuujo.copycat</groupId> <groupId>net.kuujo.copycat</groupId>
<artifactId>copycat</artifactId> <artifactId>copycat-test-tools</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
Expand Down
Expand Up @@ -39,6 +39,8 @@ public class VertxEventBusProtocol extends AbstractProtocol {
private static final String CONFIGURATION = "eventbus"; private static final String CONFIGURATION = "eventbus";
private static final String DEFAULT_CONFIGURATION = "eventbus-defaults"; private static final String DEFAULT_CONFIGURATION = "eventbus-defaults";


private Vertx vertx;

public VertxEventBusProtocol() { public VertxEventBusProtocol() {
super(CONFIGURATION, DEFAULT_CONFIGURATION); super(CONFIGURATION, DEFAULT_CONFIGURATION);
} }
Expand Down Expand Up @@ -72,7 +74,7 @@ public VertxEventBusProtocol copy() {
* @param vertx The Vert.x instance. * @param vertx The Vert.x instance.
*/ */
public void setVertx(Vertx vertx) { public void setVertx(Vertx vertx) {
this.config = config.withValue(VERTX, ConfigValueFactory.fromAnyRef(Assert.isNotNull(vertx, "vertx"))); this.vertx = vertx;
} }


/** /**
Expand All @@ -81,7 +83,7 @@ public void setVertx(Vertx vertx) {
* @return The Vert.x instance. * @return The Vert.x instance.
*/ */
public Vertx getVertx() { public Vertx getVertx() {
return config.hasPath(VERTX) ? (Vertx) config.getValue(VERTX) : null; return vertx;
} }


/** /**
Expand All @@ -102,7 +104,7 @@ public VertxEventBusProtocol withVertx(Vertx vertx) {
* @throws java.lang.NullPointerException If the host is {@code null} * @throws java.lang.NullPointerException If the host is {@code null}
*/ */
public void setHost(String host) { public void setHost(String host) {
this.config = config.withValue(VERTX_HOST, ConfigValueFactory.fromAnyRef(Assert.isNotNull(host, "host"))); this.config = host != null ? config.withoutPath(VERTX_HOST) : config.withValue(VERTX_HOST, ConfigValueFactory.fromAnyRef(host));
} }


/** /**
Expand Down Expand Up @@ -160,7 +162,7 @@ public VertxEventBusProtocol withPort(int port) {
private Vertx createVertx() { private Vertx createVertx() {
String host = getHost(); String host = getHost();
int port = getPort(); int port = getPort();
return host != null ? VertxFactory.newVertx(port, host) : VertxFactory.newVertx(); return host != null && !host.isEmpty() ? VertxFactory.newVertx(port, host) : VertxFactory.newVertx();
} }


@Override @Override
Expand Down
Expand Up @@ -16,9 +16,9 @@
package net.kuujo.copycat.vertx; package net.kuujo.copycat.vertx;


import net.kuujo.copycat.CopycatException; import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.util.internal.Assert;
import net.kuujo.copycat.protocol.ProtocolClient; import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolException; import net.kuujo.copycat.protocol.ProtocolException;
import net.kuujo.copycat.util.internal.Assert;
import org.vertx.java.core.AsyncResult; import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Context; import org.vertx.java.core.Context;
import org.vertx.java.core.Handler; import org.vertx.java.core.Handler;
Expand Down Expand Up @@ -51,7 +51,22 @@ public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
final CompletableFuture<ByteBuffer> future = new CompletableFuture<>(); final CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
byte[] bytes = new byte[request.remaining()]; byte[] bytes = new byte[request.remaining()];
request.get(bytes); request.get(bytes);
context.runOnContext(v -> { if (context != null) {
context.runOnContext(v -> {
vertx.eventBus().sendWithTimeout(address, bytes, 5000, (Handler<AsyncResult<Message<byte[]>>>) result -> {
if (result.succeeded()) {
future.complete(ByteBuffer.wrap(result.result().body()));
} else {
ReplyException exception = (ReplyException) result.cause();
if (exception.failureType() == ReplyFailure.NO_HANDLERS || exception.failureType() == ReplyFailure.TIMEOUT) {
future.completeExceptionally(new ProtocolException(exception));
} else {
future.completeExceptionally(new CopycatException(exception.getMessage()));
}
}
});
});
} else {
vertx.eventBus().sendWithTimeout(address, bytes, 5000, (Handler<AsyncResult<Message<byte[]>>>) result -> { vertx.eventBus().sendWithTimeout(address, bytes, 5000, (Handler<AsyncResult<Message<byte[]>>>) result -> {
if (result.succeeded()) { if (result.succeeded()) {
future.complete(ByteBuffer.wrap(result.result().body())); future.complete(ByteBuffer.wrap(result.result().body()));
Expand All @@ -64,7 +79,7 @@ public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
} }
} }
}); });
}); }
return future; return future;
} }


Expand Down

0 comments on commit cd941ee

Please sign in to comment.