Skip to content

Commit

Permalink
Use local communication for primitive tests to avoid non-determinism …
Browse files Browse the repository at this point in the history
…in unit tests.
  • Loading branch information
kuujo committed Dec 7, 2017
1 parent 7550ca5 commit 034534f
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 86 deletions.
57 changes: 32 additions & 25 deletions core/src/main/java/io/atomix/Atomix.java
Expand Up @@ -78,7 +78,7 @@ public static Builder builder() {
return new Builder(); return new Builder();
} }


private static final Logger LOGGER = LoggerFactory.getLogger(Atomix.class); protected static final Logger LOGGER = LoggerFactory.getLogger(Atomix.class);


private final ManagedClusterService cluster; private final ManagedClusterService cluster;
private final ManagedMessagingService messagingService; private final ManagedMessagingService messagingService;
Expand Down Expand Up @@ -255,21 +255,21 @@ public String toString() {
* Atomix builder. * Atomix builder.
*/ */
public static class Builder implements io.atomix.utils.Builder<Atomix> { public static class Builder implements io.atomix.utils.Builder<Atomix> {
private static final String DEFAULT_CLUSTER_NAME = "atomix"; protected static final String DEFAULT_CLUSTER_NAME = "atomix";
private static final int DEFAULT_COORDINATION_PARTITION_SIZE = 3; protected static final int DEFAULT_COORDINATION_PARTITION_SIZE = 3;
private static final int DEFAULT_DATA_PARTITIONS = 71; protected static final int DEFAULT_DATA_PARTITIONS = 71;
private static final String COORDINATION_GROUP_NAME = "coordination"; protected static final String COORDINATION_GROUP_NAME = "coordination";
private static final String DATA_GROUP_NAME = "data"; protected static final String DATA_GROUP_NAME = "data";


private String name = DEFAULT_CLUSTER_NAME; protected String name = DEFAULT_CLUSTER_NAME;
private Node localNode; protected Node localNode;
private Collection<Node> bootstrapNodes; protected Collection<Node> bootstrapNodes;
private File dataDirectory = new File(System.getProperty("user.dir"), "data"); protected File dataDirectory = new File(System.getProperty("user.dir"), "data");
private int numCoordinationPartitions; protected int numCoordinationPartitions;
private int coordinationPartitionSize = DEFAULT_COORDINATION_PARTITION_SIZE; protected int coordinationPartitionSize = DEFAULT_COORDINATION_PARTITION_SIZE;
private int numDataPartitions = DEFAULT_DATA_PARTITIONS; protected int numDataPartitions = DEFAULT_DATA_PARTITIONS;
private Collection<ManagedPartitionGroup> partitionGroups = new ArrayList<>(); protected Collection<ManagedPartitionGroup> partitionGroups = new ArrayList<>();
private PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry(); protected PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry();


/** /**
* Sets the cluster name. * Sets the cluster name.
Expand Down Expand Up @@ -444,10 +444,7 @@ public Atomix build() {
ManagedClusterService clusterService = buildClusterService(messagingService); ManagedClusterService clusterService = buildClusterService(messagingService);
ManagedClusterCommunicationService clusterCommunicator = buildClusterCommunicationService(clusterService, messagingService); ManagedClusterCommunicationService clusterCommunicator = buildClusterCommunicationService(clusterService, messagingService);
ManagedClusterEventService clusterEventService = buildClusterEventService(clusterService, clusterCommunicator); ManagedClusterEventService clusterEventService = buildClusterEventService(clusterService, clusterCommunicator);
ManagedPartitionGroup corePartitionGroup = RaftPartitionGroup.builder("core") ManagedPartitionGroup corePartitionGroup = buildCorePartitionGroup();
.withNumPartitions(1)
.withDataDirectory(new File(dataDirectory, "core"))
.build();
ManagedPartitionService partitionService = buildPartitionService(); ManagedPartitionService partitionService = buildPartitionService();
return new Atomix( return new Atomix(
clusterService, clusterService,
Expand All @@ -462,7 +459,7 @@ public Atomix build() {
/** /**
* Builds a default messaging service. * Builds a default messaging service.
*/ */
private ManagedMessagingService buildMessagingService() { protected ManagedMessagingService buildMessagingService() {
return NettyMessagingService.builder() return NettyMessagingService.builder()
.withName(name) .withName(name)
.withEndpoint(localNode.endpoint()) .withEndpoint(localNode.endpoint())
Expand All @@ -472,7 +469,7 @@ private ManagedMessagingService buildMessagingService() {
/** /**
* Builds a cluster service. * Builds a cluster service.
*/ */
private ManagedClusterService buildClusterService(MessagingService messagingService) { protected ManagedClusterService buildClusterService(MessagingService messagingService) {
return new DefaultClusterService(ClusterMetadata.builder() return new DefaultClusterService(ClusterMetadata.builder()
.withLocalNode(localNode) .withLocalNode(localNode)
.withBootstrapNodes(bootstrapNodes) .withBootstrapNodes(bootstrapNodes)
Expand All @@ -482,23 +479,33 @@ private ManagedClusterService buildClusterService(MessagingService messagingServ
/** /**
* Builds a cluster communication service. * Builds a cluster communication service.
*/ */
private ManagedClusterCommunicationService buildClusterCommunicationService( protected ManagedClusterCommunicationService buildClusterCommunicationService(
ClusterService clusterService, MessagingService messagingService) { ClusterService clusterService, MessagingService messagingService) {
return new DefaultClusterCommunicationService(clusterService, messagingService); return new DefaultClusterCommunicationService(clusterService, messagingService);
} }


/** /**
* Builds a cluster event service. * Builds a cluster event service.
*/ */
private ManagedClusterEventService buildClusterEventService( protected ManagedClusterEventService buildClusterEventService(
ClusterService clusterService, ClusterCommunicationService clusterCommunicator) { ClusterService clusterService, ClusterCommunicationService clusterCommunicator) {
return new DefaultClusterEventService(clusterService, clusterCommunicator); return new DefaultClusterEventService(clusterService, clusterCommunicator);
} }


/**
* Builds the core partition group.
*/
protected ManagedPartitionGroup buildCorePartitionGroup() {
return RaftPartitionGroup.builder("core")
.withNumPartitions(1)
.withDataDirectory(new File(dataDirectory, "core"))
.build();
}

/** /**
* Builds a partition service. * Builds a partition service.
*/ */
private ManagedPartitionService buildPartitionService() { protected ManagedPartitionService buildPartitionService() {
if (partitionGroups.isEmpty()) { if (partitionGroups.isEmpty()) {
partitionGroups.add(RaftPartitionGroup.builder(COORDINATION_GROUP_NAME) partitionGroups.add(RaftPartitionGroup.builder(COORDINATION_GROUP_NAME)
.withDataDirectory(new File(dataDirectory, COORDINATION_GROUP_NAME)) .withDataDirectory(new File(dataDirectory, COORDINATION_GROUP_NAME))
Expand Down
110 changes: 60 additions & 50 deletions core/src/test/java/io/atomix/AbstractAtomixTest.java
Expand Up @@ -15,21 +15,24 @@
*/ */
package io.atomix; package io.atomix;


import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node; import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.cluster.messaging.ManagedClusterCommunicationService;
import io.atomix.cluster.messaging.ManagedClusterEventService;
import io.atomix.messaging.Endpoint; import io.atomix.messaging.Endpoint;
import io.atomix.messaging.ManagedMessagingService;
import io.atomix.primitive.PrimitiveTypeRegistry;
import io.atomix.primitive.partition.ManagedPartitionGroup;
import io.atomix.primitive.partition.ManagedPartitionService;
import io.atomix.primitive.partition.impl.DefaultPartitionService;
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup;
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
import io.atomix.storage.StorageLevel;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;


import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -43,13 +46,26 @@
* Base Atomix test. * Base Atomix test.
*/ */
public abstract class AbstractAtomixTest { public abstract class AbstractAtomixTest {
private static final int BASE_PORT = 5000;
private static TestMessagingServiceFactory messagingServiceFactory;
private static List<Atomix> instances; private static List<Atomix> instances;
private static Map<Integer, Endpoint> endpoints; private static Map<Integer, Endpoint> endpoints;
private static int id = 10; private static int id = 10;


/**
* Returns a new Atomix instance.
*
* @return a new Atomix instance.
*/
protected Atomix atomix() {
Atomix instance = createAtomix(id++, 1, 2, 3).open().join();
instances.add(instance);
return instance;
}

@BeforeClass @BeforeClass
public static void setupAtomix() throws Exception { public static void setupAtomix() throws Exception {
deleteData(); messagingServiceFactory = new TestMessagingServiceFactory();
endpoints = new HashMap<>(); endpoints = new HashMap<>();
instances = new ArrayList<>(); instances = new ArrayList<>();
instances.add(createAtomix(1, 1, 2, 3)); instances.add(createAtomix(1, 1, 2, 3));
Expand All @@ -65,17 +81,17 @@ public static void setupAtomix() throws Exception {
private static Atomix createAtomix(int id, Integer... ids) { private static Atomix createAtomix(int id, Integer... ids) {
Node localNode = Node.builder() Node localNode = Node.builder()
.withId(NodeId.from(String.valueOf(id))) .withId(NodeId.from(String.valueOf(id)))
.withEndpoint(endpoints.computeIfAbsent(id, i -> Endpoint.from("localhost", findAvailablePort(5000)))) .withEndpoint(endpoints.computeIfAbsent(id, i -> Endpoint.from("localhost", BASE_PORT + id)))
.build(); .build();


Collection<Node> bootstrapNodes = Stream.of(ids) Collection<Node> bootstrapNodes = Stream.of(ids)
.map(nodeId -> Node.builder() .map(nodeId -> Node.builder()
.withId(NodeId.from(String.valueOf(nodeId))) .withId(NodeId.from(String.valueOf(nodeId)))
.withEndpoint(endpoints.computeIfAbsent(nodeId, i -> Endpoint.from("localhost", findAvailablePort(5000)))) .withEndpoint(endpoints.computeIfAbsent(nodeId, i -> Endpoint.from("localhost", BASE_PORT + nodeId)))
.build()) .build())
.collect(Collectors.toList()); .collect(Collectors.toList());


return Atomix.builder() return new TestAtomix.Builder()
.withClusterName("test") .withClusterName("test")
.withDataDirectory(new File("target/test-logs/" + id)) .withDataDirectory(new File("target/test-logs/" + id))
.withLocalNode(localNode) .withLocalNode(localNode)
Expand All @@ -84,54 +100,48 @@ private static Atomix createAtomix(int id, Integer... ids) {
.build(); .build();
} }


/**
* Returns a new Atomix instance.
*
* @return a new Atomix instance.
*/
protected Atomix atomix() {
Atomix instance = createAtomix(id++, 1, 2, 3).open().join();
instances.add(instance);
return instance;
}

@AfterClass @AfterClass
public static void teardownAtomix() throws Exception { public static void teardownAtomix() throws Exception {
List<CompletableFuture<Void>> futures = instances.stream().map(Atomix::close).collect(Collectors.toList()); List<CompletableFuture<Void>> futures = instances.stream().map(Atomix::close).collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
} }


/** /**
* Deletes data from the test data directory. * Atomix implementation used for testing.
*/ */
private static void deleteData() throws Exception { static class TestAtomix extends Atomix {
Path directory = Paths.get("target/test-logs/"); TestAtomix(ManagedClusterService cluster, ManagedMessagingService messagingService, ManagedClusterCommunicationService clusterCommunicator, ManagedClusterEventService clusterEventService, ManagedPartitionGroup corePartitionGroup, ManagedPartitionService partitions, PrimitiveTypeRegistry primitiveTypes) {
if (Files.exists(directory)) { super(cluster, messagingService, clusterCommunicator, clusterEventService, corePartitionGroup, partitions, primitiveTypes);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
} }
}


private static int findAvailablePort(int defaultPort) { static class Builder extends Atomix.Builder {
try { @Override
ServerSocket socket = new ServerSocket(0); protected ManagedMessagingService buildMessagingService() {
socket.setReuseAddress(true); return messagingServiceFactory.newMessagingService(localNode.endpoint());
int port = socket.getLocalPort(); }
socket.close();
return port; @Override
} catch (IOException ex) { protected ManagedPartitionGroup buildCorePartitionGroup() {
return defaultPort; return RaftPartitionGroup.builder("core")
.withStorageLevel(StorageLevel.MEMORY)
.withNumPartitions(1)
.build();
}

@Override
protected ManagedPartitionService buildPartitionService() {
if (partitionGroups.isEmpty()) {
partitionGroups.add(RaftPartitionGroup.builder(COORDINATION_GROUP_NAME)
.withStorageLevel(StorageLevel.MEMORY)
.withNumPartitions(numCoordinationPartitions > 0 ? numCoordinationPartitions : bootstrapNodes.size())
.withPartitionSize(coordinationPartitionSize)
.build());
partitionGroups.add(PrimaryBackupPartitionGroup.builder(DATA_GROUP_NAME)
.withNumPartitions(numDataPartitions)
.build());
}
return new DefaultPartitionService(partitionGroups);
}
} }
} }
} }

0 comments on commit 034534f

Please sign in to comment.