Skip to content

Commit

Permalink
Add thread context/implementations for managing threads in Raft server.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 14, 2017
1 parent b0b5926 commit 4366d64
Show file tree
Hide file tree
Showing 17 changed files with 560 additions and 21 deletions.
Expand Up @@ -24,9 +24,9 @@
import io.atomix.protocols.raft.server.state.StateMachineRegistry; import io.atomix.protocols.raft.server.state.StateMachineRegistry;
import io.atomix.protocols.raft.server.storage.Storage; import io.atomix.protocols.raft.server.storage.Storage;
import io.atomix.util.concurrent.Futures; import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.CatalystThreadFactory; import io.atomix.util.concurrent.AtomixThreadFactory;
import io.atomix.util.temp.SingleThreadContext; import io.atomix.util.concurrent.SingleThreadContext;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -816,7 +816,7 @@ public RaftServer build() {
} }


ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", localNodeId, name)); ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", localNodeId, name));
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(threadPoolSize, new CatalystThreadFactory("copycat-" + name + "-state-%d")); ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(threadPoolSize, new AtomixThreadFactory("copycat-" + name + "-state-%d"));


ServerContext context = new ServerContext(name, type, localNodeId, protocol, storage, stateMachineRegistry, threadPool, threadContext); ServerContext context = new ServerContext(name, type, localNodeId, protocol, storage, stateMachineRegistry, threadPool, threadContext);
context.setElectionTimeout(electionTimeout) context.setElectionTimeout(electionTimeout)
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.atomix.protocols.raft.server; package io.atomix.protocols.raft.server;


import io.atomix.protocols.raft.RaftOperation; import io.atomix.protocols.raft.RaftOperation;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;


import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import io.atomix.protocols.raft.server.RaftServer; import io.atomix.protocols.raft.server.RaftServer;
import io.atomix.protocols.raft.server.storage.Indexed; import io.atomix.protocols.raft.server.storage.Indexed;
import io.atomix.protocols.raft.server.util.Quorum; import io.atomix.protocols.raft.server.util.Quorum;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;


import java.time.Duration; import java.time.Duration;
import java.util.HashSet; import java.util.HashSet;
Expand Down
Expand Up @@ -27,7 +27,7 @@
import io.atomix.protocols.raft.server.RaftServer; import io.atomix.protocols.raft.server.RaftServer;
import io.atomix.protocols.raft.server.storage.Indexed; import io.atomix.protocols.raft.server.storage.Indexed;
import io.atomix.protocols.raft.server.util.Quorum; import io.atomix.protocols.raft.server.util.Quorum;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;


import java.time.Duration; import java.time.Duration;
import java.util.HashSet; import java.util.HashSet;
Expand Down
Expand Up @@ -56,7 +56,7 @@
import io.atomix.protocols.raft.server.storage.entry.QueryEntry; import io.atomix.protocols.raft.server.storage.entry.QueryEntry;
import io.atomix.protocols.raft.server.storage.system.Configuration; import io.atomix.protocols.raft.server.storage.system.Configuration;
import io.atomix.util.concurrent.Futures; import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;


import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
Expand Down
Expand Up @@ -18,7 +18,7 @@
import io.atomix.protocols.raft.server.storage.Log; import io.atomix.protocols.raft.server.storage.Log;
import io.atomix.protocols.raft.server.storage.LogReader; import io.atomix.protocols.raft.server.storage.LogReader;
import io.atomix.protocols.raft.server.storage.Reader; import io.atomix.protocols.raft.server.storage.Reader;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;


import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
Expand Down
Expand Up @@ -25,9 +25,9 @@
import io.atomix.protocols.raft.server.RaftServer; import io.atomix.protocols.raft.server.RaftServer;
import io.atomix.protocols.raft.server.storage.system.Configuration; import io.atomix.protocols.raft.server.storage.system.Configuration;
import io.atomix.util.concurrent.Futures; import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.CatalystThreadFactory; import io.atomix.util.concurrent.AtomixThreadFactory;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;
import io.atomix.util.temp.SingleThreadContext; import io.atomix.util.concurrent.SingleThreadContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -58,7 +58,7 @@
final class RaftClusterState implements RaftCluster, AutoCloseable { final class RaftClusterState implements RaftCluster, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftClusterState.class); private static final Logger LOGGER = LoggerFactory.getLogger(RaftClusterState.class);
private final ServerContext context; private final ServerContext context;
private final CatalystThreadFactory threadFactory; private final AtomixThreadFactory threadFactory;
private final RaftMemberState member; private final RaftMemberState member;
private volatile Configuration configuration; private volatile Configuration configuration;
private final Map<NodeId, MemberState> membersMap = new ConcurrentHashMap<>(); private final Map<NodeId, MemberState> membersMap = new ConcurrentHashMap<>();
Expand All @@ -77,7 +77,7 @@ final class RaftClusterState implements RaftCluster, AutoCloseable {
Instant time = Instant.now(); Instant time = Instant.now();
this.member = new RaftMemberState(localNodeId, type, RaftMember.Status.AVAILABLE, time).setCluster(this); this.member = new RaftMemberState(localNodeId, type, RaftMember.Status.AVAILABLE, time).setCluster(this);
this.context = checkNotNull(context, "context cannot be null"); this.context = checkNotNull(context, "context cannot be null");
this.threadFactory = new CatalystThreadFactory("copycat-server-" + localNodeId + "-appender-%d"); this.threadFactory = new AtomixThreadFactory("copycat-server-" + localNodeId + "-appender-%d");


// If a configuration is stored, use the stored configuration, otherwise configure the server with the user provided configuration. // If a configuration is stored, use the stored configuration, otherwise configure the server with the user provided configuration.
configuration = context.getMetaStore().loadConfiguration(); configuration = context.getMetaStore().loadConfiguration();
Expand Down
Expand Up @@ -22,7 +22,7 @@
import io.atomix.protocols.raft.protocol.RaftResponse; import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.protocol.ReconfigureRequest; import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.server.storage.system.Configuration; import io.atomix.protocols.raft.server.storage.system.Configuration;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;


import java.time.Instant; import java.time.Instant;
import java.util.Objects; import java.util.Objects;
Expand Down
Expand Up @@ -31,8 +31,8 @@
import io.atomix.protocols.raft.server.storage.Storage; import io.atomix.protocols.raft.server.storage.Storage;
import io.atomix.protocols.raft.server.storage.snapshot.SnapshotStore; import io.atomix.protocols.raft.server.storage.snapshot.SnapshotStore;
import io.atomix.protocols.raft.server.storage.system.MetaStore; import io.atomix.protocols.raft.server.storage.system.MetaStore;
import io.atomix.util.temp.SingleThreadContext; import io.atomix.util.concurrent.SingleThreadContext;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down
Expand Up @@ -30,8 +30,8 @@
import io.atomix.protocols.raft.server.storage.snapshot.SnapshotReader; import io.atomix.protocols.raft.server.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.server.storage.snapshot.SnapshotWriter; import io.atomix.protocols.raft.server.storage.snapshot.SnapshotWriter;
import io.atomix.util.serializer.Serializer; import io.atomix.util.serializer.Serializer;
import io.atomix.util.temp.Scheduled; import io.atomix.util.concurrent.Scheduled;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down
Expand Up @@ -36,8 +36,8 @@
import io.atomix.protocols.raft.server.storage.snapshot.Snapshot; import io.atomix.protocols.raft.server.storage.snapshot.Snapshot;
import io.atomix.util.concurrent.ComposableFuture; import io.atomix.util.concurrent.ComposableFuture;
import io.atomix.util.concurrent.Futures; import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.ThreadContext; import io.atomix.util.concurrent.ThreadContext;
import io.atomix.util.temp.ThreadPoolContext; import io.atomix.util.concurrent.ThreadPoolContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down
56 changes: 56 additions & 0 deletions core/src/main/java/io/atomix/util/concurrent/AtomixThread.java
@@ -0,0 +1,56 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.util.concurrent;

import java.lang.ref.WeakReference;

/**
* Atomix thread.
* <p>
* The Atomix thread primarily serves to store a {@link ThreadContext} for the current thread.
* The context is stored in a {@link WeakReference} in order to allow the thread to be garbage collected.
* <p>
* There is no {@link ThreadContext} associated with the thread when it is first created.
* It is the responsibility of thread creators to {@link #setContext(ThreadContext) set} the thread context when appropriate.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class AtomixThread extends Thread {
private WeakReference<ThreadContext> context;

public AtomixThread(Runnable target, String name) {
super(target, name);
}

/**
* Sets the thread context.
*
* @param context The thread context.
*/
public void setContext(ThreadContext context) {
this.context = new WeakReference<>(context);
}

/**
* Returns the thread context.
*
* @return The thread {@link ThreadContext} or {@code null} if no context has been configured.
*/
public ThreadContext getContext() {
return context != null ? context.get() : null;
}

}
@@ -0,0 +1,43 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.util.concurrent;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Named thread factory.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class AtomixThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String nameFormat;

/**
* Creates a thread factory that names threads according to the {@code nameFormat} by supplying a
* single argument to the format representing the thread number.
*/
public AtomixThreadFactory(String nameFormat) {
this.nameFormat = nameFormat;
}

@Override
public Thread newThread(Runnable r) {
return new AtomixThread(r, String.format(nameFormat, threadNumber.getAndIncrement()));
}

}
31 changes: 31 additions & 0 deletions core/src/main/java/io/atomix/util/concurrent/Scheduled.java
@@ -0,0 +1,31 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.atomix.util.concurrent;

/**
* Scheduled task.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public interface Scheduled {

/**
* Cancels the scheduled task.
*/
void cancel();

}

0 comments on commit 4366d64

Please sign in to comment.