Fix snapshot from follower thread leak
### What changes are proposed in this pull request?

When using Ratis, the primary master does not take a snapshot itself;
instead, it requests a snapshot from a follower. This pull request fixes
two issues with the current implementation of this mechanism.

1. When a snapshot needs to be taken (as determined by
`alluxio.master.journal.checkpoint.period.entries`), Ratis calls
`takeSnapshot()` in the JournalStateMachine each time a journal entry is
committed, until a new snapshot is installed. On the primary master,
`takeSnapshot()` runs asynchronously: it first requests snapshot
information from each follower, then downloads a snapshot from one of
them if a valid snapshot is available. If no valid snapshot is available
(which is likely, since all nodes take snapshots at the same log index
and generating a snapshot takes time), the request is repeated until one
becomes available, and each request allocates a new gRPC connection;
eventually this may cause the master to crash or fail over from
allocating too many threads. This is fixed by having the follower block,
with a configurable timeout, until a valid snapshot is available before
sending its reply (see the first sketch after this list).

2. When the primary master asks a follower to start sending a snapshot,
it always expects the follower to open a new RPC to do so. If the
follower never opens that RPC (for example, due to a failure or a
network issue), the primary master waits for it indefinitely and never
installs a new snapshot. This is fixed by adding a timeout for the
follower to start the RPC; if the timeout expires, another follower is
tried, or the snapshot request protocol starts over if no other
followers are currently available (see the second sketch below).
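
Below is a minimal sketch of the follower-side fix from item 1. The names
(`FollowerSnapshotWait`, `awaitNewerSnapshot`, `onSnapshotEvent`) are
hypothetical, not the actual Alluxio API; it only illustrates the wait/notify
pattern behind the `mSnapshotManager.notifyAll()` calls added throughout the
JournalStateMachine in the diff below, bounded by the new
`alluxio.master.journal.request.info.timeout` property.

```java
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

public class FollowerSnapshotWait {
  /** Index of the newest locally available snapshot, or -1 if none; updated elsewhere. */
  private long mLatestSnapshotIndex = -1;

  /**
   * Blocks until a local snapshot newer than {@code leaderIndex} exists, then
   * returns its index; returns -1 if the configured timeout expires first.
   */
  public synchronized long awaitNewerSnapshot(long leaderIndex) throws InterruptedException {
    long deadlineMs = System.currentTimeMillis()
        + Configuration.getMs(PropertyKey.MASTER_JOURNAL_REQUEST_INFO_TIMEOUT);
    while (mLatestSnapshotIndex <= leaderIndex) {
      long remainingMs = deadlineMs - System.currentTimeMillis();
      if (remainingMs <= 0) {
        return -1; // timed out: reply without a snapshot and let the leader retry later
      }
      wait(remainingMs); // woken by notifyAll() when a snapshot is taken or installed
    }
    return mLatestSnapshotIndex;
  }

  /** Called whenever a new local snapshot is taken or installed. */
  public synchronized void onSnapshotEvent(long snapshotIndex) {
    mLatestSnapshotIndex = Math.max(mLatestSnapshotIndex, snapshotIndex);
    notifyAll();
  }
}
```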
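
The second sketch shows the leader-side timeout from item 2, again with
hypothetical names (`requestSnapshotData`, `sendStartUploadRequest`); in the
real change the request goes through `RaftJournalSystem#sendMessageAsync`
with the new `alluxio.master.journal.request.data.timeout` property.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import org.apache.ratis.protocol.RaftPeerId;

public class LeaderSnapshotRequest {
  /** Completed with the snapshot index once a follower opens the upload RPC. */
  private volatile CompletableFuture<Long> mUploadStarted = new CompletableFuture<>();

  /** Asks each candidate follower in turn to start uploading its snapshot. */
  public long requestSnapshotData(List<RaftPeerId> candidates) {
    long timeoutMs = Configuration.getMs(PropertyKey.MASTER_JOURNAL_REQUEST_DATA_TIMEOUT);
    for (RaftPeerId follower : candidates) {
      mUploadStarted = new CompletableFuture<>();
      sendStartUploadRequest(follower); // fire-and-forget message to the follower
      try {
        // Previously the leader could wait here forever; now the wait is bounded.
        return mUploadStarted.get(timeoutMs, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // Follower never started the RPC (failure, network issue, ...): try the next one.
      } catch (Exception e) {
        return -1;
      }
    }
    return -1; // no follower responded: the caller restarts the snapshot request protocol
  }

  private void sendStartUploadRequest(RaftPeerId follower) {
    // Placeholder for the real fire-and-forget snapshot-upload request.
  }
}
```

Both new timeouts default to 20000 ms (20 seconds) and can be tuned via
`alluxio.master.journal.request.info.timeout` and
`alluxio.master.journal.request.data.timeout`.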

### Does this PR introduce any user facing changes?

No

pr-link: #15873
change-id: cid-2a9a8ef7f1a416299b9a70df2592ffc9a076f760
tcrain committed Jul 19, 2022
1 parent b114ad5 commit 94316d7
Showing 8 changed files with 332 additions and 53 deletions.
19 changes: 19 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -2738,6 +2738,21 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
+  public static final PropertyKey MASTER_JOURNAL_REQUEST_DATA_TIMEOUT =
+      durationBuilder(Name.MASTER_JOURNAL_REQUEST_DATA_TIMEOUT)
+          .setDefaultValue(20000)
+          .setDescription("Time to wait for follower to respond to request to send a new snapshot")
+          .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
+          .setScope(Scope.MASTER)
+          .build();
+  public static final PropertyKey MASTER_JOURNAL_REQUEST_INFO_TIMEOUT =
+      durationBuilder(Name.MASTER_JOURNAL_REQUEST_INFO_TIMEOUT)
+          .setDefaultValue(20000)
+          .setDescription("Time to wait for follower to respond to request to get information"
+              + " about its latest snapshot")
+          .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
+          .setScope(Scope.MASTER)
+          .build();
public static final PropertyKey MASTER_JOURNAL_SPACE_MONITOR_INTERVAL =
durationBuilder(Name.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL)
.setDefaultValue("10min")
@@ -6969,6 +6984,10 @@ public static final class Name {
"alluxio.master.journal.log.size.bytes.max";
public static final String MASTER_JOURNAL_LOG_CONCURRENCY_MAX =
"alluxio.master.journal.log.concurrency.max";
+    public static final String MASTER_JOURNAL_REQUEST_DATA_TIMEOUT =
+        "alluxio.master.journal.request.data.timeout";
+    public static final String MASTER_JOURNAL_REQUEST_INFO_TIMEOUT =
+        "alluxio.master.journal.request.info.timeout";
public static final String MASTER_JOURNAL_TAILER_SHUTDOWN_QUIET_WAIT_TIME_MS =
"alluxio.master.journal.tailer.shutdown.quiet.wait.time";
public static final String MASTER_JOURNAL_TAILER_SLEEP_TIME_MS =
core/server/common/src/main/java/alluxio/master/journal/raft/JournalStateMachine.java
@@ -87,7 +87,7 @@
@ThreadSafe
public class JournalStateMachine extends BaseStateMachine {
private static final Logger LOG = LoggerFactory.getLogger(JournalStateMachine.class);
-  private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 10L * Constants.MINUTE_MS);
+  private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 10L * Constants.SECOND_MS);

private static final CompletableFuture<Message> EMPTY_FUTURE =
CompletableFuture.completedFuture(Message.EMPTY);
@@ -186,6 +186,9 @@ public void initialize(RaftServer server, RaftGroupId groupId,
mRaftGroupId = groupId;
mStorage.init(raftStorage);
loadSnapshot(mStorage.getLatestSnapshot());
+      synchronized (mSnapshotManager) {
+        mSnapshotManager.notifyAll();
+      }
});
}

@@ -195,6 +198,9 @@ public void reinitialize() throws IOException {
mStorage.loadLatestSnapshot();
loadSnapshot(mStorage.getLatestSnapshot());
unpause();
+    synchronized (mSnapshotManager) {
+      mSnapshotManager.notifyAll();
+    }
}

private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
@@ -213,6 +219,9 @@ private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
setLastAppliedTermIndex(snapshot.getTermIndex());
install(snapshotFile);
mSnapshotLastIndex = getLatestSnapshot() != null ? getLatestSnapshot().getIndex() : -1;
+      synchronized (mSnapshotManager) {
+        mSnapshotManager.notifyAll();
+      }
} catch (Exception e) {
throw new IOException(String.format("Failed to load snapshot %s", snapshot), e);
}
@@ -221,6 +230,7 @@ private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException {
@Override
public long takeSnapshot() {
if (mIsLeader) {
+      SAMPLING_LOG.info("Calling take snapshot on leader");
try {
Preconditions.checkState(mServer.getGroups().iterator().hasNext());
RaftGroup group = mServer.getGroups().iterator().next();
@@ -291,6 +301,9 @@ public CompletableFuture<Message> query(Message request) {
@Override
public void close() {
mClosed = true;
+    synchronized (mSnapshotManager) {
+      mSnapshotManager.notifyAll();
+    }
}

@Override
@@ -311,6 +324,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
mIsLeader = false;
mJournalSystem.notifyLeadershipStateChanged(false);
+    synchronized (mSnapshotManager) {
+      mSnapshotManager.notifyAll();
+    }
}

@Override
@@ -346,6 +362,9 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
snapshotIndex.getIndex(), latestJournalIndex));
}
mSnapshotLastIndex = snapshotIndex.getIndex();
+      synchronized (mSnapshotManager) {
+        mSnapshotManager.notifyAll();
+      }
return snapshotIndex;
});
}
@@ -547,6 +566,9 @@ public synchronized long takeLocalSnapshot() {
return last.getIndex();
} finally {
mSnapshotting = false;
+      synchronized (mSnapshotManager) {
+        mSnapshotManager.notifyAll();
+      }
}
}

core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java
@@ -458,8 +458,11 @@ public synchronized RaftGroup getCurrentGroup() {
}

private RaftClient createClient() {
-    long timeoutMs =
-        Configuration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_RAFT_CLIENT_REQUEST_TIMEOUT);
+    return createClient(Configuration.getMs(
+        PropertyKey.MASTER_EMBEDDED_JOURNAL_RAFT_CLIENT_REQUEST_TIMEOUT));
+  }
+
+  private RaftClient createClient(long timeoutMs) {
long retryBaseMs =
Configuration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_RAFT_CLIENT_REQUEST_INTERVAL);
long maxSleepTimeMs =
@@ -880,7 +883,21 @@ public synchronized List<QuorumServerInfo> getQuorumServerInfoList() throws IOException {
*/
public synchronized CompletableFuture<RaftClientReply> sendMessageAsync(
RaftPeerId server, Message message) {
-    RaftClient client = createClient();
+    return sendMessageAsync(server, message, Configuration.getMs(
+        PropertyKey.MASTER_EMBEDDED_JOURNAL_RAFT_CLIENT_REQUEST_TIMEOUT));
+  }
+
+  /**
+   * Sends a message to a raft server asynchronously.
+   *
+   * @param server the raft peer id of the target server
+   * @param message the message to send
+   * @param timeoutMs the message timeout in milliseconds
+   * @return a future to be completed with the client reply
+   */
+  public synchronized CompletableFuture<RaftClientReply> sendMessageAsync(
+      RaftPeerId server, Message message, long timeoutMs) {
+    RaftClient client = createClient(timeoutMs);
RaftClientRequest request = RaftClientRequest.newBuilder()
.setClientId(mRawClientId)
.setServerId(server)
Expand Down
