Skip to content

Commit

Permalink
Remove generic type argument from various interfaces (#540)
Browse files Browse the repository at this point in the history
* Remove generic type argument from AtomixCluster.

* Remove generic type argument from PartitionGroup.
  • Loading branch information
kuujo committed May 4, 2018
1 parent ffd5cd2 commit 32334d6
Show file tree
Hide file tree
Showing 17 changed files with 58 additions and 62 deletions.
15 changes: 7 additions & 8 deletions cluster/src/main/java/io/atomix/cluster/AtomixCluster.java
Expand Up @@ -52,7 +52,7 @@
/**
* Cluster configuration.
*/
public class AtomixCluster<T extends AtomixCluster<T>> implements Managed<T> {
public class AtomixCluster implements Managed<Void> {

/**
* Returns a new Atomix cluster builder.
Expand Down Expand Up @@ -92,7 +92,7 @@ public static Builder builder(ClusterConfig config) {
protected final ManagedClusterMembershipService membershipService;
protected final ManagedClusterMessagingService clusterMessagingService;
protected final ManagedClusterEventingService clusterEventingService;
protected volatile CompletableFuture openFuture;
protected volatile CompletableFuture<Void> openFuture;
protected volatile CompletableFuture<Void> closeFuture;
private final ThreadContext threadContext = new SingleThreadContext("atomix-cluster-%d");
private final AtomicBoolean started = new AtomicBoolean();
Expand Down Expand Up @@ -144,7 +144,7 @@ public ClusterEventingService eventingService() {

@Override
@SuppressWarnings("unchecked")
public synchronized CompletableFuture<T> start() {
public synchronized CompletableFuture<Void> start() {
if (closeFuture != null) {
return Futures.exceptionalFuture(new IllegalStateException("AtomixCluster instance " +
(closeFuture.isDone() ? "shutdown" : "shutting down")));
Expand All @@ -156,8 +156,7 @@ public synchronized CompletableFuture<T> start() {

openFuture = startServices()
.thenComposeAsync(v -> joinCluster(), threadContext)
.thenComposeAsync(v -> completeStartup(), threadContext)
.thenApply(v -> this);
.thenComposeAsync(v -> completeStartup(), threadContext);

return openFuture;
}
Expand Down Expand Up @@ -337,7 +336,7 @@ protected static ManagedClusterEventingService buildClusterEventService(
/**
* Cluster builder.
*/
public static class Builder<T extends AtomixCluster<T>> implements io.atomix.utils.Builder<AtomixCluster<T>> {
public static class Builder implements io.atomix.utils.Builder<AtomixCluster> {
protected final ClusterConfig config;

protected Builder() {
Expand Down Expand Up @@ -434,8 +433,8 @@ public Builder withMulticastAddress(Address address) {
}

@Override
public AtomixCluster<T> build() {
return new AtomixCluster<>(config);
public AtomixCluster build() {
return new AtomixCluster(config);
}
}
}
Expand Up @@ -47,23 +47,23 @@ public void testMembers() throws Exception {
.withAddress("localhost:5002")
.build());

AtomixCluster<?> cluster1 = AtomixCluster.builder()
AtomixCluster cluster1 = AtomixCluster.builder()
.withLocalMember("foo")
.withMembers(members)
.build();
cluster1.start().join();

assertEquals("foo", cluster1.membershipService().getLocalMember().id().id());

AtomixCluster<?> cluster2 = AtomixCluster.builder()
AtomixCluster cluster2 = AtomixCluster.builder()
.withLocalMember("bar")
.withMembers(members)
.build();
cluster2.start().join();

assertEquals("bar", cluster2.membershipService().getLocalMember().id().id());

AtomixCluster<?> cluster3 = AtomixCluster.builder()
AtomixCluster cluster3 = AtomixCluster.builder()
.withLocalMember("baz")
.withMembers(members)
.build();
Expand All @@ -72,7 +72,7 @@ public void testMembers() throws Exception {
assertEquals("baz", cluster3.membershipService().getLocalMember().id().id());

List<CompletableFuture<Void>> futures = Stream.of(cluster1, cluster2, cluster3).map(AtomixCluster::stop)
.collect(Collectors.toList());
.collect(Collectors.toList());
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
} catch (Exception e) {
Expand Down
22 changes: 9 additions & 13 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -48,7 +48,6 @@
import io.atomix.primitive.partition.PartitionGroups;
import io.atomix.primitive.partition.PartitionService;
import io.atomix.primitive.partition.impl.DefaultPartitionService;
import io.atomix.utils.Managed;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
Expand All @@ -73,7 +72,7 @@
/**
* Atomix!
*/
public class Atomix extends AtomixCluster<Atomix> implements PrimitivesService, Managed<Atomix> {
public class Atomix extends AtomixCluster implements PrimitivesService {

/**
* Returns a new Atomix builder.
Expand Down Expand Up @@ -280,26 +279,23 @@ public <P extends DistributedPrimitive> P getPrimitive(String name) {
* @return a future to be completed once the instance has completed startup
*/
@Override
@SuppressWarnings("unchecked")
public synchronized CompletableFuture<Atomix> start() {
public synchronized CompletableFuture<Void> start() {
if (closeFuture != null) {
return Futures.exceptionalFuture(new IllegalStateException("Atomix instance " +
(closeFuture.isDone() ? "shutdown" : "shutting down")));
}

return super.start().thenApply(atomix -> {
return super.start().thenRun(() -> {
if (enableShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new Thread(() -> super.stop().join());
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
}
return atomix;
});
}

@Override
@SuppressWarnings("unchecked")
protected CompletableFuture<Void> startServices() {
return super.startServices()
.thenComposeAsync(v -> partitions.start(), threadContext)
Expand Down Expand Up @@ -377,7 +373,7 @@ private static ManagedPartitionService buildPartitionService(
ClusterMembershipService clusterMembershipService,
ClusterMessagingService messagingService,
PrimitiveTypeRegistry primitiveTypeRegistry) {
List<ManagedPartitionGroup<?>> partitionGroups = new ArrayList<>();
List<ManagedPartitionGroup> partitionGroups = new ArrayList<>();
for (PartitionGroupConfig partitionGroupConfig : config.getPartitionGroups().values()) {
partitionGroups.add(PartitionGroups.createGroup(partitionGroupConfig));
}
Expand All @@ -387,7 +383,7 @@ private static ManagedPartitionService buildPartitionService(
/**
* Atomix builder.
*/
public static class Builder extends AtomixCluster.Builder<Atomix> {
public static class Builder extends AtomixCluster.Builder {
private final AtomixConfig config;

private Builder() {
Expand Down Expand Up @@ -457,7 +453,7 @@ public Builder addProfile(Profile profile) {
* @param systemManagementGroup the system management partition group
* @return the Atomix builder
*/
public Builder withManagementGroup(ManagedPartitionGroup<?> systemManagementGroup) {
public Builder withManagementGroup(ManagedPartitionGroup systemManagementGroup) {
config.setManagementGroup(systemManagementGroup.config());
return this;
}
Expand All @@ -469,7 +465,7 @@ public Builder withManagementGroup(ManagedPartitionGroup<?> systemManagementGrou
* @return the Atomix builder
* @throws NullPointerException if the partition groups are null
*/
public Builder withPartitionGroups(ManagedPartitionGroup<?>... partitionGroups) {
public Builder withPartitionGroups(ManagedPartitionGroup... partitionGroups) {
return withPartitionGroups(Arrays.asList(checkNotNull(partitionGroups, "partitionGroups cannot be null")));
}

Expand All @@ -480,7 +476,7 @@ public Builder withPartitionGroups(ManagedPartitionGroup<?>... partitionGroups)
* @return the Atomix builder
* @throws NullPointerException if the partition groups are null
*/
public Builder withPartitionGroups(Collection<ManagedPartitionGroup<?>> partitionGroups) {
public Builder withPartitionGroups(Collection<ManagedPartitionGroup> partitionGroups) {
partitionGroups.forEach(group -> config.addPartitionGroup(group.config()));
return this;
}
Expand All @@ -492,7 +488,7 @@ public Builder withPartitionGroups(Collection<ManagedPartitionGroup<?>> partitio
* @return the Atomix builder
* @throws NullPointerException if the partition group is null
*/
public Builder addPartitionGroup(ManagedPartitionGroup<?> partitionGroup) {
public Builder addPartitionGroup(ManagedPartitionGroup partitionGroup) {
config.addPartitionGroup(partitionGroup.config());
return this;
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/io/atomix/core/AbstractPrimitiveTest.java
Expand Up @@ -51,7 +51,8 @@ public abstract class AbstractPrimitiveTest extends AbstractAtomixTest {
* @return a new Atomix instance.
*/
protected Atomix atomix() throws Exception {
Atomix instance = createAtomix(Member.Type.EPHEMERAL, id++, Arrays.asList(1, 2, 3), Arrays.asList()).start().get(10, TimeUnit.SECONDS);
Atomix instance = createAtomix(Member.Type.EPHEMERAL, id++, Arrays.asList(1, 2, 3), Arrays.asList());
instance.start().get(10, TimeUnit.SECONDS);
instances.add(instance);
return instance;
}
Expand All @@ -76,7 +77,7 @@ public static void setupAtomix() throws Exception {
instances.add(createAtomix(Member.Type.PERSISTENT, 1, Arrays.asList(1, 2, 3), Arrays.asList(), build));
instances.add(createAtomix(Member.Type.PERSISTENT, 2, Arrays.asList(1, 2, 3), Arrays.asList(), build));
instances.add(createAtomix(Member.Type.PERSISTENT, 3, Arrays.asList(1, 2, 3), Arrays.asList(), build));
List<CompletableFuture<Atomix>> futures = instances.stream().map(Atomix::start).collect(Collectors.toList());
List<CompletableFuture<Atomix>> futures = instances.stream().map(a -> a.start().thenApply(v -> a)).collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get(30, TimeUnit.SECONDS);
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/io/atomix/core/AtomixTest.java
Expand Up @@ -96,7 +96,7 @@ protected CompletableFuture<Atomix> startAtomix(Member.Type type, int id, List<I
protected CompletableFuture<Atomix> startAtomix(Member.Type type, int id, List<Integer> persistentIds, List<Integer> ephemeralIds, Function<Atomix.Builder, Atomix> builderFunction) {
Atomix atomix = createAtomix(type, id, persistentIds, ephemeralIds, builderFunction);
instances.add(atomix);
return atomix.start();
return atomix.start().thenApply(v -> atomix);
}

/**
Expand All @@ -105,7 +105,7 @@ protected CompletableFuture<Atomix> startAtomix(Member.Type type, int id, List<I
protected CompletableFuture<Atomix> startAtomix(Member.Type type, int id, List<Integer> persistentIds, List<Integer> ephemeralIds, Map<String, String> metadata, Function<Atomix.Builder, Atomix> builderFunction) {
Atomix atomix = createAtomix(type, id, persistentIds, ephemeralIds, metadata, builderFunction);
instances.add(atomix);
return atomix.start();
return atomix.start().thenApply(v -> atomix);
}

/**
Expand Down
Expand Up @@ -20,23 +20,23 @@
/**
* Managed partition group.
*/
public interface ManagedPartitionGroup<P extends Partition> extends PartitionGroup<P> {
public interface ManagedPartitionGroup extends PartitionGroup {

/**
* Joins the partition group.
*
* @param managementService the partition management service
* @return a future to be completed once the partition group has been joined
*/
CompletableFuture<ManagedPartitionGroup<P>> join(PartitionManagementService managementService);
CompletableFuture<ManagedPartitionGroup> join(PartitionManagementService managementService);

/**
* Connects to the partition group.
*
* @param managementService the partition management service
* @return a future to be completed once the partition group has been connected
*/
CompletableFuture<ManagedPartitionGroup<P>> connect(PartitionManagementService managementService);
CompletableFuture<ManagedPartitionGroup> connect(PartitionManagementService managementService);

/**
* Closes the partition group.
Expand Down
Expand Up @@ -26,7 +26,7 @@
/**
* Primitive partition group.
*/
public interface PartitionGroup<P extends Partition> extends Configured<PartitionGroupConfig> {
public interface PartitionGroup extends Configured<PartitionGroupConfig> {

/**
* Primitive protocol type.
Expand Down Expand Up @@ -75,15 +75,15 @@ interface Type {
* @return the partition or {@code null} if no partition with the given identifier exists
* @throws NullPointerException if the partition identifier is {@code null}
*/
P getPartition(PartitionId partitionId);
Partition getPartition(PartitionId partitionId);

/**
* Returns the partition for the given key.
*
* @param key the key for which to return the partition
* @return the partition for the given key
*/
default P getPartition(String key) {
default Partition getPartition(String key) {
int hashCode = Hashing.sha256().hashString(key, StandardCharsets.UTF_8).asInt();
return getPartition(getPartitionIds().get(Math.abs(hashCode) % getPartitionIds().size()));
}
Expand All @@ -93,7 +93,7 @@ default P getPartition(String key) {
*
* @return a collection of all partitions
*/
Collection<P> getPartitions();
Collection<Partition> getPartitions();

/**
* Returns a sorted list of partition IDs.
Expand Down
Expand Up @@ -29,15 +29,15 @@ public interface PartitionService {
*
* @return the system partition group
*/
<P extends Partition> PartitionGroup<P> getSystemPartitionGroup();
PartitionGroup getSystemPartitionGroup();

/**
* Returns a partition group by name.
*
* @param name the name of the partition group
* @return the partition group
*/
<P extends Partition> PartitionGroup<P> getPartitionGroup(String name);
PartitionGroup getPartitionGroup(String name);

/**
* Returns the first partition group that matches the given primitive type.
Expand All @@ -46,7 +46,7 @@ public interface PartitionService {
* @return the first partition group that matches the given primitive type
*/
@SuppressWarnings("unchecked")
default <P extends Partition> PartitionGroup<P> getPartitionGroup(PrimitiveProtocol.Type type) {
default PartitionGroup getPartitionGroup(PrimitiveProtocol.Type type) {
return getPartitionGroups().stream()
.filter(group -> group.protocol().equals(type))
.findFirst()
Expand All @@ -60,9 +60,9 @@ default <P extends Partition> PartitionGroup<P> getPartitionGroup(PrimitiveProto
* @return the first partition group that matches the given primitive protocol
*/
@SuppressWarnings("unchecked")
default <P extends Partition> PartitionGroup<P> getPartitionGroup(PrimitiveProtocol protocol) {
default PartitionGroup getPartitionGroup(PrimitiveProtocol protocol) {
if (protocol.group() != null) {
PartitionGroup<P> group = getPartitionGroup(protocol.group());
PartitionGroup group = getPartitionGroup(protocol.group());
if (group != null) {
return group;
}
Expand Down
Expand Up @@ -79,8 +79,8 @@ public class DefaultPartitionGroupMembershipService
public DefaultPartitionGroupMembershipService(
ClusterMembershipService membershipService,
ClusterMessagingService messagingService,
ManagedPartitionGroup<?> systemGroup,
Collection<ManagedPartitionGroup<?>> groups) {
ManagedPartitionGroup systemGroup,
Collection<ManagedPartitionGroup> groups) {
this.membershipService = membershipService;
this.messagingService = messagingService;
this.systemGroup = systemGroup != null
Expand Down
Expand Up @@ -58,7 +58,7 @@ public class DefaultPartitionService implements ManagedPartitionService {
private final ManagedPartitionGroupMembershipService groupMembershipService;
private ManagedPartitionGroup systemGroup;
private volatile PartitionManagementService partitionManagementService;
private final Map<String, ManagedPartitionGroup<?>> groups = Maps.newConcurrentMap();
private final Map<String, ManagedPartitionGroup> groups = Maps.newConcurrentMap();
private final PartitionGroupMembershipEventListener groupMembershipEventListener = this::handleMembershipChange;
private final AtomicBoolean started = new AtomicBoolean();

Expand All @@ -67,8 +67,8 @@ public DefaultPartitionService(
ClusterMembershipService membershipService,
ClusterMessagingService messagingService,
PrimitiveTypeRegistry primitiveTypeRegistry,
ManagedPartitionGroup<?> systemGroup,
Collection<ManagedPartitionGroup<?>> groups) {
ManagedPartitionGroup systemGroup,
Collection<ManagedPartitionGroup> groups) {
this.clusterMembershipService = membershipService;
this.clusterMessagingService = messagingService;
this.primitiveTypeRegistry = primitiveTypeRegistry;
Expand Down
Expand Up @@ -54,7 +54,7 @@ public class DefaultPrimaryElectionService implements ManagedPrimaryElectionServ
.register(PrimaryElectorEvents.NAMESPACE)
.build());

private final PartitionGroup<?> partitions;
private final PartitionGroup partitions;
private final Set<PrimaryElectionEventListener> listeners = Sets.newCopyOnWriteArraySet();
private final Consumer<PrimitiveEvent> eventListener = event -> {
PrimaryElectionEvent electionEvent = SERIALIZER.decode(event.value());
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class ReplicatedSessionIdService implements ManagedSessionIdService {
.build());
private static final String PRIMITIVE_NAME = "session-id";

private final PartitionGroup<?> systemPartitionGroup;
private final PartitionGroup systemPartitionGroup;
private PartitionProxy proxy;
private final AtomicBoolean started = new AtomicBoolean();

Expand Down
Expand Up @@ -91,10 +91,10 @@ public String group() {

@Override
public PrimitiveProxy newProxy(String primitiveName, PrimitiveType primitiveType, ServiceConfig serviceConfig, PartitionService partitionService) {
Collection<PartitionProxy> partitions = partitionService.<PrimaryBackupPartition>getPartitionGroup(this)
Collection<PartitionProxy> partitions = partitionService.getPartitionGroup(this)
.getPartitions()
.stream()
.map(partition -> partition.getProxyClient().proxyBuilder(primitiveName, primitiveType, serviceConfig)
.map(partition -> ((PrimaryBackupClient) partition.getProxyClient()).proxyBuilder(primitiveName, primitiveType, serviceConfig)
.withConsistency(config.getConsistency())
.withReplication(config.getReplication())
.withRecovery(config.getRecovery())
Expand Down

0 comments on commit 32334d6

Please sign in to comment.