Skip to content

Commit

Permalink
HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371)
Browse files Browse the repository at this point in the history
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
bharathv committed Jun 16, 2021
1 parent 7466e08 commit 5a19bcf
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 21 deletions.
Expand Up @@ -320,11 +320,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

/**
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
* <p>
* A cache of sync futures reused by threads.
*/
private final ThreadLocal<SyncFuture> cachedSyncFutures;
protected final SyncFutureCache syncFutureCache;

/**
* The class name of the runtime implementation, used as prefix for logging/tracing.
Expand Down Expand Up @@ -494,12 +492,7 @@ public boolean accept(final Path fileName) {
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
DEFAULT_WAL_SYNC_TIMEOUT_MS));
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
@Override
protected SyncFuture initialValue() {
return new SyncFuture();
}
};
this.syncFutureCache = new SyncFutureCache(conf);
this.implClassName = getClass().getSimpleName();
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
Expand Down Expand Up @@ -885,10 +878,6 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
}
}
} catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
// still refer to it, so if this thread use it next time may get a wrong
// result.
this.cachedSyncFutures.remove();
throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
Expand Down Expand Up @@ -993,6 +982,9 @@ public void shutdown() throws IOException {
rollWriterLock.lock();
try {
doShutdown();
if (syncFutureCache != null) {
syncFutureCache.clear();
}
if (logArchiveExecutor != null) {
logArchiveExecutor.shutdownNow();
}
Expand Down Expand Up @@ -1049,7 +1041,7 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
}

protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
}

protected boolean isLogRollRequested() {
Expand Down
Expand Up @@ -263,6 +263,14 @@ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDi
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
}

/**
* Helper that marks the future as DONE and offers it back to the cache.
*/
private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
future.done(txid, t);
syncFutureCache.offer(future);
}

private static boolean waitingRoll(int epochAndState) {
return (epochAndState & 1) != 0;
}
Expand Down Expand Up @@ -397,7 +405,7 @@ private int finishSyncLowerThanTxid(long txid) {
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
sync.done(txid, null);
markFutureDoneAndOffer(sync, txid, null);
iter.remove();
finished++;
} else {
Expand All @@ -416,7 +424,7 @@ private int finishSync() {
long maxSyncTxid = highestSyncedTxid.get();
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
sync.done(maxSyncTxid, null);
markFutureDoneAndOffer(sync, maxSyncTxid, null);
}
highestSyncedTxid.set(maxSyncTxid);
int finished = syncFutures.size();
Expand Down Expand Up @@ -531,7 +539,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
SyncFuture future = syncIter.next();
if (future.getTxid() < txid) {
future.done(future.getTxid(), error);
markFutureDoneAndOffer(future, future.getTxid(), error);
syncIter.remove();
} else {
break;
Expand Down Expand Up @@ -796,7 +804,7 @@ protected void doShutdown() throws IOException {
}
}
// and fail them
syncFutures.forEach(f -> f.done(f.getTxid(), error));
syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
if (!(consumeExecutor instanceof EventLoop)) {
consumeExecutor.shutdown();
}
Expand Down
Expand Up @@ -891,6 +891,14 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
return syncFuture;
}

/**
* @return if the safepoint has been attained.
*/
@InterfaceAudience.Private
boolean isSafePointAttained() {
return this.safePointAttainedLatch.getCount() == 0;
}

/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
* A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
Expand Down Expand Up @@ -977,6 +985,16 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
this.syncFutures[i].done(sequence, e);
}
offerDoneSyncsBackToCache();
}

/**
* Offers the finished syncs back to the cache for reuse.
*/
private void offerDoneSyncsBackToCache() {
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
syncFutureCache.offer(syncFutures[i]);
}
this.syncFuturesCount.set(0);
}

Expand Down Expand Up @@ -1089,7 +1107,10 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
? this.exception : new DamagedWALException("On sync", this.exception));
}
attainSafePoint(sequence);
this.syncFuturesCount.set(0);
// It is critical that we offer the futures back to the cache for reuse here after the
// safe point is attained and all the clean up has been done. There have been
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
offerDoneSyncsBackToCache();
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
}
Expand Down
Expand Up @@ -90,7 +90,8 @@ synchronized SyncFuture reset(long txid) {

@Override
public synchronized String toString() {
return "done=" + isDone() + ", txid=" + this.txid;
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
" threadName=" + t.getName();
}

synchronized long getTxid() {
Expand All @@ -106,6 +107,15 @@ synchronized SyncFuture setForceSync(boolean forceSync) {
return this;
}

/**
* Returns the thread that owned this sync future, use with caution as we return the reference to
* the actual thread object.
* @return the associated thread instance.
*/
Thread getThread() {
return t;
}

/**
* @param txid the transaction id at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
Expand Down
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
* A cache of {@link SyncFuture}s. This class supports two methods
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer()}.
*
* Usage pattern:
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
* sf.reset(...);
* // Use the sync future
* finally: syncFutureCache.offer(sf);
*
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
* context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
* unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
*/
@InterfaceAudience.Private
public final class SyncFutureCache {

private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;

private final Cache<Thread, SyncFuture> syncFutureCache;

public SyncFutureCache(final Configuration conf) {
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
}

public SyncFuture getIfPresentOrNew() {
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
return (future == null) ? new SyncFuture() : future;
}

/**
* Offers the sync future back to the cache for reuse.
*/
public void offer(SyncFuture syncFuture) {
// It is ok to overwrite an existing mapping.
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
}

public void clear() {
if (syncFutureCache != null) {
syncFutureCache.invalidateAll();
}
}
}
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
Expand All @@ -27,13 +29,15 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand All @@ -49,8 +53,10 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLog.class);

private static final long TEST_TIMEOUT_MS = 10000;

@Rule
public TestName name = new TestName();

Expand Down Expand Up @@ -131,6 +139,89 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti
}
}

/**
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
*/
@Test
public void testDeadlockWithSyncOverwrites() throws Exception {
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);

class FailingWriter implements WALProvider.Writer {
@Override public void sync(boolean forceSync) throws IOException {
throw new IOException("Injected failure..");
}

@Override public void append(WAL.Entry entry) throws IOException {
}

@Override public long getLength() {
return 0;
}

@Override
public long getSyncedLength() {
return 0;
}

@Override public void close() throws IOException {
}
}

/*
* Custom FSHLog implementation with a conditional wait before attaining safe point.
*/
class CustomFSHLog extends FSHLog {
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}

@Override
protected void beforeWaitOnSafePoint() {
try {
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public SyncFuture publishSyncOnRingBuffer() {
long sequence = getSequenceOnRingBuffer();
return publishSyncOnRingBuffer(sequence, false);
}
}

final String name = this.name.getMethodName();
try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
log.setWriter(new FailingWriter());
Field ringBufferEventHandlerField =
FSHLog.class.getDeclaredField("ringBufferEventHandler");
ringBufferEventHandlerField.setAccessible(true);
FSHLog.RingBufferEventHandler ringBufferEventHandler =
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
// Force a safe point
FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
try {
SyncFuture future0 = log.publishSyncOnRingBuffer();
// Wait for the sync to be done.
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
// Publish another sync from the same thread, this should not overwrite the done sync.
SyncFuture future1 = log.publishSyncOnRingBuffer();
assertFalse(future1.isDone());
// Unblock the safe point trigger..
blockBeforeSafePoint.countDown();
// Wait for the safe point to be reached.
// With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
} finally {
// Force release the safe point, for the clean up.
latch.releaseSafePoint();
}
}
}

/**
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
*/
Expand Down

0 comments on commit 5a19bcf

Please sign in to comment.