Skip to content

Commit

Permalink
Fix Flaky BatchProcessorTest::testRestart
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrenette authored and Maithem committed Jun 20, 2023
1 parent 4b69c44 commit 24d37e5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.corfudb.infrastructure;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.TextFormat;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MicroMeterUtils;
import org.corfudb.infrastructure.BatchWriterOperation.Type;
Expand All @@ -23,10 +23,8 @@
import org.corfudb.runtime.view.Layout;

import javax.annotation.Nonnull;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -71,18 +69,13 @@ public class BatchProcessor implements AutoCloseable {
* @param sync If true, the batch writer will sync writes to secondary storage
*/
public BatchProcessor(StreamLog streamLog, BatchProcessorContext context, long sealEpoch, boolean sync) {
this(new LinkedBlockingQueue<>(), streamLog, context, sealEpoch, sync);
}

public BatchProcessor(BlockingQueue<BatchWriterOperation<?>> operationsQueue, StreamLog streamLog,
BatchProcessorContext context, long sealEpoch, boolean sync) {
this.sealEpoch = sealEpoch;
this.sync = sync;
this.streamLog = streamLog;
this.context = context;

BATCH_SIZE = 50;
this.operationsQueue = operationsQueue;
this.operationsQueue = new LinkedBlockingQueue<>();

processorService = newExecutorService();
processorService.submit(this::process);
Expand All @@ -101,14 +94,6 @@ private ExecutorService newExecutorService() {
return Executors.newSingleThreadExecutor(threadFactory);
}

private void recordRunnable(Runnable runnable, Optional<Timer> timer) {
if (timer.isPresent()) {
timer.get().record(runnable);
} else {
runnable.run();
}
}

/**
* Add a task to the processor.
*
Expand Down Expand Up @@ -276,11 +261,13 @@ public void restart() {
public static class BatchProcessorContext {
private final AtomicReference<BatchProcessorStatus> status = new AtomicReference<>(BatchProcessorStatus.BP_STATUS_OK);

private void setErrorStatus() {
@VisibleForTesting
void setErrorStatus() {
status.set(BatchProcessorStatus.BP_STATUS_ERROR);
}

public void setOkStatus() {
@VisibleForTesting
void setOkStatus() {
status.set(BatchProcessorStatus.BP_STATUS_OK);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package org.corfudb.infrastructure.batchprocessor;
package org.corfudb.infrastructure;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.infrastructure.BatchProcessor;
import org.corfudb.infrastructure.BatchProcessor.BatchProcessorContext;
import org.corfudb.infrastructure.BatchWriterOperation;
import org.corfudb.infrastructure.log.StreamLog;
import org.corfudb.protocols.service.CorfuProtocolMessage.ClusterIdCheck;
import org.corfudb.protocols.service.CorfuProtocolMessage.EpochCheck;
Expand All @@ -29,9 +27,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -50,13 +47,17 @@
import static org.corfudb.runtime.proto.service.CorfuMessage.PriorityLevel;
import static org.corfudb.runtime.proto.service.CorfuMessage.RequestMsg;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Slf4j
public class BatchProcessorTest {
private static final BatchProcessorStatus BP_STATUS_OK = BatchProcessorStatus.BP_STATUS_OK;
private static final BatchProcessorStatus BP_STATUS_OK = BatchProcessorStatus.BP_STATUS_OK;
private static final BatchProcessorStatus BP_STATUS_ERROR = BatchProcessorStatus.BP_STATUS_ERROR;

@Rule
Expand Down Expand Up @@ -129,7 +130,7 @@ private LogData getDefaultLogData(long address) {
@Before
public void setup() {
mockStreamLog = mock(StreamLog.class);
bpContext = new BatchProcessorContext();
bpContext = spy(new BatchProcessorContext());
batchProcessor = new BatchProcessor(mockStreamLog, bpContext, DEFAULT_SEAL_EPOCH, true);
}

Expand Down Expand Up @@ -196,17 +197,33 @@ public void testReset() {

@Test
public void testRestart() throws Exception {
BlockingQueue<BatchWriterOperation<?>> queue = mock(BlockingQueue.class);
String errMsg = "Dummy IO exception";
when(queue.take()).thenThrow(new IllegalStateException(errMsg));
when(queue.poll()).thenThrow(new IllegalStateException(errMsg));

try(BatchProcessor bp = new BatchProcessor(queue, mockStreamLog, bpContext, DEFAULT_SEAL_EPOCH, true)) {
TimeUnit.SECONDS.sleep(1);
assertEquals(BP_STATUS_ERROR, bpContext.getStatus());
bp.restart();
assertEquals(BP_STATUS_OK, bpContext.getStatus());
}
assertEquals(BP_STATUS_OK, bpContext.getStatus());

final CountDownLatch errorLatch = new CountDownLatch(1);
final CountDownLatch okLatch = new CountDownLatch(1);

doAnswer(invocationOnMock -> {
final Object ret = invocationOnMock.callRealMethod();
errorLatch.countDown();
return ret;
}).when(bpContext).setErrorStatus();

doAnswer(invocationOnMock -> {
final Object ret = invocationOnMock.callRealMethod();
okLatch.countDown();
return ret;
}).when(bpContext).setOkStatus();

doThrow(new IOException()).when(mockStreamLog).sync(anyBoolean());

batchProcessor.addTask(BatchWriterOperation.Type.SHUTDOWN, RequestMsg.getDefaultInstance());
errorLatch.await();
assertEquals(BP_STATUS_ERROR, bpContext.getStatus());

batchProcessor.restart();

okLatch.await();
assertEquals(BP_STATUS_OK, bpContext.getStatus());
}

/**
Expand Down

0 comments on commit 24d37e5

Please sign in to comment.