Skip to content
Permalink
Browse files
Fix Journal.ForceWriteThread.forceWriteRequests.put deadlock
Descriptions of the changes in this PR:



### Motivation
`Journal.ForceWriteThread` could deadlock: it is the sole consumer of `Journal.forceWriteRequests`, yet it enqueues the group marker with the blocking `BlockingQueue.put`, which can block forever when the queue is full.

This PR fixes that deadlock.

### Changes
* Add a test that reproduces the deadlock of `Journal.ForceWriteThread` on `forceWriteRequests.put`.
* Enqueue the force-write group marker non-blockingly (`offer` instead of `put`) to avoid deadlocking the `ForceWriteThread`.

Master Issue: #2948

Reviewers: Andrey Yegorov <None>

This closes #2962 from kezhuw/fix-Journal.ForceWriteThread.forceWriteRequests.put-deadlock
  • Loading branch information
kezhuw committed Mar 10, 2022
1 parent 3edbc98 commit 1b1e9370f39e386d660ceee11660d7ee87fbb460
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 1 deletion.
@@ -112,6 +112,7 @@ public interface BookKeeperServerStats {
String JOURNAL_FORCE_WRITE_LATENCY = "JOURNAL_FORCE_WRITE_LATENCY";
String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES";
String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES";
String JOURNAL_FORCE_WRITE_GROUPING_FAILURES = "JOURNAL_FORCE_WRITE_GROUPING_FAILURES";
String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY";
String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY";
String JOURNAL_QUEUE_MAX_SIZE = "JOURNAL_QUEUE_MAX_SIZE";
@@ -501,6 +501,7 @@ public void run() {
long busyStartTime = System.nanoTime();
while (running) {
ForceWriteRequest req = null;
boolean forceWriteMarkerSent = false;
try {
forceWriteThreadTime.add(MathUtils.elapsedNanos(busyStartTime));
req = forceWriteRequests.take();
@@ -513,7 +514,19 @@ public void run() {
// queue will benefit from this force write - post a marker prior to issuing
// the flush so until this marker is encountered we can skip the force write
if (enableGroupForceWrites) {
forceWriteRequests.put(createForceWriteRequest(req.logFile, 0, 0, null, false, true));
ForceWriteRequest marker =
createForceWriteRequest(req.logFile, 0, 0, null, false, true);
forceWriteMarkerSent = forceWriteRequests.offer(marker);
if (!forceWriteMarkerSent) {
marker.recycle();
Counter failures = journalStats.getForceWriteGroupingFailures();
failures.inc();
LOG.error(
"Fail to send force write grouping marker,"
+ " Journal.forceWriteRequests queue(capacity {}) is full,"
+ " current failure counter is {}.",
conf.getJournalQueueSize(), failures.get());
}
}

// If we are about to issue a write, record the number of requests in
@@ -531,6 +544,11 @@ public void run() {
if (enableGroupForceWrites
// if its a marker we should switch back to flushing
&& !req.isMarker
// If group marker sending failed, we can't figure out which writes are
// grouped in this force write. So, abandon it even if other writes could
// be grouped. This should be extremely rare as, usually, queue size is
// large enough to accommodate high flush frequencies.
&& forceWriteMarkerSent
// This indicates that this is the last request in a given file
// so subsequent requests will go to a different file so we should
// flush on the next request
@@ -31,6 +31,7 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_ENQUEUE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_FAILURES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_MAX;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_MEMORY_USED;
@@ -132,6 +133,11 @@ public class JournalStats {
help = "The distribution of number of bytes grouped together into a force write request"
)
private final OpStatsLogger forceWriteBatchBytesStats;
@StatsDoc(
name = JOURNAL_FORCE_WRITE_GROUPING_FAILURES,
help = "The number of force write grouping failures"
)
private final Counter forceWriteGroupingFailures;
@StatsDoc(
name = JOURNAL_QUEUE_SIZE,
help = "The journal queue size"
@@ -190,6 +196,7 @@ public JournalStats(StatsLogger statsLogger, final long maxJournalMemoryBytes,
journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY);
forceWriteGroupingCountStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT);
forceWriteGroupingFailures = statsLogger.getCounter(JOURNAL_FORCE_WRITE_GROUPING_FAILURES);
forceWriteBatchEntriesStats =
statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
@@ -42,6 +42,7 @@
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.junit.Rule;
import org.junit.Test;
@@ -207,6 +208,33 @@ public void testForceLedger() throws Exception {
b.shutdown();
}

@Test
public void testSmallJournalQueueWithHighFlushFrequency() throws IOException, InterruptedException {
    // Regression test for the ForceWriteThread deadlock (issue #2948):
    // a journal queue of capacity 1 combined with flush-when-empty and a
    // buffered-writes threshold of 1 maximizes flush frequency, so the
    // force-write group marker must compete with pending add requests for
    // queue space. With the old blocking put() this hangs; with offer()
    // all entries complete and the latch reaches zero.
    ServerConfiguration conf = new ServerConfiguration();
    conf.setJournalQueueSize(1);
    conf.setJournalFlushWhenQueueEmpty(true);
    conf.setJournalBufferedWritesThreshold(1);

    conf.setJournalDirName(tempDir.newFolder().getPath());
    conf.setLedgerDirNames(new String[]{tempDir.newFolder().getPath()});
    DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
    LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker);
    Journal journal = new Journal(0, conf.getJournalDirs()[0], conf, ledgerDirsManager);
    journal.start();
    try {
        final int entries = 1000;
        CountDownLatch entriesLatch = new CountDownLatch(entries);
        for (int j = 1; j <= entries; j++) {
            ByteBuf entry = buildEntry(1, j, -1);
            journal.logAddEntry(entry, false, (int rc, long ledgerId, long entryId, BookieId addr, Object ctx) -> {
                entriesLatch.countDown();
            }, null);
        }
        entriesLatch.await();
    } finally {
        // Always shut the journal down, even when the test fails or the
        // await is interrupted, so its background threads and open journal
        // files do not leak into subsequent tests.
        journal.shutdown();
    }
}

private static ByteBuf buildEntry(long ledgerId, long entryId, long lastAddConfirmed) {
final ByteBuf data = Unpooled.buffer();
data.writeLong(ledgerId);

0 comments on commit 1b1e937

Please sign in to comment.