From 8661efc81194bb1bfa298d7a4b86b185eba81b62 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Thu, 9 Jul 2015 01:19:18 -0700 Subject: [PATCH] Refactor context to support pluggable contexts and use the Netty event loop. --- .../kuujo/copycat/cluster/ManagedMember.java | 14 +- .../kuujo/copycat/cluster/ManagedMembers.java | 4 +- .../java/net/kuujo/copycat/util/Context.java | 118 ++++++++++++++++ ...ThreadChecker.java => ContextFactory.java} | 24 ++-- .../net/kuujo/copycat/util/CopycatThread.java | 6 +- .../kuujo/copycat/util/ExecutionContext.java | 128 ------------------ .../kuujo/copycat/cluster/NettyCluster.java | 4 +- .../copycat/cluster/NettyLocalMember.java | 18 +-- .../kuujo/copycat/cluster/NettyMembers.java | 4 +- .../copycat/cluster/NettyRemoteMember.java | 14 +- .../net/kuujo/copycat/util/NettyContext.java | 94 +++++++++++++ .../copycat/util/NettyContextFactory.java | 35 +++++ .../net.kuujo.copycat.util.ContextFactory | 16 +++ .../copycat/cluster/NettyClusterTest.java | 18 +-- .../java/net/kuujo/copycat/raft/Raft.java | 4 +- .../net/kuujo/copycat/raft/RaftClient.java | 4 +- .../net/kuujo/copycat/raft/log/Compactor.java | 6 +- .../java/net/kuujo/copycat/raft/log/Log.java | 4 +- .../copycat/raft/log/MajorCompaction.java | 6 +- .../copycat/raft/log/MinorCompaction.java | 6 +- .../net/kuujo/copycat/raft/log/Segment.java | 8 +- .../copycat/raft/log/SegmentManager.java | 6 +- .../kuujo/copycat/raft/state/RaftState.java | 6 +- .../copycat/raft/state/RaftStateClient.java | 17 +-- .../copycat/raft/state/RaftStateContext.java | 23 ++-- .../net/kuujo/copycat/raft/log/LogTest.java | 6 +- .../copycat/raft/log/MajorCompactionTest.java | 4 +- .../copycat/raft/log/MinorCompactionTest.java | 4 +- .../kuujo/copycat/cluster/TestCluster.java | 4 +- .../copycat/cluster/TestLocalMember.java | 12 +- .../copycat/cluster/TestRemoteMember.java | 10 +- 31 files changed, 376 insertions(+), 251 deletions(-) create mode 100644 cluster/src/main/java/net/kuujo/copycat/util/Context.java rename cluster/src/main/java/net/kuujo/copycat/util/{ThreadChecker.java => ContextFactory.java} (57%) delete mode 100644 cluster/src/main/java/net/kuujo/copycat/util/ExecutionContext.java create mode 100644 netty/src/main/java/net/kuujo/copycat/util/NettyContext.java create mode 100644 netty/src/main/java/net/kuujo/copycat/util/NettyContextFactory.java create mode 100644 netty/src/main/resources/META-INF/services/net.kuujo.copycat.util.ContextFactory diff --git a/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMember.java b/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMember.java index e2887e46c9..214e10b232 100644 --- a/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMember.java +++ b/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMember.java @@ -16,7 +16,7 @@ package net.kuujo.copycat.cluster; import net.kuujo.alleycat.Alleycat; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.Managed; import java.util.Random; @@ -30,8 +30,8 @@ public abstract class ManagedMember implements Member, Managed { protected final MemberInfo info; protected Type type; protected Status status = Status.DEAD; - protected ExecutionContext context; - protected Alleycat alleycat; + protected Context context; + protected Alleycat serializer; protected ManagedMember(MemberInfo info, Type type) { this.info = info; @@ -41,16 +41,16 @@ protected ManagedMember(MemberInfo info, Type type) { /** * Sets the member context. */ - void setContext(ExecutionContext context) { + void setContext(Context context) { this.context = context; - this.alleycat = context.alleycat(); + this.serializer = context.serializer(); } /** * Returns the current execution context. */ - protected ExecutionContext getContext() { - ExecutionContext context = ExecutionContext.currentContext(); + protected Context getContext() { + Context context = Context.currentContext(); return context != null ? context : this.context; } diff --git a/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMembers.java b/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMembers.java index b0451ad2bd..60aca1458a 100644 --- a/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMembers.java +++ b/cluster/src/main/java/net/kuujo/copycat/cluster/ManagedMembers.java @@ -16,7 +16,7 @@ package net.kuujo.copycat.cluster; import net.kuujo.alleycat.Alleycat; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.Managed; import java.util.*; @@ -43,7 +43,7 @@ public abstract class ManagedMembers implements Members, Managed { protected ManagedMembers(Collection remoteMembers, Alleycat alleycat) { remoteMembers.forEach(m -> { - ((ManagedMember)m).setContext(new ExecutionContext("copycat-cluster-" + m.id(), alleycat)); + ((ManagedMember)m).setContext(Context.createContext("copycat-cluster-%d", alleycat)); this.members.put(m.id(), m); this.sortedMembers.add(m); }); diff --git a/cluster/src/main/java/net/kuujo/copycat/util/Context.java b/cluster/src/main/java/net/kuujo/copycat/util/Context.java new file mode 100644 index 0000000000..abcebdc3bc --- /dev/null +++ b/cluster/src/main/java/net/kuujo/copycat/util/Context.java @@ -0,0 +1,118 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat.util; + +import net.kuujo.alleycat.Alleycat; + +import java.util.ServiceLoader; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Execution context. + * + * @author Jordan Halterman + */ +public abstract class Context implements Executor, AutoCloseable { + private static final ContextFactory factory = ServiceLoader.load(ContextFactory.class).iterator().next(); + + /** + * Creates a new context. + * + * @param name The context name. + * @param serializer The Alleycat serializer. + * @return A new context. + */ + public static Context createContext(String name, Alleycat serializer) { + return factory.createContext(name, serializer); + } + + /** + * Returns the current context. + * + * @return The current context. + */ + public static Context currentContext() { + Thread thread = Thread.currentThread(); + return thread instanceof CopycatThread ? ((CopycatThread) thread).getContext() : null; + } + + private final String name; + private final Alleycat serializer; + + protected Context(String name, CopycatThread thread, Alleycat serializer) { + if (thread == null) + throw new NullPointerException("thread cannot be null"); + thread.setName(name); + thread.setContext(this); + this.name = name; + this.serializer = serializer; + } + + /** + * Checks that the current thread is the correct context thread. + */ + public void checkThread() { + Thread thread = Thread.currentThread(); + if (!(thread instanceof CopycatThread && ((CopycatThread) thread).getContext() == this)) { + throw new IllegalStateException("not running on the correct thread"); + } + } + + /** + * Returns the context name. + * + * @return The context name. + */ + public String name() { + return name; + } + + /** + * Returns the context serializer. + * + * @return The context serializer. + */ + public Alleycat serializer() { + return serializer; + } + + /** + * Schedules a runnable on the context. + * + * @param runnable The runnable to schedule. + * @param delay The delay at which to schedule the runnable. + * @param unit The time unit. + */ + public abstract ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit); + + /** + * Schedules a runnable at a fixed rate on the context. + * + * @param runnable The runnable to schedule. + * @param delay The delay at which to schedule the runnable. + * @param unit The time unit. + */ + public abstract ScheduledFuture scheduleAtFixedRate(Runnable runnable, long delay, long rate, TimeUnit unit); + + /** + * Closes the context. + */ + @Override + public abstract void close(); + +} diff --git a/cluster/src/main/java/net/kuujo/copycat/util/ThreadChecker.java b/cluster/src/main/java/net/kuujo/copycat/util/ContextFactory.java similarity index 57% rename from cluster/src/main/java/net/kuujo/copycat/util/ThreadChecker.java rename to cluster/src/main/java/net/kuujo/copycat/util/ContextFactory.java index bfd1fd45f0..3a0aea9a7f 100644 --- a/cluster/src/main/java/net/kuujo/copycat/util/ThreadChecker.java +++ b/cluster/src/main/java/net/kuujo/copycat/util/ContextFactory.java @@ -15,26 +15,22 @@ */ package net.kuujo.copycat.util; +import net.kuujo.alleycat.Alleycat; + /** - * Context thread checker. + * Context factory. * * @author Jordan Halterman */ -public class ThreadChecker { - private final ExecutionContext context; - - public ThreadChecker(ExecutionContext context) { - this.context = context; - } +public interface ContextFactory { /** - * Checks that the current thread is the correct context thread. + * Creates a new execution context. + * + * @param name The context name. + * @param serializer The Alleycat serializer. + * @return The execution context. */ - public void checkThread() { - Thread thread = Thread.currentThread(); - if (!(thread instanceof net.kuujo.copycat.util.CopycatThread && ((net.kuujo.copycat.util.CopycatThread) thread).getContext() == context)) { - throw new IllegalStateException("not running on the correct thread"); - } - } + Context createContext(String name, Alleycat serializer); } diff --git a/cluster/src/main/java/net/kuujo/copycat/util/CopycatThread.java b/cluster/src/main/java/net/kuujo/copycat/util/CopycatThread.java index f8161bf321..22dc26e2aa 100644 --- a/cluster/src/main/java/net/kuujo/copycat/util/CopycatThread.java +++ b/cluster/src/main/java/net/kuujo/copycat/util/CopycatThread.java @@ -23,7 +23,7 @@ * @author Jordan Halterman */ public class CopycatThread extends Thread { - private WeakReference context; + private WeakReference context; public CopycatThread(Runnable target, String name) { super(target, name); @@ -32,14 +32,14 @@ public CopycatThread(Runnable target, String name) { /** * Sets the thread context. */ - public void setContext(ExecutionContext context) { + public void setContext(Context context) { this.context = new WeakReference<>(context); } /** * Returns the thread context. */ - public ExecutionContext getContext() { + public Context getContext() { return context.get(); } diff --git a/cluster/src/main/java/net/kuujo/copycat/util/ExecutionContext.java b/cluster/src/main/java/net/kuujo/copycat/util/ExecutionContext.java deleted file mode 100644 index 02933ddd93..0000000000 --- a/cluster/src/main/java/net/kuujo/copycat/util/ExecutionContext.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.kuujo.copycat.util; - -import net.kuujo.alleycat.Alleycat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.*; - -/** - * Execution context. - * - * @author Jordan Halterman - */ -public class ExecutionContext implements Executor, AutoCloseable { - private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionContext.class); - private final String name; - private final Alleycat alleycat; - private final ScheduledExecutorService executor; - private CopycatThread thread; - - /** - * Returns the current execution context. - */ - public static ExecutionContext currentContext() { - Thread thread = Thread.currentThread(); - return thread instanceof CopycatThread ? ((CopycatThread) thread).getContext() : null; - } - - public ExecutionContext(String name, Alleycat alleycat) { - if (name == null) - throw new NullPointerException("name cannot be null"); - this.name = name; - this.alleycat = alleycat.clone(); - this.executor = Executors.newSingleThreadScheduledExecutor(new CopycatThreadFactory(name)); - try { - executor.submit(() -> { - thread = (CopycatThread) Thread.currentThread(); - thread.setContext(this); - }).get(); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException("failed to initialize thread state", e); - } - } - - /** - * Returns the context name. - * - * @return The context name. - */ - public String name() { - return name; - } - - /** - * Returns the context serializer. - * - * @return The context serializer. - */ - public Alleycat alleycat() { - return alleycat; - } - - /** - * Executes a runnable on the context. - * - * @param runnable The runnable to execute. - */ - public void execute(Runnable runnable) { - executor.execute(wrapRunnable(runnable)); - } - - /** - * Schedules a runnable on the context. - * - * @param runnable The runnable to schedule. - * @param delay The delay at which to schedule the runnable. - * @param unit The time unit. - */ - public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { - return executor.schedule(wrapRunnable(runnable), delay, unit); - } - - /** - * Schedules a runnable at a fixed rate on the context. - * - * @param runnable The runnable to schedule. - * @param delay The delay at which to schedule the runnable. - * @param unit The time unit. - */ - public ScheduledFuture scheduleAtFixedRate(Runnable runnable, long delay, long rate, TimeUnit unit) { - return executor.scheduleAtFixedRate(wrapRunnable(runnable), delay, rate, unit); - } - - /** - * Wraps a runnable in an uncaught exception handler. - */ - private Runnable wrapRunnable(Runnable runnable) { - return () -> { - try { - runnable.run(); - } catch (RuntimeException e) { - LOGGER.error("An uncaught exception occurred: {}", e); - e.printStackTrace(); - } - }; - } - - @Override - public void close() { - executor.shutdown(); - } - -} diff --git a/netty/src/main/java/net/kuujo/copycat/cluster/NettyCluster.java b/netty/src/main/java/net/kuujo/copycat/cluster/NettyCluster.java index ac83e0e92f..d90f0e818e 100644 --- a/netty/src/main/java/net/kuujo/copycat/cluster/NettyCluster.java +++ b/netty/src/main/java/net/kuujo/copycat/cluster/NettyCluster.java @@ -20,7 +20,7 @@ import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.copycat.ConfigurationException; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.net.InetSocketAddress; import java.util.Collection; @@ -55,7 +55,7 @@ public NettyCluster(EventLoopGroup eventLoopGroup, NettyLocalMember localMember, protected ManagedRemoteMember createMember(MemberInfo info) { ManagedRemoteMember remoteMember = new NettyRemoteMember((NettyMemberInfo) info, Member.Type.PASSIVE) .setEventLoopGroup(eventLoopGroup); - remoteMember.setContext(new ExecutionContext(String.format("copycat-cluster-%d", info.id()), alleycat)); + remoteMember.setContext(Context.createContext(String.format("copycat-cluster-%d", info.id()), alleycat)); return remoteMember; } diff --git a/netty/src/main/java/net/kuujo/copycat/cluster/NettyLocalMember.java b/netty/src/main/java/net/kuujo/copycat/cluster/NettyLocalMember.java index 30ff797c04..0dec2959c6 100644 --- a/netty/src/main/java/net/kuujo/copycat/cluster/NettyLocalMember.java +++ b/netty/src/main/java/net/kuujo/copycat/cluster/NettyLocalMember.java @@ -32,7 +32,7 @@ import io.netty.util.concurrent.GlobalEventExecutor; import net.kuujo.alleycat.util.ReferenceCounted; import net.kuujo.copycat.Task; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.concurrent.Futures; import net.openhft.hashing.LongHashFunction; import org.slf4j.Logger; @@ -130,7 +130,7 @@ public CompletableFuture send(String topic, T message) { if (!listening) return Futures.exceptionalFuture(new IllegalStateException("member not open")); - ExecutionContext context = getContext(); + Context context = getContext(); CompletableFuture future = new CompletableFuture<>(); HandlerHolder handler = handlers.get(hashMap.computeIfAbsent(topic, this::hash32)); @@ -301,7 +301,7 @@ private void handleMessage(long requestId, ByteBuf request, ChannelHandlerContex NettyLocalMember.this.context.execute(() -> { ByteBufBuffer requestBuffer = BUFFER.get(); requestBuffer.setByteBuf(request); - Object deserializedRequest = alleycat.readObject(requestBuffer); + Object deserializedRequest = serializer.readObject(requestBuffer); handler.context.execute(() -> { handler.handler.handle(deserializedRequest).whenCompleteAsync((result, error) -> { @@ -315,7 +315,7 @@ private void handleMessage(long requestId, ByteBuf request, ChannelHandlerContex response.writeByte(STATUS_SUCCESS); ByteBufBuffer responseBuffer = BUFFER.get(); responseBuffer.setByteBuf(response); - alleycat.writeObject(result, responseBuffer); + serializer.writeObject(result, responseBuffer); context.writeAndFlush(response); } else { ByteBuf response = context.alloc().buffer(10, 1024 * 8); @@ -323,7 +323,7 @@ private void handleMessage(long requestId, ByteBuf request, ChannelHandlerContex response.writeByte(STATUS_FAILURE); ByteBufBuffer responseBuffer = BUFFER.get(); responseBuffer.setByteBuf(response); - alleycat.writeObject(error, responseBuffer); + serializer.writeObject(error, responseBuffer); context.writeAndFlush(response); } @@ -344,13 +344,13 @@ private void handleTask(long requestId, ByteBuf request, ChannelHandlerContext c getContext().execute(() -> { ByteBufBuffer requestBuffer = BUFFER.get(); requestBuffer.setByteBuf(request); - Task task = alleycat.readObject(requestBuffer); + Task task = serializer.readObject(requestBuffer); try { Object result = task.execute(); ByteBuf response = context.alloc().buffer(9, 1024 * 8); ByteBufBuffer responseBuffer = BUFFER.get(); responseBuffer.setByteBuf(response); - alleycat.writeObject(result, responseBuffer); + serializer.writeObject(result, responseBuffer); response.writeLong(requestId); context.writeAndFlush(response); request.release(); @@ -371,9 +371,9 @@ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { */ protected static class HandlerHolder { private final MessageHandler handler; - private final ExecutionContext context; + private final Context context; - private HandlerHolder(MessageHandler handler, ExecutionContext context) { + private HandlerHolder(MessageHandler handler, Context context) { this.handler = handler; this.context = context; } diff --git a/netty/src/main/java/net/kuujo/copycat/cluster/NettyMembers.java b/netty/src/main/java/net/kuujo/copycat/cluster/NettyMembers.java index 057f7f7625..aaa89075b9 100644 --- a/netty/src/main/java/net/kuujo/copycat/cluster/NettyMembers.java +++ b/netty/src/main/java/net/kuujo/copycat/cluster/NettyMembers.java @@ -19,7 +19,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.ServiceLoaderResolver; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -53,7 +53,7 @@ public NettyMembers(EventLoopGroup eventLoopGroup, Collection protected ManagedRemoteMember createMember(MemberInfo info) { ManagedRemoteMember remoteMember = new NettyRemoteMember((NettyMemberInfo) info, Member.Type.PASSIVE) .setEventLoopGroup(eventLoopGroup); - remoteMember.setContext(new ExecutionContext(String.format("copycat-cluster-%d", info.id()), alleycat)); + remoteMember.setContext(Context.createContext(String.format("copycat-cluster-%d", info.id()), alleycat)); return remoteMember; } diff --git a/netty/src/main/java/net/kuujo/copycat/cluster/NettyRemoteMember.java b/netty/src/main/java/net/kuujo/copycat/cluster/NettyRemoteMember.java index 0ef194a2ab..5725a13806 100644 --- a/netty/src/main/java/net/kuujo/copycat/cluster/NettyRemoteMember.java +++ b/netty/src/main/java/net/kuujo/copycat/cluster/NettyRemoteMember.java @@ -25,7 +25,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import net.kuujo.copycat.Task; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.openhft.hashing.LongHashFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +128,7 @@ public CompletableFuture send(String topic, T message) { ByteBuf byteBuf = context.alloc().buffer(13, 1024 * 32); buffer.setByteBuf(byteBuf); buffer.writeLong(requestId).writeByte(MESSAGE).writeInt(hashMap.computeIfAbsent(topic, this::hash32)); - alleycat.writeObject(message, buffer); + serializer.writeObject(message, buffer); channel.writeAndFlush(byteBuf).addListener((channelFuture) -> { if (channelFuture.isSuccess()) { responseFutures.put(requestId, future); @@ -162,7 +162,7 @@ public CompletableFuture submit(Task task) { ByteBuf byteBuf = context.alloc().buffer(9, 1024 * 32); buffer.setByteBuf(byteBuf); buffer.writeLong(requestId).writeByte(TASK); - alleycat.writeObject(task, buffer); + serializer.writeObject(task, buffer); channel.writeAndFlush(byteBuf).addListener((channelFuture) -> { if (channelFuture.isSuccess()) { responseFutures.put(requestId, future); @@ -269,7 +269,7 @@ public CompletableFuture close() { connecting.set(false); - ExecutionContext context = getContext(); + Context context = getContext(); synchronized (this) { if (closeFuture == null) { @@ -313,10 +313,10 @@ public boolean isClosed() { * Contextual future. */ private static class ContextualFuture extends CompletableFuture { - private final ExecutionContext context; + private final Context context; private final long timeout; - private ContextualFuture(ExecutionContext context, long timeout) { + private ContextualFuture(Context context, long timeout) { this.context = context; this.timeout = timeout; } @@ -341,7 +341,7 @@ public void channelRead(ChannelHandlerContext context, Object message) { int status = response.readByte(); ByteBufBuffer buffer = BUFFER.get(); buffer.setByteBuf(response.slice()); - Object result = alleycat.readObject(buffer); + Object result = serializer.readObject(buffer); responseFuture.context.execute(() -> { if (status == STATUS_FAILURE) { responseFuture.completeExceptionally((Exception) result); diff --git a/netty/src/main/java/net/kuujo/copycat/util/NettyContext.java b/netty/src/main/java/net/kuujo/copycat/util/NettyContext.java new file mode 100644 index 0000000000..4af666b971 --- /dev/null +++ b/netty/src/main/java/net/kuujo/copycat/util/NettyContext.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat.util; + +import io.netty.channel.EventLoop; +import net.kuujo.alleycat.Alleycat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Netty context. + * + * @author Jordan Halterman + */ +public class NettyContext extends Context { + private static final Logger LOGGER = LoggerFactory.getLogger(NettyContext.class); + private final EventLoop eventLoop; + + public NettyContext(String name, EventLoop eventLoop, Alleycat serializer) { + super(name, getThread(eventLoop), serializer); + this.eventLoop = eventLoop; + } + + private static CopycatThread getThread(EventLoop eventLoop) { + final AtomicReference thread = new AtomicReference<>(); + try { + eventLoop.submit(new Runnable() { + @Override + public void run() { + thread.set((CopycatThread) Thread.currentThread()); + } + }).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException("failed to initialize thread state", e); + } + return thread.get(); + } + + @Override + public void execute(Runnable command) { + eventLoop.execute(command); + } + + @Override + public ScheduledFuture schedule(Runnable runnable, long delay, TimeUnit unit) { + return eventLoop.schedule(wrapRunnable(runnable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable runnable, long delay, long rate, TimeUnit unit) { + return eventLoop.scheduleAtFixedRate(wrapRunnable(runnable), delay, rate, unit); + } + + /** + * Wraps a runnable in an uncaught exception handler. + */ + private Runnable wrapRunnable(final Runnable runnable) { + return new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } catch (RuntimeException e) { + LOGGER.error("An uncaught exception occurred: {}", e); + e.printStackTrace(); + } + } + }; + } + + @Override + public void close() { + eventLoop.shutdownGracefully(); + } + +} diff --git a/netty/src/main/java/net/kuujo/copycat/util/NettyContextFactory.java b/netty/src/main/java/net/kuujo/copycat/util/NettyContextFactory.java new file mode 100644 index 0000000000..82af32b281 --- /dev/null +++ b/netty/src/main/java/net/kuujo/copycat/util/NettyContextFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat.util; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import net.kuujo.alleycat.Alleycat; + +/** + * Netty context factory. + * + * @author Jordan Halterman + */ +public class NettyContextFactory implements ContextFactory { + private final EventLoopGroup group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, new CopycatThreadFactory("copycat-thread-%d")); + + @Override + public Context createContext(String name, Alleycat serializer) { + return new NettyContext(name, group.next(), serializer); + } + +} diff --git a/netty/src/main/resources/META-INF/services/net.kuujo.copycat.util.ContextFactory b/netty/src/main/resources/META-INF/services/net.kuujo.copycat.util.ContextFactory new file mode 100644 index 0000000000..b00bbeb1c5 --- /dev/null +++ b/netty/src/main/resources/META-INF/services/net.kuujo.copycat.util.ContextFactory @@ -0,0 +1,16 @@ +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +net.kuujo.copycat.util.NettyContextFactory diff --git a/netty/src/test/java/net/kuujo/copycat/cluster/NettyClusterTest.java b/netty/src/test/java/net/kuujo/copycat/cluster/NettyClusterTest.java index 9123c199c0..02227ee881 100644 --- a/netty/src/test/java/net/kuujo/copycat/cluster/NettyClusterTest.java +++ b/netty/src/test/java/net/kuujo/copycat/cluster/NettyClusterTest.java @@ -23,7 +23,7 @@ import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.alleycat.io.Buffer; import net.kuujo.copycat.Task; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.testng.annotations.Test; import java.net.InetSocketAddress; @@ -44,7 +44,7 @@ public class NettyClusterTest extends ConcurrentTestCase { */ public void testConnectRemoteToLocal() throws Throwable { NettyLocalMember localMember = new NettyLocalMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8080)), Member.Type.ACTIVE); - localMember.setContext(new ExecutionContext("test-server", new Alleycat(new ServiceLoaderResolver()))); + localMember.setContext(Context.createContext("test-server", new Alleycat(new ServiceLoaderResolver()))); expectResume(); localMember.open().thenRun(this::resume); @@ -52,7 +52,7 @@ public void testConnectRemoteToLocal() throws Throwable { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); NettyRemoteMember remoteMember = new NettyRemoteMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8080)), Member.Type.ACTIVE); - remoteMember.setContext(new ExecutionContext("test-client", new Alleycat(new ServiceLoaderResolver()))); + remoteMember.setContext(Context.createContext("test-client", new Alleycat(new ServiceLoaderResolver()))); remoteMember.setEventLoopGroup(eventLoopGroup); expectResume(); @@ -75,14 +75,14 @@ public void testConnectRemoteToLocal() throws Throwable { public void testConnectRemoteBeforeLocal() throws Throwable { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); NettyRemoteMember remoteMember = new NettyRemoteMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8081)), Member.Type.ACTIVE); - remoteMember.setContext(new ExecutionContext("test-client", new Alleycat(new ServiceLoaderResolver()))); + remoteMember.setContext(Context.createContext("test-client", new Alleycat(new ServiceLoaderResolver()))); remoteMember.setEventLoopGroup(eventLoopGroup); expectResumes(2); remoteMember.open().thenRun(this::resume); NettyLocalMember localMember = new NettyLocalMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8081)), Member.Type.ACTIVE); - localMember.setContext(new ExecutionContext("test-server", new Alleycat(new ServiceLoaderResolver()))); + localMember.setContext(Context.createContext("test-server", new Alleycat(new ServiceLoaderResolver()))); localMember.open().thenRun(this::resume); await(); @@ -102,7 +102,7 @@ public void testConnectRemoteBeforeLocal() throws Throwable { */ public void testMessageRemoteToLocal() throws Throwable { NettyLocalMember localMember = new NettyLocalMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8082)), Member.Type.ACTIVE); - localMember.setContext(new ExecutionContext("test-server", new Alleycat(new ServiceLoaderResolver()))); + localMember.setContext(Context.createContext("test-server", new Alleycat(new ServiceLoaderResolver()))); expectResume(); localMember.open().thenRun(this::resume); @@ -110,7 +110,7 @@ public void testMessageRemoteToLocal() throws Throwable { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); NettyRemoteMember remoteMember = new NettyRemoteMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8082)), Member.Type.ACTIVE); - remoteMember.setContext(new ExecutionContext("test-client", new Alleycat(new ServiceLoaderResolver()))); + remoteMember.setContext(Context.createContext("test-client", new Alleycat(new ServiceLoaderResolver()))); remoteMember.setEventLoopGroup(eventLoopGroup); expectResume(); @@ -141,7 +141,7 @@ public void testMessageRemoteToLocal() throws Throwable { */ public void testTaskRemoteToLocal() throws Throwable { NettyLocalMember localMember = new NettyLocalMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8083)), Member.Type.ACTIVE); - localMember.setContext(new ExecutionContext("test-server", new Alleycat(new ServiceLoaderResolver()))); + localMember.setContext(Context.createContext("test-server", new Alleycat(new ServiceLoaderResolver()))); expectResume(); localMember.open().thenRun(this::resume); @@ -149,7 +149,7 @@ public void testTaskRemoteToLocal() throws Throwable { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); NettyRemoteMember remoteMember = new NettyRemoteMember(new NettyMemberInfo(1, new InetSocketAddress("localhost", 8083)), Member.Type.ACTIVE); - remoteMember.setContext(new ExecutionContext("test-client", new Alleycat(new ServiceLoaderResolver()))); + remoteMember.setContext(Context.createContext("test-client", new Alleycat(new ServiceLoaderResolver()))); remoteMember.setEventLoopGroup(eventLoopGroup); expectResume(); diff --git a/raft/src/main/java/net/kuujo/copycat/raft/Raft.java b/raft/src/main/java/net/kuujo/copycat/raft/Raft.java index d59347356e..0400ee3c60 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/Raft.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/Raft.java @@ -21,7 +21,7 @@ import net.kuujo.copycat.cluster.Member; import net.kuujo.copycat.raft.log.Log; import net.kuujo.copycat.raft.state.RaftStateContext; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -354,7 +354,7 @@ public Raft build() { if (log == null) throw new NullPointerException("log cannot be null"); - RaftStateContext context = (RaftStateContext) new RaftStateContext(log, stateMachine, cluster, new ExecutionContext(cluster.member().id() != 0 ? String.format("copycat-%d", cluster.member().id()) : "copycat", cluster.alleycat().clone())) + RaftStateContext context = (RaftStateContext) new RaftStateContext(log, stateMachine, cluster, Context.createContext(cluster.member().id() != 0 ? String.format("copycat-%d", cluster.member().id()) : "copycat", cluster.alleycat().clone())) .setHeartbeatInterval(config.getHeartbeatInterval()) .setElectionTimeout(config.getElectionTimeout()) .setSessionTimeout(config.getSessionTimeout()) diff --git a/raft/src/main/java/net/kuujo/copycat/raft/RaftClient.java b/raft/src/main/java/net/kuujo/copycat/raft/RaftClient.java index bee1cb3e82..a7f2aa8d20 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/RaftClient.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/RaftClient.java @@ -17,7 +17,7 @@ import net.kuujo.copycat.cluster.ManagedMembers; import net.kuujo.copycat.raft.state.RaftStateClient; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -175,7 +175,7 @@ public Builder withMembers(ManagedMembers members) { @Override public RaftClient build() { - return new RaftClient(new RaftStateClient(members, new ExecutionContext("copycat-client-%d", members.alleycat().clone())).setKeepAliveInterval(keepAliveInterval)); + return new RaftClient(new RaftStateClient(members, Context.createContext("copycat-client-%d", members.alleycat().clone())).setKeepAliveInterval(keepAliveInterval)); } } diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java b/raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java index fff24f7205..70b21961e2 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java @@ -16,7 +16,7 @@ package net.kuujo.copycat.raft.log; import net.kuujo.copycat.raft.log.entry.EntryFilter; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,7 +127,7 @@ public Compactor withMajorCompactionInterval(long compactionInterval) { /** * Opens the log compactor. */ - public void open(ExecutionContext context) { + public void open(Context context) { scheduledFuture = context.scheduleAtFixedRate(() -> compact(context), minorCompactionInterval, minorCompactionInterval, TimeUnit.MILLISECONDS); } @@ -152,7 +152,7 @@ public void setCompactIndex(long index) { /** * Compacts the log. */ - synchronized CompletableFuture compact(ExecutionContext context) { + synchronized CompletableFuture compact(Context context) { if (compactFuture != null) { return compactFuture; } diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/Log.java b/raft/src/main/java/net/kuujo/copycat/raft/log/Log.java index bc9df68c6d..6c0d1e0e7a 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/Log.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/Log.java @@ -17,7 +17,7 @@ import net.kuujo.copycat.raft.log.entry.Entry; import net.kuujo.copycat.raft.log.entry.TypedEntryPool; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.io.File; import java.util.concurrent.TimeUnit; @@ -52,7 +52,7 @@ protected Log(SegmentManager segments) { * * @param context The context in which to open the log. */ - public void open(ExecutionContext context) { + public void open(Context context) { segments.open(context); compactor.open(context); open = true; diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/MajorCompaction.java b/raft/src/main/java/net/kuujo/copycat/raft/log/MajorCompaction.java index d55d17f22b..cbbde55017 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/MajorCompaction.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/MajorCompaction.java @@ -17,7 +17,7 @@ import net.kuujo.copycat.raft.log.entry.Entry; import net.kuujo.copycat.raft.log.entry.EntryFilter; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,9 +34,9 @@ public class MajorCompaction extends Compaction { private final Logger LOGGER = LoggerFactory.getLogger(MajorCompaction.class); private final EntryFilter filter; - private final ExecutionContext context; + private final Context context; - public MajorCompaction(long index, EntryFilter filter, ExecutionContext context) { + public MajorCompaction(long index, EntryFilter filter, Context context) { super(index); this.filter = filter; this.context = context; diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/MinorCompaction.java b/raft/src/main/java/net/kuujo/copycat/raft/log/MinorCompaction.java index 78d51d6f06..d7aad2dd4a 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/MinorCompaction.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/MinorCompaction.java @@ -17,7 +17,7 @@ import net.kuujo.copycat.raft.log.entry.Entry; import net.kuujo.copycat.raft.log.entry.EntryFilter; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,9 +33,9 @@ public class MinorCompaction extends Compaction { private final Logger LOGGER = LoggerFactory.getLogger(MinorCompaction.class); private final EntryFilter filter; - private final ExecutionContext context; + private final Context context; - public MinorCompaction(long index, EntryFilter filter, ExecutionContext context) { + public MinorCompaction(long index, EntryFilter filter, Context context) { super(index); this.filter = filter; this.context = context; diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/Segment.java b/raft/src/main/java/net/kuujo/copycat/raft/log/Segment.java index 8bafdcb5cf..4647702905 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/Segment.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/Segment.java @@ -18,7 +18,7 @@ import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.io.Buffer; import net.kuujo.copycat.raft.log.entry.Entry; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; /** * Log segment. @@ -36,7 +36,7 @@ public class Segment implements AutoCloseable { * @param context The segment execution context. * @return The opened segment. */ - public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex index, ExecutionContext context) { + public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex index, Context context) { return new Segment(buffer, descriptor, index, context); } @@ -49,7 +49,7 @@ public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIn private int skip = 0; private boolean open = true; - Segment(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex offsetIndex, ExecutionContext context) { + Segment(Buffer buffer, SegmentDescriptor descriptor, OffsetIndex offsetIndex, Context context) { if (buffer == null) throw new NullPointerException("buffer cannot be null"); if (descriptor == null) @@ -60,7 +60,7 @@ public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIn throw new NullPointerException("context cannot be null"); this.source = buffer; - this.alleycat = context.alleycat(); + this.alleycat = context.serializer(); this.writeBuffer = buffer.slice(); this.readBuffer = writeBuffer.asReadOnlyBuffer(); this.descriptor = descriptor; diff --git a/raft/src/main/java/net/kuujo/copycat/raft/log/SegmentManager.java b/raft/src/main/java/net/kuujo/copycat/raft/log/SegmentManager.java index 189c84d809..73063132ae 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/log/SegmentManager.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/log/SegmentManager.java @@ -19,7 +19,7 @@ import net.kuujo.alleycat.io.FileBuffer; import net.kuujo.alleycat.io.HeapBuffer; import net.kuujo.copycat.ConfigurationException; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ public class SegmentManager implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentManager.class); protected final LogConfig config; private NavigableMap segments = new ConcurrentSkipListMap<>(); - private ExecutionContext context; + private Context context; private Segment currentSegment; public SegmentManager(LogConfig config) { @@ -62,7 +62,7 @@ LogConfig config() { * * @param context The context in which to open the segments. */ - public void open(ExecutionContext context) { + public void open(Context context) { this.context = context; // Load existing log segments from disk. diff --git a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftState.java b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftState.java index 6e3673ea29..2ef60c7f5c 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftState.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftState.java @@ -22,7 +22,7 @@ import net.kuujo.copycat.raft.*; import net.kuujo.copycat.raft.log.Compaction; import net.kuujo.copycat.raft.log.entry.*; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +40,13 @@ class RaftState { private final StateMachine stateMachine; private final ManagedCluster cluster; private final ClusterState members; - private final ExecutionContext context; + private final Context context; private final Map sessions = new HashMap<>(); private final Map> queries = new HashMap<>(); private long sessionTimeout = 5000; private long lastApplied; - public RaftState(StateMachine stateMachine, ManagedCluster cluster, ClusterState members, ExecutionContext context) { + public RaftState(StateMachine stateMachine, ManagedCluster cluster, ClusterState members, Context context) { this.stateMachine = stateMachine; this.cluster = cluster; this.members = members; diff --git a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateClient.java b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateClient.java index 2c8b18e7bd..98a1e3e152 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateClient.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateClient.java @@ -21,9 +21,8 @@ import net.kuujo.copycat.cluster.MemberInfo; import net.kuujo.copycat.raft.*; import net.kuujo.copycat.raft.rpc.*; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.Managed; -import net.kuujo.copycat.util.ThreadChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +45,7 @@ public class RaftStateClient implements Managed { private static final Logger LOGGER = LoggerFactory.getLogger(RaftStateClient.class); private static final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(10); private final ManagedMembers members; - private final ExecutionContext context; - private final ThreadChecker threadChecker; + private final Context context; private CompletableFuture registerFuture; private final AtomicBoolean keepAlive = new AtomicBoolean(); private final Random random = new Random(); @@ -63,12 +61,11 @@ public class RaftStateClient implements Managed { private volatile long response; private volatile long version; - public RaftStateClient(ManagedMembers members, ExecutionContext context) { + public RaftStateClient(ManagedMembers members, Context context) { if (members == null) throw new NullPointerException("members cannot be null"); this.members = members; this.context = context; - this.threadChecker = new ThreadChecker(context); } /** @@ -488,7 +485,7 @@ protected Member selectMember(Query query) { * Registers the client. */ private CompletableFuture register() { - threadChecker.checkThread(); + context.checkThread(); if (registerFuture == null) { registerFuture = register(100, new CompletableFuture<>()).whenComplete((result, error) -> registerFuture = null); } @@ -500,7 +497,7 @@ private CompletableFuture register() { */ private CompletableFuture register(long interval, CompletableFuture future) { register(new ArrayList<>(members.members())).whenCompleteAsync((result, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (error == null) { future.complete(null); } else { @@ -537,7 +534,7 @@ protected CompletableFuture register(List members, Com RegisterRequest request = RegisterRequest.builder().build(); LOGGER.debug("Sending {} to {}", request, member); member.send(request).whenComplete((response, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (error == null && response.status() == Response.Status.OK) { future.complete(response); LOGGER.debug("Registered new session: {}", getSession()); @@ -598,7 +595,7 @@ protected CompletableFuture keepAlive(List members, C .build(); LOGGER.debug("Sending {} to {}", request, member); member.send(request).whenComplete((response, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (isOpen()) { if (error == null && response.status() == Response.Status.OK) { future.complete(response); diff --git a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateContext.java b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateContext.java index f0bdef5390..1e302e98ff 100644 --- a/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateContext.java +++ b/raft/src/main/java/net/kuujo/copycat/raft/state/RaftStateContext.java @@ -21,8 +21,7 @@ import net.kuujo.copycat.raft.*; import net.kuujo.copycat.raft.log.Log; import net.kuujo.copycat.raft.rpc.*; -import net.kuujo.copycat.util.ExecutionContext; -import net.kuujo.copycat.util.ThreadChecker; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +47,7 @@ public class RaftStateContext extends RaftStateClient { private final Log log; private final ManagedCluster cluster; private final ClusterState members = new ClusterState(); - private final ExecutionContext context; - private final ThreadChecker threadChecker; + private final Context context; private AbstractState state; private ScheduledFuture joinTimer; private ScheduledFuture heartbeatTimer; @@ -61,13 +59,12 @@ public class RaftStateContext extends RaftStateClient { private long globalIndex; private volatile boolean open; - public RaftStateContext(Log log, StateMachine stateMachine, ManagedCluster cluster, ExecutionContext context) { - super(cluster, new ExecutionContext(String.format("%s-client", context.name()), context.alleycat().clone())); + public RaftStateContext(Log log, StateMachine stateMachine, ManagedCluster cluster, Context context) { + super(cluster, Context.createContext(String.format("%s-client", context.name()), context.serializer().clone())); this.log = log; - this.stateMachine = new RaftState(stateMachine, cluster, members, new ExecutionContext(String.format("%s-state", context.name()), context.alleycat().clone())); + this.stateMachine = new RaftState(stateMachine, cluster, members, Context.createContext(String.format("%s-state", context.name()), context.serializer().clone())); this.cluster = cluster; this.context = context; - this.threadChecker = new ThreadChecker(context); log.compactor().filter(this.stateMachine::filter); @@ -99,7 +96,7 @@ public Alleycat getSerializer() { * * @return The execution context. */ - public ExecutionContext getContext() { + public Context getContext() { return context; } @@ -333,7 +330,7 @@ public Log getLog() { * Checks that the current thread is the state context thread. */ void checkThread() { - threadChecker.checkThread(); + context.checkThread(); } @Override @@ -439,7 +436,7 @@ private CompletableFuture join(Member member, List members, Comple .build(); LOGGER.debug("Sending {} to {}", request, member); member.send(request).whenComplete((response, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (error == null && response.status() == Response.Status.OK) { setLeader(response.leader()); setTerm(response.term()); @@ -498,7 +495,7 @@ private CompletableFuture heartbeat(Member member, List members, C .build(); LOGGER.debug("Sending {} to {}", request, member); member.send(request).whenComplete((response, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (isOpen()) { if (error == null && response.status() == Response.Status.OK) { setLeader(response.leader()); @@ -544,7 +541,7 @@ private CompletableFuture leave(Member member, List members, Compl .build(); LOGGER.debug("Sending {} to {}", request, member); member.send(request).whenComplete((response, error) -> { - threadChecker.checkThread(); + context.checkThread(); if (error == null && response.status() == Response.Status.OK) { future.complete(null); LOGGER.info("{} - Left cluster", cluster.member().id()); diff --git a/raft/src/test/java/net/kuujo/copycat/raft/log/LogTest.java b/raft/src/test/java/net/kuujo/copycat/raft/log/LogTest.java index 24946f7969..d55a63ec24 100644 --- a/raft/src/test/java/net/kuujo/copycat/raft/log/LogTest.java +++ b/raft/src/test/java/net/kuujo/copycat/raft/log/LogTest.java @@ -20,7 +20,7 @@ import net.kuujo.copycat.raft.Command; import net.kuujo.copycat.raft.log.entry.CommandEntry; import net.kuujo.copycat.raft.log.entry.NoOpEntry; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.testng.annotations.Test; import java.io.Serializable; @@ -269,8 +269,8 @@ public void testSkipOnRollOver() { /** * Creates a test execution context. */ - private ExecutionContext createContext() { - return new ExecutionContext("test", new Alleycat(new ServiceLoaderResolver())); + private Context createContext() { + return Context.createContext("test", new Alleycat(new ServiceLoaderResolver())); } /** diff --git a/raft/src/test/java/net/kuujo/copycat/raft/log/MajorCompactionTest.java b/raft/src/test/java/net/kuujo/copycat/raft/log/MajorCompactionTest.java index c68d5dda49..a3b607bc87 100644 --- a/raft/src/test/java/net/kuujo/copycat/raft/log/MajorCompactionTest.java +++ b/raft/src/test/java/net/kuujo/copycat/raft/log/MajorCompactionTest.java @@ -20,7 +20,7 @@ import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.copycat.raft.log.entry.KeepAliveEntry; import net.kuujo.copycat.raft.log.entry.NoOpEntry; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.testng.annotations.Test; import java.util.concurrent.CompletableFuture; @@ -42,7 +42,7 @@ public void testCompact() throws Throwable { .withMaxEntriesPerSegment(128) .build(); - ExecutionContext context = new ExecutionContext("test", new Alleycat(new ServiceLoaderResolver())); + Context context = Context.createContext("test", new Alleycat(new ServiceLoaderResolver())); log.open(context); diff --git a/raft/src/test/java/net/kuujo/copycat/raft/log/MinorCompactionTest.java b/raft/src/test/java/net/kuujo/copycat/raft/log/MinorCompactionTest.java index aba69a7741..51fb76b3f9 100644 --- a/raft/src/test/java/net/kuujo/copycat/raft/log/MinorCompactionTest.java +++ b/raft/src/test/java/net/kuujo/copycat/raft/log/MinorCompactionTest.java @@ -20,7 +20,7 @@ import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.copycat.raft.log.entry.KeepAliveEntry; import net.kuujo.copycat.raft.log.entry.NoOpEntry; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import org.testng.annotations.Test; import java.util.concurrent.CompletableFuture; @@ -42,7 +42,7 @@ public void testCompact() throws Throwable { .withMaxEntriesPerSegment(128) .build(); - ExecutionContext context = new ExecutionContext("test", new Alleycat(new ServiceLoaderResolver())); + Context context = Context.createContext("test", new Alleycat(new ServiceLoaderResolver())); log.open(context); diff --git a/test/src/main/java/net/kuujo/copycat/cluster/TestCluster.java b/test/src/main/java/net/kuujo/copycat/cluster/TestCluster.java index 494fbfd851..4435a30554 100644 --- a/test/src/main/java/net/kuujo/copycat/cluster/TestCluster.java +++ b/test/src/main/java/net/kuujo/copycat/cluster/TestCluster.java @@ -18,7 +18,7 @@ import net.kuujo.alleycat.Alleycat; import net.kuujo.alleycat.ServiceLoaderResolver; import net.kuujo.copycat.ConfigurationException; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import java.util.Collection; import java.util.stream.Collectors; @@ -49,7 +49,7 @@ public TestCluster(TestLocalMember localMember, Collection CompletableFuture send(Class type, T message) { @Override @SuppressWarnings("unchecked") public CompletableFuture send(String topic, T message) { - ExecutionContext context = getContext(); + Context context = getContext(); HandlerHolder handler = handlers.get(topic); if (handler != null) { @@ -114,11 +114,11 @@ protected CompletableFuture receive(String topic, Buffer buffer) { HandlerHolder handler = handlers.get(topic); if (handler != null) { CompletableFuture future = new CompletableFuture<>(); - Object message = alleycat.readObject(buffer); + Object message = serializer.readObject(buffer); handler.context.execute(() -> { handler.handler.handle(message).whenCompleteAsync((result, error) -> { if (error == null) { - future.complete(alleycat.writeObject(result).flip()); + future.complete(serializer.writeObject(result).flip()); } else { future.completeExceptionally(new ClusterException(error)); } @@ -181,9 +181,9 @@ public boolean equals(Object object) { */ protected static class HandlerHolder { private final MessageHandler handler; - private final ExecutionContext context; + private final Context context; - private HandlerHolder(MessageHandler handler, ExecutionContext context) { + private HandlerHolder(MessageHandler handler, Context context) { this.handler = handler; this.context = context; } diff --git a/test/src/main/java/net/kuujo/copycat/cluster/TestRemoteMember.java b/test/src/main/java/net/kuujo/copycat/cluster/TestRemoteMember.java index 60eeaf584e..b15799cf57 100644 --- a/test/src/main/java/net/kuujo/copycat/cluster/TestRemoteMember.java +++ b/test/src/main/java/net/kuujo/copycat/cluster/TestRemoteMember.java @@ -17,7 +17,7 @@ import net.kuujo.alleycat.io.Buffer; import net.kuujo.copycat.Task; -import net.kuujo.copycat.util.ExecutionContext; +import net.kuujo.copycat.util.Context; import net.kuujo.copycat.util.concurrent.Futures; import java.util.concurrent.CompletableFuture; @@ -84,14 +84,14 @@ public CompletableFuture send(String topic, T message) { if (member == null) return Futures.exceptionalFuture(new ClusterException("invalid member")); - ExecutionContext context = getContext(); + Context context = getContext(); CompletableFuture future = new CompletableFuture<>(); this.context.execute(() -> { - Buffer buffer = alleycat.writeObject(message).flip(); + Buffer buffer = serializer.writeObject(message).flip(); member.receive(topic, buffer).whenCompleteAsync((result, error) -> { if (error == null) { - context.execute(() -> future.complete(alleycat.readObject(result))); + context.execute(() -> future.complete(serializer.readObject(result))); } else { context.execute(() -> future.completeExceptionally(error)); } @@ -114,7 +114,7 @@ public CompletableFuture submit(Task task) { if (member == null) return Futures.exceptionalFuture(new ClusterException("invalid member")); - ExecutionContext context = getContext(); + Context context = getContext(); CompletableFuture future = new CompletableFuture<>(); member.submit(task).whenComplete((result, error) -> { context.execute(() -> {