Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void run() {
tracker.reportLeak();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Thread interrupted, exiting.", e);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos
/** Peer and its proxy. */
private class PeerAndProxy {
private final RaftPeer peer;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile PROXY proxy = null;
private final LifeCycle lifeCycle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ static void assertTrue(boolean value, Supplier<Object> message) {
}
}

static void assertSame(int expected, int computed, String name) {
assertTrue(expected == computed,
() -> name + ": expected == " + expected + " but computed == " + computed);
}

static void assertSame(long expected, long computed, String name) {
assertTrue(expected == computed,
() -> name + ": expected == " + expected + " but computed == " + computed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ synchronized int process(Event event) {
private final TimeDuration logMessageBatchDuration;
private final int maxOutstandingInstallSnapshots;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public class ConfigurationManager {
* The current raft configuration. If configurations is not empty, should be
* the last entry of the map. Otherwise is initialConf.
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftConfigurationImpl currentConf;
/** Cache the peer corresponding to {@link #id}. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeer currentPeer;

ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ int update(AtomicInteger outstanding) {
private final RaftServerImpl server;

private final Timestamp creationTime = Timestamp.currentTime();
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Timestamp lastRpcTime = creationTime;
private volatile boolean isRunning = true;
private final AtomicInteger outstandingOp = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private Timestamp getMaxTimestampWithMajorityAck(List<FollowerInfo> followers) {
return Timestamp.currentTime();
}

final int mid = followers.size() / 2;
final long mid = followers.size() / 2;
return followers.stream()
.map(FollowerInfo::getLastRespondedAppendEntriesSendTime)
.sorted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl

static class FollowerInfoMap {
private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>();

@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile CurrentOldFollowerInfos followerInfos;

void put(RaftPeerId id, FollowerInfo info) {
Expand Down Expand Up @@ -333,6 +333,7 @@ boolean isApplied() {
private final RaftServerImpl server;
private final RaftLog raftLog;
private final long currentTerm;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile ConfigurationStagingState stagingState;

private final FollowerInfoMap followerInfoMap = new FollowerInfoMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ boolean isStable() {
return oldConf == null;
}

@SuppressWarnings({"squid:S6466"}) // Suppress ArrayIndexOutOfBoundsException warning
boolean containsInConf(RaftPeerId peerId, RaftPeerRole... roles) {
if (roles == null || roles.length == 0) {
return conf.contains(peerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ServerState {
/**
* Candidate that this peer granted vote for in current term (or null if none)
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftPeerId votedFor;

/**
Expand Down Expand Up @@ -171,6 +172,7 @@ private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftP
}
}

@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl server, RaftStorage storage,
Consumer<LogEntryProto> logConsumer, LongSupplier getSnapshotIndexFromStateMachine,
RaftProperties prop) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public void run() {
}
} catch (Throwable t) {
if (t instanceof InterruptedException && state == State.STOP) {
Thread.currentThread().interrupt();
LOG.info("{} was interrupted. Exiting ...", this);
} else {
state = State.EXCEPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public final class LeaderElectionMetrics extends RatisMetrics {

private final Timekeeper electionTime = getRegistry().timer(LEADER_ELECTION_TIME_TAKEN);

@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Timestamp lastElectionTime;

private LeaderElectionMetrics(RaftGroupMemberId serverId, LongSupplier getLastLeaderElapsedTimeMs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,10 @@ File getFile() {
private volatile boolean isOpen;
private long totalFileSize = SegmentedRaftLogFormat.getHeaderLength();
/** Segment start index, inclusive. */
private long startIndex;
private final long startIndex;
/** Segment end index, inclusive. */
private volatile long endIndex;
private RaftStorage storage;
private final RaftStorage storage;
private final SizeInBytes maxOpSize;
private final LogEntryLoader cacheLoader;
/** later replace it with a metric */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ public TransactionContext getTransactionContext(LogEntryProto entry, boolean cre
private final boolean stateMachineCachingEnabled;
private final SegmentedRaftLogMetrics metrics;

@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
private SegmentedRaftLog(Builder b) {
super(b.memberId, b.snapshotIndexSupplier, b.properties);
this.metrics = new SegmentedRaftLogMetrics(b.memberId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ public String toString() {
}

private final String name;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile LogSegment openSegment;
private final LogSegmentList closedSegments;
private final RaftStorage storage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ synchronized void updateIndex(long i) {
private volatile boolean running = true;
private final ExecutorService workerThreadExecutor;
private final RaftStorage storage;
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ void lock() throws IOException {
* <code>null</code> if storage is already locked.
* @throws IOException if locking fails.
*/
@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
private FileLock tryLock(File lockF) throws IOException {
boolean deletionHookAdded = false;
if (!lockF.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class SnapshotManager {
new File(dir.get().getRoot(), c.getFilename()).toPath()).toString();
}

@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOException {
final FileChannel out;
final boolean exists = tmpSnapshotFile.exists();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
StateMachine.EventApi, StateMachine.LeaderEventApi, StateMachine.FollowerEventApi {
private final CompletableFuture<RaftServer> server = new CompletableFuture<>();
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile RaftGroupId groupId;
private final LifeCycle lifeCycle = new LifeCycle(JavaUtils.getClassSimpleName(getClass()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TransactionContextImpl implements TransactionContext {
private final RaftClientRequest clientRequest;

/** Exception from the {@link StateMachine} or from the log */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Exception exception;

/** Data from the {@link StateMachine} */
Expand All @@ -60,6 +61,7 @@ public class TransactionContextImpl implements TransactionContext {
* {@link StateMachine#startTransaction(RaftClientRequest)} and
* {@link StateMachine#applyTransaction(TransactionContext)}.
*/
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Object stateMachineContext;

/**
Expand All @@ -70,11 +72,14 @@ public class TransactionContextImpl implements TransactionContext {
private boolean shouldCommit = true;

/** Committed LogEntry. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile LogEntryProto logEntry;
/** Committed LogEntry copy. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile Supplier<LogEntryProto> logEntryCopy;

/** For wrapping {@link #logEntry} in order to release the underlying buffer. */
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
private volatile ReferenceCountedObject<?> delegatedRef;

private final CompletableFuture<Long> logIndexFuture = new CompletableFuture<>();
Expand Down