diff --git a/coordination/src/main/java/io/atomix/copycat/coordination/state/MessageBusState.java b/coordination/src/main/java/io/atomix/copycat/coordination/state/MessageBusState.java index 8136c8c922..a76a6e81c0 100644 --- a/coordination/src/main/java/io/atomix/copycat/coordination/state/MessageBusState.java +++ b/coordination/src/main/java/io/atomix/copycat/coordination/state/MessageBusState.java @@ -22,8 +22,6 @@ import io.atomix.catalyst.transport.Address; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; /** * Message bus state machine. @@ -38,7 +36,7 @@ public class MessageBusState extends StateMachine { public void configure(StateMachineExecutor executor) { executor.register(MessageBusCommands.Join.class, this::join); executor.register(MessageBusCommands.Leave.class, this::leave); - executor.register(MessageBusCommands.Register.class, (Function, CompletableFuture>) this::registerConsumer); + executor.register(MessageBusCommands.Register.class, this::registerConsumer); executor.register(MessageBusCommands.Unregister.class, this::unregisterConsumer); } @@ -110,7 +108,7 @@ protected void leave(Commit commit) { /** * Registers a topic consumer. */ - private CompletableFuture registerConsumer(Commit commit) { + private void registerConsumer(Commit commit) { try { Commit parent = members.get(commit.session().id()); if (parent == null) { @@ -120,12 +118,9 @@ private CompletableFuture registerConsumer(Commit c Map> registrations = topics.computeIfAbsent(commit.operation().topic(), t -> new HashMap<>()); registrations.put(commit.session().id(), commit); - int i = 0; - CompletableFuture[] futures = new CompletableFuture[members.size()]; for (Commit member : members.values()) { - futures[i++] = member.session().publish("register", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member())); + member.session().publish("register", new MessageBusCommands.ConsumerInfo(commit.operation().topic(), parent.operation().member())); } - return CompletableFuture.allOf(futures); } catch (Exception e) { commit.clean(); throw e; diff --git a/core/src/main/java/io/atomix/copycat/CopycatReplica.java b/core/src/main/java/io/atomix/copycat/CopycatReplica.java index 28c666092c..52dceac761 100644 --- a/core/src/main/java/io/atomix/copycat/CopycatReplica.java +++ b/core/src/main/java/io/atomix/copycat/CopycatReplica.java @@ -22,12 +22,12 @@ import io.atomix.catalyst.transport.*; import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.ConfigurationException; -import io.atomix.catalyst.util.concurrent.CatalystThreadFactory; import io.atomix.copycat.manager.ResourceManager; import java.time.Duration; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; /** @@ -236,9 +236,6 @@ public Builder withSessionTimeout(Duration sessionTimeout) { @Override public CopycatReplica build() { - ThreadFactory threadFactory = new CatalystThreadFactory("copycat-resource-%d"); - ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); - // If no transport was configured by the user, attempt to load the Netty transport. if (transport == null) { try { @@ -256,7 +253,7 @@ public CopycatReplica build() { // Construct the underlying RaftServer. The server should have been configured with a CombinedTransport // that facilitates the local client connecting directly to the server. RaftServer server = serverBuilder.withTransport(new CombinedTransport(new LocalTransport(localRegistry), transport)) - .withStateMachine(new ResourceManager(executor)).build(); + .withStateMachine(new ResourceManager()).build(); return new CopycatReplica(client, server, transport); } diff --git a/core/src/main/java/io/atomix/copycat/CopycatServer.java b/core/src/main/java/io/atomix/copycat/CopycatServer.java index 3ef3c5ac55..ec1e7b38d2 100644 --- a/core/src/main/java/io/atomix/copycat/CopycatServer.java +++ b/core/src/main/java/io/atomix/copycat/CopycatServer.java @@ -23,15 +23,11 @@ import io.atomix.catalyst.util.Assert; import io.atomix.catalyst.util.ConfigurationException; import io.atomix.catalyst.util.Managed; -import io.atomix.catalyst.util.concurrent.CatalystThreadFactory; import io.atomix.copycat.manager.ResourceManager; import java.time.Duration; import java.util.Collection; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; /** * Standalone Copycat server. @@ -153,9 +149,6 @@ public Builder withSessionTimeout(Duration sessionTimeout) { @Override public CopycatServer build() { - ThreadFactory threadFactory = new CatalystThreadFactory("copycat-resource-%d"); - ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); - // If no transport was configured by the user, attempt to load the Netty transport. if (transport == null) { try { @@ -168,7 +161,7 @@ public CopycatServer build() { // Construct the underlying RaftServer. The server should have been configured with a CombinedTransport // that facilitates the local client connecting directly to the server. RaftServer server = builder.withTransport(transport) - .withStateMachine(new ResourceManager(executor)).build(); + .withStateMachine(new ResourceManager()).build(); return new CopycatServer(server); } diff --git a/core/src/main/java/io/atomix/copycat/manager/ManagedResourceSession.java b/core/src/main/java/io/atomix/copycat/manager/ManagedResourceSession.java index 4a331f6744..ed87418a91 100644 --- a/core/src/main/java/io/atomix/copycat/manager/ManagedResourceSession.java +++ b/core/src/main/java/io/atomix/copycat/manager/ManagedResourceSession.java @@ -22,7 +22,6 @@ import io.atomix.copycat.resource.ResourceEvent; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -62,7 +61,7 @@ public long id() { } @Override - public CompletableFuture publish(String event, Object message) { + public Session publish(String event, Object message) { return parent.publish(event, new ResourceEvent<>(resource, Assert.notNull(message, "message"))); } diff --git a/core/src/main/java/io/atomix/copycat/manager/ResourceManager.java b/core/src/main/java/io/atomix/copycat/manager/ResourceManager.java index 698cc13095..63173a4a68 100644 --- a/core/src/main/java/io/atomix/copycat/manager/ResourceManager.java +++ b/core/src/main/java/io/atomix/copycat/manager/ResourceManager.java @@ -19,16 +19,11 @@ import io.atomix.catalogue.server.Commit; import io.atomix.catalogue.server.StateMachine; import io.atomix.catalogue.server.StateMachineExecutor; -import io.atomix.catalyst.util.Assert; -import io.atomix.catalyst.util.concurrent.ThreadContext; -import io.atomix.catalyst.util.concurrent.ThreadPoolContext; import io.atomix.copycat.resource.ResourceOperation; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; /** @@ -37,24 +32,16 @@ * @author Jordan Halterman */ public class ResourceManager extends StateMachine { - private final ScheduledExecutorService scheduler; private StateMachineExecutor executor; private final Map paths = new HashMap<>(); private final Map resources = new HashMap<>(); private final Map sessions = new HashMap<>(); private final ResourceCommitPool commits = new ResourceCommitPool(); - /** - * @throws NullPointerException if {@code scheduler} is null - */ - public ResourceManager(ScheduledExecutorService scheduler) { - this.scheduler = Assert.notNull(scheduler, "scheduler"); - } - @Override public void configure(StateMachineExecutor executor) { this.executor = executor; - executor.register(ResourceOperation.class, (Function, CompletableFuture>) this::operateResource); + executor.register(ResourceOperation.class, (Function, Object>) this::operateResource); executor.register(GetResource.class, this::getResource); executor.register(CreateResource.class, this::createResource); executor.register(DeleteResource.class, this::deleteResource); @@ -65,7 +52,7 @@ public void configure(StateMachineExecutor executor) { * Performs an operation on a resource. */ @SuppressWarnings("unchecked") - private CompletableFuture operateResource(Commit commit) { + private Object operateResource(Commit commit) { ResourceHolder resource; SessionHolder session = sessions.get(commit.operation().resource()); if (session != null) { @@ -103,8 +90,7 @@ protected long getResource(Commit commit) { try { StateMachine stateMachine = commit.operation().type().newInstance(); - ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone()); - ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context); + ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor); ResourceHolder resource = new ResourceHolder(path, stateMachine, executor); resources.put(resourceId, resource); stateMachine.init(executor); @@ -138,8 +124,7 @@ private long createResource(Commit commit) { try { StateMachine stateMachine = commit.operation().type().newInstance(); - ThreadContext context = new ThreadPoolContext(scheduler, ThreadContext.currentContext().serializer().clone()); - ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor, context); + ResourceStateMachineExecutor executor = new ResourceStateMachineExecutor(commit.index(), this.executor); resource = new ResourceHolder(path, stateMachine, executor); resources.put(resourceId, resource); stateMachine.init(executor); @@ -156,7 +141,7 @@ private long createResource(Commit commit) { id = commit.index(); ManagedResourceSession session = new ManagedResourceSession(id, commit.session()); sessions.put(id, new SessionHolder(resourceId, commit, session)); - resource.executor.execute(() -> resource.stateMachine.register(session)); + resource.stateMachine.register(session); return id; } @@ -195,7 +180,7 @@ public void expire(Session session) { for (ResourceHolder resource : resources.values()) { SessionHolder resourceSession = resource.sessions.get(session.id()); if (resourceSession != null) { - resource.executor.execute(() -> resource.stateMachine.expire(resourceSession.session)); + resource.stateMachine.expire(resourceSession.session); } } @@ -203,7 +188,7 @@ public void expire(Session session) { if (sessionHolder.session.id() == session.id()) { ResourceHolder resource = resources.get(sessionHolder.resource); if (resource != null) { - resource.executor.execute(() -> resource.stateMachine.expire(sessionHolder.session)); + resource.stateMachine.expire(sessionHolder.session); } } } @@ -214,7 +199,7 @@ public void close(Session session) { for (ResourceHolder resource : resources.values()) { SessionHolder resourceSession = resource.sessions.remove(session.id()); if (resourceSession != null) { - resource.executor.execute(() -> resource.stateMachine.close(resourceSession.session)); + resource.stateMachine.close(resourceSession.session); resourceSession.commit.clean(); } } @@ -225,7 +210,7 @@ public void close(Session session) { if (sessionHolder.session.id() == session.id()) { ResourceHolder resource = resources.get(sessionHolder.resource); if (resource != null) { - resource.executor.execute(() -> resource.stateMachine.close(sessionHolder.session)); + resource.stateMachine.close(sessionHolder.session); } sessionHolder.commit.clean(); iterator.remove(); @@ -233,11 +218,6 @@ public void close(Session session) { } } - @Override - public void close() { - scheduler.shutdown(); - } - /** * Resource holder. */ diff --git a/core/src/main/java/io/atomix/copycat/manager/ResourceStateMachineExecutor.java b/core/src/main/java/io/atomix/copycat/manager/ResourceStateMachineExecutor.java index 45ee0233de..6de5085479 100644 --- a/core/src/main/java/io/atomix/copycat/manager/ResourceStateMachineExecutor.java +++ b/core/src/main/java/io/atomix/copycat/manager/ResourceStateMachineExecutor.java @@ -22,9 +22,7 @@ import io.atomix.catalogue.server.StateMachineExecutor; import io.atomix.catalyst.serializer.Serializer; import io.atomix.catalyst.util.Assert; -import io.atomix.catalyst.util.concurrent.ComposableFuture; import io.atomix.catalyst.util.concurrent.Scheduled; -import io.atomix.catalyst.util.concurrent.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,9 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -48,15 +44,13 @@ */ class ResourceStateMachineExecutor implements StateMachineExecutor { private final StateMachineExecutor parent; - private final ThreadContext context; private final Logger logger; private final Map operations = new HashMap<>(); private final Set tasks = new HashSet<>(); private Function allOperation; - ResourceStateMachineExecutor(long resource, StateMachineExecutor parent, ThreadContext context) { + ResourceStateMachineExecutor(long resource, StateMachineExecutor parent) { this.parent = parent; - this.context = context; this.logger = LoggerFactory.getLogger(String.format("%s-%d", getClass().getName(), resource)); } @@ -72,75 +66,55 @@ public Logger logger() { @Override public Serializer serializer() { - return context.serializer(); + return parent.serializer(); } @Override public Executor executor() { - return context.executor(); + return parent.executor(); } @Override public CompletableFuture execute(Runnable callback) { - return context.execute(callback); + return parent.execute(callback); } @Override public CompletableFuture execute(Supplier callback) { - return context.execute(callback); + return parent.execute(callback); } /** * Executes the given commit on the state machine. */ @SuppressWarnings("unchecked") - , U> CompletableFuture execute(Commit commit) { - ThreadContext parent = ThreadContext.currentContext(); - - ComposableFuture future = new ComposableFuture<>(); - context.executor().execute(() -> { - // Get the function registered for the operation. If no function is registered, attempt to - // use a global function if available. - Function function = operations.get(commit.type()); - if (function == null) { - function = allOperation; - } - - if (function == null) { - parent.executor().execute(() -> future.completeExceptionally(new IllegalStateException("unknown state machine operation: " + commit.type()))); - } else { - // Execute the operation. If the operation return value is a Future, await the result, - // otherwise immediately complete the execution future. - Object result = function.apply(commit); - if (result instanceof CompletableFuture) { - ((CompletableFuture) result).whenCompleteAsync(future, parent.executor()); - } else if (result instanceof Future) { - parent.executor().execute(() -> { - try { - future.complete(((Future) result).get()); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - }); - } else { - parent.executor().execute(() -> future.complete((U) result)); - } - } - }); - - return future; + , U> U execute(Commit commit) { + // Get the function registered for the operation. If no function is registered, attempt to + // use a global function if available. + Function function = operations.get(commit.type()); + if (function == null) { + function = allOperation; + } + + if (function == null) { + throw new IllegalStateException("unknown state machine operation: " + commit.type()); + } else { + // Execute the operation. If the operation return value is a Future, await the result, + // otherwise immediately complete the execution future. + return (U) function.apply(commit); + } } @Override public Scheduled schedule(Duration delay, Runnable callback) { - Scheduled task = parent.schedule(delay, () -> context.executor().execute(callback)); + Scheduled task = parent.schedule(delay, () -> parent.executor().execute(callback)); tasks.add(task); return task; } @Override public Scheduled schedule(Duration initialDelay, Duration interval, Runnable callback) { - Scheduled task = parent.schedule(initialDelay, interval, () -> context.executor().execute(callback)); + Scheduled task = parent.schedule(initialDelay, interval, () -> parent.executor().execute(callback)); tasks.add(task); return task; } @@ -163,7 +137,7 @@ public > StateMachineExecutor register(Class type, } @Override - public > StateMachineExecutor register(Class type, Function, ?> callback) { + public , U> StateMachineExecutor register(Class type, Function, U> callback) { Assert.notNull(type, "type"); Assert.notNull(callback, "callback"); operations.put(type, callback); diff --git a/core/src/main/java/io/atomix/copycat/resource/ResourceSession.java b/core/src/main/java/io/atomix/copycat/resource/ResourceSession.java index 1bbb5905c6..51f5e29cbf 100644 --- a/core/src/main/java/io/atomix/copycat/resource/ResourceSession.java +++ b/core/src/main/java/io/atomix/copycat/resource/ResourceSession.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -65,19 +64,19 @@ public Listener onOpen(Consumer listener) { @Override @SuppressWarnings("unchecked") - public CompletableFuture publish(String event, Object message) { + public Session publish(String event, Object message) { ResourceEvent resourceEvent = (ResourceEvent) Assert.notNull(message, "message"); if (resourceEvent.resource() == resource) { Set listeners = eventListeners.get(event); if (listeners != null) { - return CompletableFuture.runAsync(() -> { + context.executor().execute(() -> { for (Consumer listener : listeners) { listener.accept(resourceEvent.event()); } - }, context.executor()); + }); } } - return CompletableFuture.completedFuture(null); + return this; } @Override