Skip to content

Commit

Permalink
Refactor DefaultResourcePartitionContext to open resource partitions …
Browse files Browse the repository at this point in the history
…via the DefaultClusterCoordinator.
  • Loading branch information
kuujo committed Jan 2, 2015
1 parent cc2575a commit a27db80
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 44 deletions.
3 changes: 1 addition & 2 deletions core/src/main/java/net/kuujo/copycat/cluster/Cluster.java
Expand Up @@ -14,7 +14,6 @@
*/ */
package net.kuujo.copycat.cluster; package net.kuujo.copycat.cluster;


import net.kuujo.copycat.Managed;
import net.kuujo.copycat.election.Election; import net.kuujo.copycat.election.Election;


import java.util.Collection; import java.util.Collection;
Expand All @@ -27,7 +26,7 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public interface Cluster extends Managed<Cluster> { public interface Cluster {


/** /**
* Returns the current cluster leader. * Returns the current cluster leader.
Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/net/kuujo/copycat/cluster/ManagedCluster.java
@@ -0,0 +1,26 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.cluster;

import net.kuujo.copycat.Managed;

/**
* Managed cluster.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface ManagedCluster extends Cluster, Managed<Cluster> {
}
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.ResourcePartitionContext; import net.kuujo.copycat.ResourcePartitionContext;
import net.kuujo.copycat.cluster.Cluster; import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.coordinator.CoordinatedResourcePartitionConfig; import net.kuujo.copycat.cluster.coordinator.CoordinatedResourcePartitionConfig;
import net.kuujo.copycat.internal.cluster.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.internal.util.Assert; import net.kuujo.copycat.internal.util.Assert;
import net.kuujo.copycat.internal.util.concurrent.Futures; import net.kuujo.copycat.internal.util.concurrent.Futures;
import net.kuujo.copycat.log.LogManager; import net.kuujo.copycat.log.LogManager;
Expand All @@ -43,15 +44,17 @@ public class DefaultResourcePartitionContext implements ResourcePartitionContext
private final CoordinatedResourcePartitionConfig config; private final CoordinatedResourcePartitionConfig config;
private final Cluster cluster; private final Cluster cluster;
private final CopycatStateContext context; private final CopycatStateContext context;
private final DefaultClusterCoordinator coordinator;
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
private boolean open; private boolean open;
private boolean deleted; private boolean deleted;


public DefaultResourcePartitionContext(String name, CoordinatedResourcePartitionConfig config, Cluster cluster, CopycatStateContext context) { public DefaultResourcePartitionContext(String name, CoordinatedResourcePartitionConfig config, Cluster cluster, CopycatStateContext context, DefaultClusterCoordinator coordinator) {
this.name = Assert.isNotNull(name, "name"); this.name = Assert.isNotNull(name, "name");
this.config = config; this.config = config;
this.cluster = Assert.isNotNull(cluster, "cluster"); this.cluster = Assert.isNotNull(cluster, "cluster");
this.context = Assert.isNotNull(context, "context"); this.context = Assert.isNotNull(context, "context");
this.coordinator = Assert.isNotNull(coordinator, "coordinator");
} }


@Override @Override
Expand Down Expand Up @@ -158,23 +161,10 @@ public synchronized CompletableFuture<ByteBuffer> commit(ByteBuffer entry) {


@Override @Override
public synchronized CompletableFuture<ResourcePartitionContext> open() { public synchronized CompletableFuture<ResourcePartitionContext> open() {
CompletableFuture<ResourcePartitionContext> future = new CompletableFuture<>(); return coordinator.acquirePartition(name, config.getPartition())
context.executor().execute(() -> { .thenRun(() -> {
if (counter.incrementAndGet() == 1) { open = true;
CompletableFuture.allOf(cluster.open(), context.open()).whenComplete((result, error) -> { }).thenApply(v -> this);
if (error == null) {
open = true;
future.complete(null);
} else {
counter.decrementAndGet();
future.completeExceptionally(error);
}
});
} else {
future.complete(null);
}
});
return future;
} }


@Override @Override
Expand All @@ -184,22 +174,10 @@ public boolean isOpen() {


@Override @Override
public synchronized CompletableFuture<Void> close() { public synchronized CompletableFuture<Void> close() {
CompletableFuture<Void> future = new CompletableFuture<>(); return coordinator.releasePartition(name, config.getPartition())
context.executor().execute(() -> { .thenRun(() -> {
open = false; open = false;
if (counter.decrementAndGet() == 0) { });
CompletableFuture.allOf(cluster.close(), context.close()).whenComplete((result, error) -> {
if (error == null) {
future.complete(null);
} else {
future.completeExceptionally(error);
}
});
} else {
future.complete(null);
}
});
return future;
} }


@Override @Override
Expand Down
Expand Up @@ -17,6 +17,7 @@


import net.kuujo.copycat.cluster.Cluster; import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.LocalMember; import net.kuujo.copycat.cluster.LocalMember;
import net.kuujo.copycat.cluster.ManagedCluster;
import net.kuujo.copycat.cluster.Member; import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.coordinator.ClusterCoordinator; import net.kuujo.copycat.cluster.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.coordinator.MemberCoordinator; import net.kuujo.copycat.cluster.coordinator.MemberCoordinator;
Expand All @@ -35,7 +36,7 @@
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class CoordinatedCluster implements Cluster { public class CoordinatedCluster implements ManagedCluster {
private final int id; private final int id;
private CoordinatedLocalMember localMember; private CoordinatedLocalMember localMember;
private final Map<String, CoordinatedMember> members = new HashMap<>(); private final Map<String, CoordinatedMember> members = new HashMap<>();
Expand Down
Expand Up @@ -19,6 +19,7 @@
import net.kuujo.copycat.Resource; import net.kuujo.copycat.Resource;
import net.kuujo.copycat.ResourcePartitionContext; import net.kuujo.copycat.ResourcePartitionContext;
import net.kuujo.copycat.cluster.Cluster; import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.ManagedCluster;
import net.kuujo.copycat.cluster.Member; import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.coordinator.*; import net.kuujo.copycat.cluster.coordinator.*;
import net.kuujo.copycat.internal.CopycatStateContext; import net.kuujo.copycat.internal.CopycatStateContext;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class DefaultClusterCoordinator extends Observable implements ClusterCoor
private final DefaultLocalMemberCoordinator localMember; private final DefaultLocalMemberCoordinator localMember;
private final Map<String, AbstractMemberCoordinator> members = new ConcurrentHashMap<>(); private final Map<String, AbstractMemberCoordinator> members = new ConcurrentHashMap<>();
private final CopycatStateContext context; private final CopycatStateContext context;
private final Cluster cluster; private final ManagedCluster cluster;
private final Map<String, ResourceHolder> resources = new ConcurrentHashMap<>(); private final Map<String, ResourceHolder> resources = new ConcurrentHashMap<>();
private final AtomicBoolean open = new AtomicBoolean(); private final AtomicBoolean open = new AtomicBoolean();


Expand Down Expand Up @@ -121,7 +122,7 @@ public <T extends Resource<T>> T getResource(String name) {
* @param partition The partition number. * @param partition The partition number.
* @return A completable future to be completed once the partition has been acquired. * @return A completable future to be completed once the partition has been acquired.
*/ */
CompletableFuture<Void> acquirePartition(String name, int partition) { public CompletableFuture<Void> acquirePartition(String name, int partition) {
Assert.state(isOpen(), "coordinator not open"); Assert.state(isOpen(), "coordinator not open");
ResourceHolder resource = resources.get(name); ResourceHolder resource = resources.get(name);
if (resource != null) { if (resource != null) {
Expand Down Expand Up @@ -157,7 +158,7 @@ CompletableFuture<Void> acquirePartition(String name, int partition) {
* @param partition The partition number. * @param partition The partition number.
* @return A completable future to be completed once the partition has been released. * @return A completable future to be completed once the partition has been released.
*/ */
CompletableFuture<Void> releasePartition(String name, int partition) { public CompletableFuture<Void> releasePartition(String name, int partition) {
Assert.state(isOpen(), "coordinator not open"); Assert.state(isOpen(), "coordinator not open");
ResourceHolder resource = resources.get(name); ResourceHolder resource = resources.get(name);
if (resource != null) { if (resource != null) {
Expand Down Expand Up @@ -197,8 +198,8 @@ private void createResources() {
List<PartitionHolder> partitions = new ArrayList<>(config.getPartitions().size()); List<PartitionHolder> partitions = new ArrayList<>(config.getPartitions().size());
for (CoordinatedResourcePartitionConfig partitionConfig : config.getPartitions()) { for (CoordinatedResourcePartitionConfig partitionConfig : config.getPartitions()) {
CopycatStateContext state = new CopycatStateContext(name, uri, config, partitionConfig); CopycatStateContext state = new CopycatStateContext(name, uri, config, partitionConfig);
Cluster cluster = new CoordinatedCluster(name.hashCode(), this, context, new ResourceRouter(name), Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-resource-" + name + "-%d"))); ManagedCluster cluster = new CoordinatedCluster(name.hashCode(), this, context, new ResourceRouter(name), Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-resource-" + name + "-%d")));
ResourcePartitionContext context = new DefaultResourcePartitionContext(name, partitionConfig, cluster, state); ResourcePartitionContext context = new DefaultResourcePartitionContext(name, partitionConfig, cluster, state, this);
partitions.add(new PartitionHolder(partitionConfig, cluster, state, context)); partitions.add(new PartitionHolder(partitionConfig, cluster, state, context));
} }


Expand Down Expand Up @@ -360,11 +361,11 @@ private ResourceHolder(Resource resource, CoordinatedResourceConfig config, List
*/ */
private static class PartitionHolder { private static class PartitionHolder {
private final CoordinatedResourcePartitionConfig config; private final CoordinatedResourcePartitionConfig config;
private final Cluster cluster; private final ManagedCluster cluster;
private final CopycatStateContext state; private final CopycatStateContext state;
private final ResourcePartitionContext context; private final ResourcePartitionContext context;


private PartitionHolder(CoordinatedResourcePartitionConfig config, Cluster cluster, CopycatStateContext state, ResourcePartitionContext context) { private PartitionHolder(CoordinatedResourcePartitionConfig config, ManagedCluster cluster, CopycatStateContext state, ResourcePartitionContext context) {
this.config = config; this.config = config;
this.cluster = cluster; this.cluster = cluster;
this.state = state; this.state = state;
Expand Down

0 comments on commit a27db80

Please sign in to comment.