Skip to content

Commit

Permalink
RATIS-2116. Fix the issue where RaftServerImpl.appendEntries may be b…
Browse files Browse the repository at this point in the history
…locked indefinitely (#1116)
  • Loading branch information
sunhaibotb committed Jul 2, 2024
1 parent eeaf6a4 commit 32745c3
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,11 @@ public <RESULT, THROWABLE extends Throwable> List<RESULT> pollList(long timeoutM
return results;
}
}

/**
 * Peek at the head element while holding this queue's lock;
 * the actual lookup is delegated to the superclass.
 *
 * @return whatever {@code super.peek()} returns.
 */
@Override
public E peek() {
  try (AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
    final E head = super.peek();
    return head;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ public E poll() {
return polled;
}

/**
 * Peek the head element from this queue
 * by delegating to the underlying queue {@code q}.
 *
 * @return the result of {@code q.peek()}.
 */
public E peek() {
  final E head = q.peek();
  return head;
}

/** The same as {@link java.util.Collection#remove(Object)}. */
public boolean remove(E e) {
final boolean removed = q.remove(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class SegmentedRaftLogWorker {

/** A one-second duration; the worker loop passes it to {@code queue.poll(..)}. */
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);

/** Simple name of this class, used as the prefix of the injection point name below. */
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(SegmentedRaftLogWorker.class);
/**
 * Name of the code-injection point that {@code run()} passes to
 * {@code CodeInjectionForTesting.execute(..)} before entering its worker loop.
 */
static final String RUN_WORKER = CLASS_NAME + ".runWorker";

static class StateMachineDataPolicy {
private final boolean sync;
private final TimeDuration syncTimeout;
Expand Down Expand Up @@ -298,6 +301,7 @@ private void run() {
// if and when a log task encounters an exception
RaftLogIOException logIOException = null;

CodeInjectionForTesting.execute(RUN_WORKER, server == null ? null : server.getId(), null, queue);
while (running) {
try {
Task task = queue.poll(ONE_SECOND);
Expand Down Expand Up @@ -356,7 +360,7 @@ private boolean shouldFlush() {
} else if (pendingFlushNum >= forceSyncNum) {
return true;
}
return pendingFlushNum > 0 && queue.isEmpty();
return pendingFlushNum > 0 && !(queue.peek() instanceof WriteLog);
}

private void flushIfNecessary() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.DataBlockingQueue;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Slf4jUtils;
Expand All @@ -57,8 +59,13 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand All @@ -74,6 +81,7 @@

import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
import static org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
Expand Down Expand Up @@ -396,6 +404,82 @@ public void testAppendAndRoll(Boolean useAsyncFlush, Boolean smSyncFlush) throws
}
}

/**
 * Regression test for RATIS-2116: builds the task pattern (WriteLog, ..., PurgeLog)
 * in the {@link SegmentedRaftLogWorker} queue and checks that both the purge
 * and the preceding appends complete.
 *
 * Steps:
 * 1. Append two segments of entries with a first log instance and close it,
 *    so that there is a closed segment to purge later.
 * 2. Reopen the log with code injected at {@code RUN_WORKER}.  The injected code
 *    runs in the worker before its loop starts: it waits until the log is opened,
 *    then enqueues the append tasks followed by a purge task and records
 *    the number of queued elements.
 * 3. Assert that the queue held (entries + 1) tasks, the purge moved the cache
 *    start index, and all append futures complete.
 *
 * Every wait is bounded by {@code FIVE_SECONDS} so that a regression fails the
 * test instead of hanging it.
 */
@ParameterizedTest
@MethodSource("data")
public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
  RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
  RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
  RaftServerConfigKeys.Log.setPurgeGap(properties, 1);
  RaftServerConfigKeys.Log.setForceSyncNum(properties, 128);

  int startTerm = 0;
  int endTerm = 2;
  int segmentSize = 10;
  long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1);
  long nextStartIndex = segmentSize * (endTerm - startTerm);

  // append entries and roll logSegment for later purge operation
  List<SegmentRange> ranges0 = prepareRanges(startTerm, endTerm, segmentSize, 0);
  List<LogEntryProto> entries0 = prepareLogEntries(ranges0, null);
  try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
    raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
    entries0.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
  }

  // test the pattern in the task queue of SegmentedRaftLogWorker: (WriteLog, ..., PurgeLog)
  List<SegmentRange> ranges = prepareRanges(endTerm - 1, endTerm, 1, nextStartIndex);
  List<LogEntryProto> entries = prepareLogEntries(ranges, null);

  try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) {
    final CountDownLatch raftLogOpened = new CountDownLatch(1);
    final CountDownLatch tasksAdded = new CountDownLatch(1);

    // inject test code to make the pattern (WriteLog, PurgeLog)
    final ConcurrentLinkedQueue<CompletableFuture<Long>> appendFutures = new ConcurrentLinkedQueue<>();
    final AtomicReference<CompletableFuture<Long>> purgeFuture = new AtomicReference<>();
    final AtomicInteger tasksCount = new AtomicInteger(0);
    CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> {
      // wait for raftLog to be opened
      try {
        if (!raftLogOpened.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
          throw new TimeoutException("Timed out waiting for raftLog to be opened");
        }
      } catch (InterruptedException | TimeoutException e) {
        if (e instanceof InterruptedException) {
          // restore the interrupt status so that the interruption is not lost
          Thread.currentThread().interrupt();
        }
        LOG.error("an exception occurred", e);
        throw new RuntimeException(e);
      }

      // add WriteLog and PurgeLog tasks
      entries.stream().map(raftLog::appendEntry).forEach(appendFutures::add);
      purgeFuture.set(raftLog.purge(endIndexOfClosedSegment));

      // args[0] is the worker's task queue passed in at the injection point
      tasksCount.set(((DataBlockingQueue<?>) args[0]).getNumElements());
      tasksAdded.countDown();
      return true;
    });

    // open raftLog
    raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
    raftLogOpened.countDown();

    // wait for all tasks to be added
    if (!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) {
      throw new TimeoutException("Timed out waiting for the tasks to be added");
    }
    Assertions.assertEquals(entries.size() + 1, tasksCount.get());

    // check if the purge task is executed; bounded wait so that a regression fails instead of hanging
    final Long purged = purgeFuture.get().get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
    LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, purged);
    Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());

    // check if the appendEntry futures are done
    JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit());
  } finally {
    // replace the injected code with a no-op so that later tests are not affected
    CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> false);
  }
}

@ParameterizedTest
@MethodSource("data")
public void testTruncate(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
Expand Down

0 comments on commit 32745c3

Please sign in to comment.