Skip to content

Commit

Permalink
Update test tools.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 14, 2015
1 parent a28499f commit 9279ccf
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 62 deletions.
67 changes: 33 additions & 34 deletions test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java
Expand Up @@ -19,19 +19,19 @@
import net.kuujo.copycat.cluster.Cluster; import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.MembershipEvent; import net.kuujo.copycat.cluster.MembershipEvent;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.log.BufferedLog; import net.kuujo.copycat.log.BufferedLog;
import net.kuujo.copycat.protocol.Protocol; import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolClient; import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolServer; import net.kuujo.copycat.protocol.ProtocolServer;
import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


/** /**
Expand All @@ -56,10 +56,8 @@ public abstract class ProtocolTest extends ConcurrentTestCase {
* Creates a new test resource. * Creates a new test resource.
*/ */
private TestResource createTestResource(ClusterConfig cluster) { private TestResource createTestResource(ClusterConfig cluster) {
ClusterCoordinator coordinator = new DefaultClusterCoordinator(new CoordinatorConfig().withName("test").withClusterConfig(cluster)); return new TestResource(new ResourceContext("test", new TestResource.Config().withLog(new BufferedLog()), cluster, Executors
return coordinator.<TestResource>getResource("test", new TestResource.Config().withLog(new BufferedLog()).resolve(cluster)) .newSingleThreadScheduledExecutor(new NamedThreadFactory("copycat-test-%d"))));
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
} }


/** /**
Expand Down Expand Up @@ -180,42 +178,43 @@ public void testSendReceive() throws Throwable {
ProtocolServer server = protocol.createServer(new URI(uri)); ProtocolServer server = protocol.createServer(new URI(uri));
ProtocolClient client = protocol.createClient(new URI(uri)); ProtocolClient client = protocol.createClient(new URI(uri));


server.handler(buffer -> { server.connectListener(connection -> {
byte[] bytes = new byte[buffer.remaining()]; connection.handler(buffer -> {
buffer.get(bytes); byte[] bytes = new byte[buffer.remaining()];
threadAssertEquals(new String(bytes), "Hello world!"); buffer.get(bytes);
return CompletableFuture.completedFuture(ByteBuffer.wrap("Hello world back!".getBytes())); threadAssertEquals(new String(bytes), "Hello world!");
return CompletableFuture.completedFuture(ByteBuffer.wrap("Hello world back!".getBytes()));
});
}); });

server.listen().thenRunAsync(this::resume); server.listen().thenRunAsync(this::resume);
await(5000); await(5000);


client.connect().thenRunAsync(this::resume); client.connect().thenRunAsync(this::resume);
await(5000); await(5000);


expectResume(); expectResumes(3);
client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { client.connect().thenAccept(connection -> {
byte[] bytes = new byte[buffer.remaining()]; connection.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
buffer.get(bytes); byte[] bytes = new byte[buffer.remaining()];
threadAssertEquals(new String(bytes), "Hello world back!"); buffer.get(bytes);
resume(); threadAssertEquals(new String(bytes), "Hello world back!");
}); resume();
await(5000); });


expectResume(); connection.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { byte[] bytes = new byte[buffer.remaining()];
byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes);
buffer.get(bytes); threadAssertEquals(new String(bytes), "Hello world back!");
threadAssertEquals(new String(bytes), "Hello world back!"); resume();
resume(); });
});
await(5000);


expectResume(); connection.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> {
client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { byte[] bytes = new byte[buffer.remaining()];
byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes);
buffer.get(bytes); threadAssertEquals(new String(bytes), "Hello world back!");
threadAssertEquals(new String(bytes), "Hello world back!"); resume();
resume(); });
}); });
await(5000); await(5000);


Expand Down
30 changes: 2 additions & 28 deletions test-tools/src/main/java/net/kuujo/copycat/test/TestResource.java
Expand Up @@ -15,15 +15,12 @@
*/ */
package net.kuujo.copycat.test; package net.kuujo.copycat.test;


import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.resource.ResourceConfig; import net.kuujo.copycat.resource.ResourceConfig;
import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.resource.ResourceState; import net.kuujo.copycat.resource.ResourceState;
import net.kuujo.copycat.resource.internal.AbstractResource; import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;


import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;


/** /**
* Test resource implementation. * Test resource implementation.
Expand All @@ -32,7 +29,7 @@
*/ */
public class TestResource extends AbstractResource<TestResource> { public class TestResource extends AbstractResource<TestResource> {


public TestResource(ResourceManager context) { public TestResource(ResourceContext context) {
super(context); super(context);
} }


Expand All @@ -41,16 +38,6 @@ public ResourceState state() {
return ResourceState.HEALTHY; return ResourceState.HEALTHY;
} }


@Override
public CompletableFuture<TestResource> open() {
return runStartupTasks().thenCompose(v -> context.open()).thenApply(v -> this);
}

@Override
public CompletableFuture<Void> close() {
return context.close().thenCompose(v -> runShutdownTasks());
}

/** /**
* Test resource configuration. * Test resource configuration.
*/ */
Expand All @@ -70,19 +57,6 @@ public Config(String... resources) {
super(resources); super(resources);
} }


@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
return new CoordinatedResourceConfig(super.toMap())
.withElectionTimeout(getElectionTimeout())
.withHeartbeatInterval(getHeartbeatInterval())
.withResourceType(TestResource.class)
.withLog(getLog())
.withSerializer(getSerializer())
.withExecutor(getExecutor())
.withResourceConfig(this)
.withReplicas(getReplicas().isEmpty() ? cluster.getMembers() : getReplicas());
}

} }


} }

0 comments on commit 9279ccf

Please sign in to comment.