Skip to content

Commit

Permalink
Update implementations for Catalogue API changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 26, 2015
1 parent ce56990 commit 1978df4
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 107 deletions.
Expand Up @@ -22,8 +22,6 @@
import io.atomix.catalyst.transport.Address;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* Message bus state machine.
Expand All @@ -38,7 +36,7 @@ public class MessageBusState extends StateMachine {
public void configure(StateMachineExecutor executor) {
executor.register(MessageBusCommands.Join.class, this::join);
executor.register(MessageBusCommands.Leave.class, this::leave);
executor.register(MessageBusCommands.Register.class, (Function<Commit<MessageBusCommands.Register>, CompletableFuture>) this::registerConsumer);
executor.register(MessageBusCommands.Register.class, this::registerConsumer);
executor.register(MessageBusCommands.Unregister.class, this::unregisterConsumer);
}

Expand Down Expand Up @@ -110,7 +108,7 @@ protected void leave(Commit<MessageBusCommands.Leave> commit) {
/**
* Registers a topic consumer.
*/
private CompletableFuture registerConsumer(Commit<MessageBusCommands.Register> commit) {
private void registerConsumer(Commit<MessageBusCommands.Register> commit) {
try {
Commit<MessageBusCommands.Join> parent = members.get(commit.session().id());
if (parent == null) {
Expand All @@ -120,12 +118,9 @@ private CompletableFuture registerConsumer(Commit<MessageBusCommands.Register> c
Map<Long, Commit<MessageBusCommands.Register>> registrations = topics.computeIfAbsent(commit.operation().topic(), t -> new HashMap<>());
registrations.put(commit.session().id(), commit);

int i = 0;
CompletableFuture[] futures = new CompletableFuture[members.size()];
for (Commit<MessageBusCommands.Join> member : members.values()) {
futures[i++] = member.session().publish("register", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member()));
member.session().publish("register", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member()));
}
return CompletableFuture.allOf(futures);
} catch (Exception e) {
commit.clean();
throw e;
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/java/io/atomix/copycat/CopycatReplica.java
Expand Up @@ -22,12 +22,12 @@
import io.atomix.catalyst.transport.*;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.ConfigurationException;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.manager.ResourceManager;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -236,9 +236,6 @@ public Builder withSessionTimeout(Duration sessionTimeout) {

@Override
public CopycatReplica build() {
ThreadFactory threadFactory = new CatalystThreadFactory("copycat-resource-%d");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);

// If no transport was configured by the user, attempt to load the Netty transport.
if (transport == null) {
try {
Expand All @@ -256,7 +253,7 @@ public CopycatReplica build() {
// Construct the underlying RaftServer. The server should have been configured with a CombinedTransport
// that facilitates the local client connecting directly to the server.
RaftServer server = serverBuilder.withTransport(new CombinedTransport(new LocalTransport(localRegistry), transport))
.withStateMachine(new ResourceManager(executor)).build();
.withStateMachine(new ResourceManager()).build();

return new CopycatReplica(client, server, transport);
}
Expand Down
9 changes: 1 addition & 8 deletions core/src/main/java/io/atomix/copycat/CopycatServer.java
Expand Up @@ -23,15 +23,11 @@
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.ConfigurationException;
import io.atomix.catalyst.util.Managed;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.manager.ResourceManager;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Standalone Copycat server.
Expand Down Expand Up @@ -153,9 +149,6 @@ public Builder withSessionTimeout(Duration sessionTimeout) {

@Override
public CopycatServer build() {
ThreadFactory threadFactory = new CatalystThreadFactory("copycat-resource-%d");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);

// If no transport was configured by the user, attempt to load the Netty transport.
if (transport == null) {
try {
Expand All @@ -168,7 +161,7 @@ public CopycatServer build() {
// Construct the underlying RaftServer. The server should have been configured with a CombinedTransport
// that facilitates the local client connecting directly to the server.
RaftServer server = builder.withTransport(transport)
.withStateMachine(new ResourceManager(executor)).build();
.withStateMachine(new ResourceManager()).build();

return new CopycatServer(server);
}
Expand Down
Expand Up @@ -22,7 +22,6 @@
import io.atomix.copycat.resource.ResourceEvent;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

Expand Down Expand Up @@ -62,7 +61,7 @@ public long id() {
}

@Override
public CompletableFuture<Void> publish(String event, Object message) {
public Session publish(String event, Object message) {
return parent.publish(event, new ResourceEvent<>(resource, Assert.notNull(message, "message")));
}

Expand Down
38 changes: 9 additions & 29 deletions core/src/main/java/io/atomix/copycat/manager/ResourceManager.java
Expand Up @@ -19,16 +19,11 @@
import io.atomix.catalogue.server.Commit;
import io.atomix.catalogue.server.StateMachine;
import io.atomix.catalogue.server.StateMachineExecutor;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import io.atomix.catalyst.util.concurrent.ThreadPoolContext;
import io.atomix.copycat.resource.ResourceOperation;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

/**
Expand All @@ -37,24 +32,16 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class ResourceManager extends StateMachine {
private final ScheduledExecutorService scheduler;
private StateMachineExecutor executor;
private final Map<String, Long> paths = new HashMap<>();
private final Map<Long, ResourceHolder> resources = new HashMap<>();
private final Map<Long, SessionHolder> sessions = new HashMap<>();
private final ResourceCommitPool commits = new ResourceCommitPool();

/**
* @throws NullPointerException if {@code scheduler} is null
*/
public ResourceManager(ScheduledExecutorService scheduler) {
this.scheduler = Assert.notNull(scheduler, "scheduler");
}

@Override
public void configure(StateMachineExecutor executor) {
this.executor = executor;
executor.register(ResourceOperation.class, (Function<Commit<ResourceOperation>, CompletableFuture<Object>>) this::operateResource);
executor.register(ResourceOperation.class, (Function<Commit<ResourceOperation>, Object>) this::operateResource);
executor.register(GetResource.class, this::getResource);
executor.register(CreateResource.class, this::createResource);
executor.register(DeleteResource.class, this::deleteResource);
Expand All @@ -65,7 +52,7 @@ public void configure(StateMachineExecutor executor) {
* Performs an operation on a resource.
*/
@SuppressWarnings("unchecked")
private CompletableFuture<Object> operateResource(Commit<ResourceOperation> commit) {
private Object operateResource(Commit<ResourceOperation> commit) {
ResourceHolder resource;
SessionHolder session = sessions.get(commit.operation().resource());
if (session != null) {
Expand Down Expand Up @@ -103,8 +90,7 @@ protected long getResource(Commit<GetResource> commit) {

try {
StateMachine stateMachine = commit.operation().type().newInstance();
ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone());
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context);
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor);
ResourceHolder resource = new ResourceHolder(path, stateMachine, executor);
resources.put(resourceId, resource);
stateMachine.init(executor);
Expand Down Expand Up @@ -138,8 +124,7 @@ private long createResource(Commit<CreateResource> commit) {

try {
StateMachine stateMachine = commit.operation().type().newInstance();
ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone());
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context);
ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor);
resource = new ResourceHolder(path, stateMachine, executor);
resources.put(resourceId, resource);
stateMachine.init(executor);
Expand All @@ -156,7 +141,7 @@ private long createResource(Commit<CreateResource> commit) {
id = commit.index();
ManagedResourceSession session = new ManagedResourceSession(id, commit.session());
sessions.put(id, new SessionHolder(resourceId, commit, session));
resource.executor.execute(() -> resource.stateMachine.register(session));
resource.stateMachine.register(session);
return id;
}

Expand Down Expand Up @@ -195,15 +180,15 @@ public void expire(Session session) {
for (ResourceHolder resource : resources.values()) {
SessionHolder resourceSession = resource.sessions.get(session.id());
if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.expire(resourceSession.session));
resource.stateMachine.expire(resourceSession.session);
}
}

for (SessionHolder sessionHolder : sessions.values()) {
if (sessionHolder.session.id() == session.id()) {
ResourceHolder resource = resources.get(sessionHolder.resource);
if (resource != null) {
resource.executor.execute(() -> resource.stateMachine.expire(sessionHolder.session));
resource.stateMachine.expire(sessionHolder.session);
}
}
}
Expand All @@ -214,7 +199,7 @@ public void close(Session session) {
for (ResourceHolder resource : resources.values()) {
SessionHolder resourceSession = resource.sessions.remove(session.id());
if (resourceSession != null) {
resource.executor.execute(() -> resource.stateMachine.close(resourceSession.session));
resource.stateMachine.close(resourceSession.session);
resourceSession.commit.clean();
}
}
Expand All @@ -225,19 +210,14 @@ public void close(Session session) {
if (sessionHolder.session.id() == session.id()) {
ResourceHolder resource = resources.get(sessionHolder.resource);
if (resource != null) {
resource.executor.execute(() -> resource.stateMachine.close(sessionHolder.session));
resource.stateMachine.close(sessionHolder.session);
}
sessionHolder.commit.clean();
iterator.remove();
}
}
}

@Override
public void close() {
scheduler.shutdown();
}

/**
* Resource holder.
*/
Expand Down
Expand Up @@ -22,9 +22,7 @@
import io.atomix.catalogue.server.StateMachineExecutor;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.concurrent.ComposableFuture;
import io.atomix.catalyst.util.concurrent.Scheduled;
import io.atomix.catalyst.util.concurrent.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,9 +32,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -48,15 +44,13 @@
*/
class ResourceStateMachineExecutor implements StateMachineExecutor {
private final StateMachineExecutor parent;
private final ThreadContext context;
private final Logger logger;
private final Map<Class, Function> operations = new HashMap<>();
private final Set<Scheduled> tasks = new HashSet<>();
private Function allOperation;

ResourceStateMachineExecutor(long resource, StateMachineExecutor parent, ThreadContext context) {
ResourceStateMachineExecutor(long resource, StateMachineExecutor parent) {
this.parent = parent;
this.context = context;
this.logger = LoggerFactory.getLogger(String.format("%s-%d", getClass().getName(), resource));
}

Expand All @@ -72,75 +66,55 @@ public Logger logger() {

@Override
public Serializer serializer() {
return context.serializer();
return parent.serializer();
}

@Override
public Executor executor() {
return context.executor();
return parent.executor();
}

@Override
public CompletableFuture<Void> execute(Runnable callback) {
return context.execute(callback);
return parent.execute(callback);
}

@Override
public <T> CompletableFuture<T> execute(Supplier<T> callback) {
return context.execute(callback);
return parent.execute(callback);
}

/**
* Executes the given commit on the state machine.
*/
@SuppressWarnings("unchecked")
<T extends Operation<U>, U> CompletableFuture<U> execute(Commit<T> commit) {
ThreadContext parent = ThreadContext.currentContext();

ComposableFuture<U> future = new ComposableFuture<>();
context.executor().execute(() -> {
// Get the function registered for the operation. If no function is registered, attempt to
// use a global function if available.
Function function = operations.get(commit.type());
if (function == null) {
function = allOperation;
}

if (function == null) {
parent.executor().execute(() -> future.completeExceptionally(new IllegalStateException("unknown state machine operation: " + commit.type())));
} else {
// Execute the operation. If the operation return value is a Future, await the result,
// otherwise immediately complete the execution future.
Object result = function.apply(commit);
if (result instanceof CompletableFuture) {
((CompletableFuture<U>) result).whenCompleteAsync(future, parent.executor());
} else if (result instanceof Future) {
parent.executor().execute(() -> {
try {
future.complete(((Future<U>) result).get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
});
} else {
parent.executor().execute(() -> future.complete((U) result));
}
}
});

return future;
<T extends Operation<U>, U> U execute(Commit<T> commit) {
// Get the function registered for the operation. If no function is registered, attempt to
// use a global function if available.
Function function = operations.get(commit.type());
if (function == null) {
function = allOperation;
}

if (function == null) {
throw new IllegalStateException("unknown state machine operation: " + commit.type());
} else {
// Execute the operation. If the operation return value is a Future, await the result,
// otherwise immediately complete the execution future.
return (U) function.apply(commit);
}
}

@Override
public Scheduled schedule(Duration delay, Runnable callback) {
Scheduled task = parent.schedule(delay, () -> context.executor().execute(callback));
Scheduled task = parent.schedule(delay, () -> parent.executor().execute(callback));
tasks.add(task);
return task;
}

@Override
public Scheduled schedule(Duration initialDelay, Duration interval, Runnable callback) {
Scheduled task = parent.schedule(initialDelay, interval, () -> context.executor().execute(callback));
Scheduled task = parent.schedule(initialDelay, interval, () -> parent.executor().execute(callback));
tasks.add(task);
return task;
}
Expand All @@ -163,7 +137,7 @@ public <T extends Operation<Void>> StateMachineExecutor register(Class<T> type,
}

@Override
public <T extends Operation<?>> StateMachineExecutor register(Class<T> type, Function<Commit<T>, ?> callback) {
public <T extends Operation<U>, U> StateMachineExecutor register(Class<T> type, Function<Commit<T>, U> callback) {
Assert.notNull(type, "type");
Assert.notNull(callback, "callback");
operations.put(type, callback);
Expand Down

0 comments on commit 1978df4

Please sign in to comment.