Skip to content

Commit

Permalink
Rename CoreMetadataService to PersistentMetadataService.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2018
1 parent 6cf9671 commit 003173b
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 52 deletions.
Expand Up @@ -18,7 +18,7 @@
/** /**
* Managed core metadata service. * Managed core metadata service.
*/ */
public interface ManagedCoreMetadataService extends CoreMetadataService, ManagedClusterMetadataService { public interface ManagedPersistentMetadataService extends PersistentMetadataService, ManagedClusterMetadataService {


/** /**
* Adds the given node to the cluster metadata. * Adds the given node to the cluster metadata.
Expand Down
Expand Up @@ -18,5 +18,5 @@
/** /**
* Core metadata service. * Core metadata service.
*/ */
public interface CoreMetadataService extends ClusterMetadataService { public interface PersistentMetadataService extends ClusterMetadataService {
} }
Expand Up @@ -24,7 +24,7 @@
import io.atomix.cluster.ClusterMetadataEvent; import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener; import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterService; import io.atomix.cluster.ClusterService;
import io.atomix.cluster.CoreMetadataService; import io.atomix.cluster.PersistentMetadataService;
import io.atomix.cluster.ManagedClusterService; import io.atomix.cluster.ManagedClusterService;
import io.atomix.cluster.Node; import io.atomix.cluster.Node;
import io.atomix.cluster.Node.State; import io.atomix.cluster.Node.State;
Expand Down Expand Up @@ -85,13 +85,13 @@ public class DefaultClusterService
.register(Node.State.class) .register(Node.State.class)
.register(ClusterHeartbeat.class) .register(ClusterHeartbeat.class)
.register(StatefulNode.class) .register(StatefulNode.class)
.register(new DefaultCoreMetadataService.AddressSerializer(), Address.class) .register(new DefaultPersistentMetadataService.AddressSerializer(), Address.class)
.build("ClusterService")); .build("ClusterService"));


private final MessagingService messagingService; private final MessagingService messagingService;
private final BroadcastService broadcastService; private final BroadcastService broadcastService;
private final BootstrapMetadataService bootstrapMetadataService; private final BootstrapMetadataService bootstrapMetadataService;
private final CoreMetadataService coreMetadataService; private final PersistentMetadataService persistentMetadataService;
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private final StatefulNode localNode; private final StatefulNode localNode;
private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap(); private final Map<NodeId, StatefulNode> nodes = Maps.newConcurrentMap();
Expand All @@ -108,11 +108,11 @@ public class DefaultClusterService
public DefaultClusterService( public DefaultClusterService(
Node localNode, Node localNode,
BootstrapMetadataService bootstrapMetadataService, BootstrapMetadataService bootstrapMetadataService,
CoreMetadataService coreMetadataService, PersistentMetadataService persistentMetadataService,
MessagingService messagingService, MessagingService messagingService,
BroadcastService broadcastService) { BroadcastService broadcastService) {
this.bootstrapMetadataService = checkNotNull(bootstrapMetadataService, "bootstrapMetadataService cannot be null"); this.bootstrapMetadataService = checkNotNull(bootstrapMetadataService, "bootstrapMetadataService cannot be null");
this.coreMetadataService = checkNotNull(coreMetadataService, "coreMetadataService cannot be null"); this.persistentMetadataService = checkNotNull(persistentMetadataService, "coreMetadataService cannot be null");
this.messagingService = checkNotNull(messagingService, "messagingService cannot be null"); this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
this.broadcastService = checkNotNull(broadcastService, "broadcastService cannot be null"); this.broadcastService = checkNotNull(broadcastService, "broadcastService cannot be null");
this.localNode = new StatefulNode( this.localNode = new StatefulNode(
Expand Down Expand Up @@ -349,12 +349,12 @@ private void handleMetadataEvent(ClusterMetadataEvent event) {
@Override @Override
public CompletableFuture<ClusterService> start() { public CompletableFuture<ClusterService> start() {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
coreMetadataService.addListener(metadataEventListener); persistentMetadataService.addListener(metadataEventListener);
broadcastService.addListener(broadcastListener); broadcastService.addListener(broadcastListener);
LOGGER.info("{} - Node activated: {}", localNode.id(), localNode); LOGGER.info("{} - Node activated: {}", localNode.id(), localNode);
localNode.setState(State.ACTIVE); localNode.setState(State.ACTIVE);
nodes.put(localNode.id(), localNode); nodes.put(localNode.id(), localNode);
coreMetadataService.getMetadata().nodes() persistentMetadataService.getMetadata().nodes()
.forEach(node -> nodes.putIfAbsent(node.id(), new StatefulNode( .forEach(node -> nodes.putIfAbsent(node.id(), new StatefulNode(
node.id(), node.id(),
node.type(), node.type(),
Expand Down Expand Up @@ -398,7 +398,7 @@ public CompletableFuture<Void> stop() {
nodes.clear(); nodes.clear();
heartbeatFuture.cancel(true); heartbeatFuture.cancel(true);
messagingService.unregisterHandler(HEARTBEAT_MESSAGE); messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
coreMetadataService.removeListener(metadataEventListener); persistentMetadataService.removeListener(metadataEventListener);
LOGGER.info("Stopped"); LOGGER.info("Stopped");
} }
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -25,7 +25,7 @@
import io.atomix.cluster.ClusterMetadataEvent; import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener; import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMetadataService; import io.atomix.cluster.ClusterMetadataService;
import io.atomix.cluster.ManagedCoreMetadataService; import io.atomix.cluster.ManagedPersistentMetadataService;
import io.atomix.cluster.Node; import io.atomix.cluster.Node;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.messaging.MessagingService; import io.atomix.messaging.MessagingService;
Expand Down Expand Up @@ -61,9 +61,9 @@
/** /**
* Default cluster metadata service. * Default cluster metadata service.
*/ */
public class DefaultCoreMetadataService public class DefaultPersistentMetadataService
extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener> extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
implements ManagedCoreMetadataService { implements ManagedPersistentMetadataService {


private static final String BOOTSTRAP_MESSAGE = "atomix-cluster-metadata-bootstrap"; private static final String BOOTSTRAP_MESSAGE = "atomix-cluster-metadata-bootstrap";
private static final String UPDATE_MESSAGE = "atomix-cluster-metadata-update"; private static final String UPDATE_MESSAGE = "atomix-cluster-metadata-update";
Expand Down Expand Up @@ -96,7 +96,7 @@ public class DefaultCoreMetadataService
namedThreads("atomix-cluster-metadata-receiver", log)); namedThreads("atomix-cluster-metadata-receiver", log));
private ScheduledFuture<?> metadataFuture; private ScheduledFuture<?> metadataFuture;


public DefaultCoreMetadataService(ClusterMetadata metadata, MessagingService messagingService) { public DefaultPersistentMetadataService(ClusterMetadata metadata, MessagingService messagingService) {
metadata.nodes().forEach(node -> nodes.put(node.id(), new ReplicatedNode( metadata.nodes().forEach(node -> nodes.put(node.id(), new ReplicatedNode(
node.id(), node.id(),
node.type(), node.type(),
Expand Down
Expand Up @@ -67,23 +67,23 @@ public void testClusterService() throws Exception {
ManagedClusterService clusterService1 = new DefaultClusterService( ManagedClusterService clusterService1 = new DefaultClusterService(
localNode1, localNode1,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localNode1.address()).start().join(), messagingServiceFactory.newMessagingService(localNode1.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join()); broadcastServiceFactory.newBroadcastService().start().join());


Node localNode2 = buildNode(2, Node.Type.PERSISTENT); Node localNode2 = buildNode(2, Node.Type.PERSISTENT);
ManagedClusterService clusterService2 = new DefaultClusterService( ManagedClusterService clusterService2 = new DefaultClusterService(
localNode2, localNode2,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localNode2.address()).start().join(), messagingServiceFactory.newMessagingService(localNode2.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join()); broadcastServiceFactory.newBroadcastService().start().join());


Node localNode3 = buildNode(3, Node.Type.PERSISTENT); Node localNode3 = buildNode(3, Node.Type.PERSISTENT);
ManagedClusterService clusterService3 = new DefaultClusterService( ManagedClusterService clusterService3 = new DefaultClusterService(
localNode3, localNode3,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(localNode3.address()).start().join(), messagingServiceFactory.newMessagingService(localNode3.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join()); broadcastServiceFactory.newBroadcastService().start().join());


Expand Down Expand Up @@ -119,7 +119,7 @@ public void testClusterService() throws Exception {
ManagedClusterService dataClusterService = new DefaultClusterService( ManagedClusterService dataClusterService = new DefaultClusterService(
dataNode, dataNode,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(dataNode.address()).start().join(), messagingServiceFactory.newMessagingService(dataNode.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join()); broadcastServiceFactory.newBroadcastService().start().join());


Expand All @@ -145,7 +145,7 @@ public void testClusterService() throws Exception {
ManagedClusterService clientClusterService = new DefaultClusterService( ManagedClusterService clientClusterService = new DefaultClusterService(
clientNode, clientNode,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingServiceFactory.newMessagingService(clientNode.address()).start().join(), messagingServiceFactory.newMessagingService(clientNode.address()).start().join(),
broadcastServiceFactory.newBroadcastService().start().join()); broadcastServiceFactory.newBroadcastService().start().join());


Expand Down
Expand Up @@ -19,7 +19,7 @@
import io.atomix.cluster.ClusterMetadataEvent; import io.atomix.cluster.ClusterMetadataEvent;
import io.atomix.cluster.ClusterMetadataEventListener; import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.ClusterMetadataService; import io.atomix.cluster.ClusterMetadataService;
import io.atomix.cluster.ManagedCoreMetadataService; import io.atomix.cluster.ManagedPersistentMetadataService;
import io.atomix.cluster.Node; import io.atomix.cluster.Node;
import io.atomix.cluster.messaging.impl.TestMessagingServiceFactory; import io.atomix.cluster.messaging.impl.TestMessagingServiceFactory;
import io.atomix.utils.concurrent.Futures; import io.atomix.utils.concurrent.Futures;
Expand All @@ -36,23 +36,23 @@
/** /**
* Default cluster metadata service test. * Default cluster metadata service test.
*/ */
public class DefaultCoreMetadataServiceTest { public class DefaultPersistentMetadataServiceTest {
@Test @Test
public void testSingleNodeBootstrap() throws Exception { public void testSingleNodeBootstrap() throws Exception {
TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory(); TestMessagingServiceFactory messagingServiceFactory = new TestMessagingServiceFactory();


ClusterMetadata clusterMetadata = buildClusterMetadata(1); ClusterMetadata clusterMetadata = buildClusterMetadata(1);


Node localNode1 = buildNode(1, Node.Type.PERSISTENT); Node localNode1 = buildNode(1, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService1 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService1 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.address()).start().join());


metadataService1.start().join(); metadataService1.start().join();


assertEquals(1, metadataService1.getMetadata().nodes().size()); assertEquals(1, metadataService1.getMetadata().nodes().size());


Node localNode2 = buildNode(2, Node.Type.PERSISTENT); Node localNode2 = buildNode(2, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService2 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService2 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.address()).start().join());
metadataService2.start().join(); metadataService2.start().join();
metadataService2.addNode(localNode2); metadataService2.addNode(localNode2);
Expand All @@ -67,15 +67,15 @@ public void testClusterMetadataService() throws Exception {
ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3); ClusterMetadata clusterMetadata = buildClusterMetadata(1, 2, 3);


Node localNode1 = buildNode(1, Node.Type.PERSISTENT); Node localNode1 = buildNode(1, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService1 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService1 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode1.address()).start().join());


Node localNode2 = buildNode(2, Node.Type.PERSISTENT); Node localNode2 = buildNode(2, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService2 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService2 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode2.address()).start().join());


Node localNode3 = buildNode(3, Node.Type.PERSISTENT); Node localNode3 = buildNode(3, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService3 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService3 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode3.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode3.address()).start().join());


List<CompletableFuture<ClusterMetadataService>> futures = new ArrayList<>(); List<CompletableFuture<ClusterMetadataService>> futures = new ArrayList<>();
Expand All @@ -89,7 +89,7 @@ public void testClusterMetadataService() throws Exception {
assertEquals(3, metadataService3.getMetadata().nodes().size()); assertEquals(3, metadataService3.getMetadata().nodes().size());


Node localNode4 = buildNode(4, Node.Type.PERSISTENT); Node localNode4 = buildNode(4, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService4 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService4 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode4.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode4.address()).start().join());
metadataService4.start().join(); metadataService4.start().join();


Expand Down Expand Up @@ -119,7 +119,7 @@ public void testClusterMetadataService() throws Exception {
assertEquals(4, metadataService3.getMetadata().nodes().size()); assertEquals(4, metadataService3.getMetadata().nodes().size());


Node localNode5 = buildNode(5, Node.Type.PERSISTENT); Node localNode5 = buildNode(5, Node.Type.PERSISTENT);
ManagedCoreMetadataService metadataService5 = new DefaultCoreMetadataService( ManagedPersistentMetadataService metadataService5 = new DefaultPersistentMetadataService(
clusterMetadata, messagingServiceFactory.newMessagingService(localNode5.address()).start().join()); clusterMetadata, messagingServiceFactory.newMessagingService(localNode5.address()).start().join());
metadataService5.start().join(); metadataService5.start().join();
assertEquals(4, metadataService5.getMetadata().nodes().size()); assertEquals(4, metadataService5.getMetadata().nodes().size());
Expand Down
Expand Up @@ -17,15 +17,15 @@


import io.atomix.cluster.ClusterMetadata; import io.atomix.cluster.ClusterMetadata;
import io.atomix.cluster.ClusterMetadataEventListener; import io.atomix.cluster.ClusterMetadataEventListener;
import io.atomix.cluster.CoreMetadataService; import io.atomix.cluster.PersistentMetadataService;


/** /**
* Test cluster metadata service. * Test cluster metadata service.
*/ */
public class TestCoreMetadataService implements CoreMetadataService { public class TestPersistentMetadataService implements PersistentMetadataService {
private final ClusterMetadata metadata; private final ClusterMetadata metadata;


public TestCoreMetadataService(ClusterMetadata metadata) { public TestPersistentMetadataService(ClusterMetadata metadata) {
this.metadata = metadata; this.metadata = metadata;
} }


Expand Down
Expand Up @@ -21,7 +21,7 @@
import io.atomix.cluster.Node; import io.atomix.cluster.Node;
import io.atomix.cluster.impl.DefaultBootstrapMetadataService; import io.atomix.cluster.impl.DefaultBootstrapMetadataService;
import io.atomix.cluster.impl.DefaultClusterService; import io.atomix.cluster.impl.DefaultClusterService;
import io.atomix.cluster.impl.TestCoreMetadataService; import io.atomix.cluster.impl.TestPersistentMetadataService;
import io.atomix.cluster.messaging.ClusterEventingService; import io.atomix.cluster.messaging.ClusterEventingService;
import io.atomix.messaging.MessagingService; import io.atomix.messaging.MessagingService;
import io.atomix.utils.serializer.KryoNamespaces; import io.atomix.utils.serializer.KryoNamespaces;
Expand Down Expand Up @@ -73,7 +73,7 @@ public void testClusterEventService() throws Exception {
ClusterService clusterService1 = new DefaultClusterService( ClusterService clusterService1 = new DefaultClusterService(
localNode1, localNode1,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingService1, messagingService1,
broadcastServiceFactory.newBroadcastService().start().join()) broadcastServiceFactory.newBroadcastService().start().join())
.start() .start()
Expand All @@ -85,7 +85,7 @@ public void testClusterEventService() throws Exception {
ClusterService clusterService2 = new DefaultClusterService( ClusterService clusterService2 = new DefaultClusterService(
localNode2, localNode2,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingService2, messagingService2,
broadcastServiceFactory.newBroadcastService().start().join()) broadcastServiceFactory.newBroadcastService().start().join())
.start() .start()
Expand All @@ -97,7 +97,7 @@ public void testClusterEventService() throws Exception {
ClusterService clusterService3 = new DefaultClusterService( ClusterService clusterService3 = new DefaultClusterService(
localNode3, localNode3,
new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())), new DefaultBootstrapMetadataService(new ClusterMetadata(Collections.emptyList())),
new TestCoreMetadataService(clusterMetadata), new TestPersistentMetadataService(clusterMetadata),
messagingService3, messagingService3,
broadcastServiceFactory.newBroadcastService().start().join()) broadcastServiceFactory.newBroadcastService().start().join())
.start() .start()
Expand Down

0 comments on commit 003173b

Please sign in to comment.