From 5a19bcfa98b3ccd9f7fb1fb933248c808676d91c Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 16 Jun 2021 14:30:15 -0700 Subject: [PATCH] HBASE-25984: Avoid premature reuse of sync futures in FSHLog (#3371) Signed-off-by: Viraj Jasani --- .../hbase/regionserver/wal/AbstractFSWAL.java | 22 ++--- .../hbase/regionserver/wal/AsyncFSWAL.java | 16 +++- .../hadoop/hbase/regionserver/wal/FSHLog.java | 23 ++++- .../hbase/regionserver/wal/SyncFuture.java | 12 ++- .../regionserver/wal/SyncFutureCache.java | 74 +++++++++++++++ .../hbase/regionserver/wal/TestFSHLog.java | 91 +++++++++++++++++++ .../regionserver/wal/TestSyncFutureCache.java | 68 ++++++++++++++ 7 files changed, 285 insertions(+), 21 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index ba302cf2107f..68d39db43408 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -320,11 +320,9 @@ public WalProps(Map encodedName2HighestSequenceId, long logSize) { new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); /** - * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures. - * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228 - *

+ * A cache of sync futures reused by threads. */ - private final ThreadLocal cachedSyncFutures; + protected final SyncFutureCache syncFutureCache; /** * The class name of the runtime implementation, used as prefix for logging/tracing. @@ -494,12 +492,7 @@ public boolean accept(final Path fileName) { DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS); this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS)); - this.cachedSyncFutures = new ThreadLocal() { - @Override - protected SyncFuture initialValue() { - return new SyncFuture(); - } - }; + this.syncFutureCache = new SyncFutureCache(conf); this.implClassName = getClass().getSimpleName(); this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt( SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT)); @@ -885,10 +878,6 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException { } } } catch (TimeoutIOException tioe) { - // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer - // still refer to it, so if this thread use it next time may get a wrong - // result. - this.cachedSyncFutures.remove(); throw tioe; } catch (InterruptedException ie) { LOG.warn("Interrupted", ie); @@ -993,6 +982,9 @@ public void shutdown() throws IOException { rollWriterLock.lock(); try { doShutdown(); + if (syncFutureCache != null) { + syncFutureCache.clear(); + } if (logArchiveExecutor != null) { logArchiveExecutor.shutdownNow(); } @@ -1049,7 +1041,7 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen } protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { - return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync); + return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync); } protected boolean isLogRollRequested() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 4b0f0410097d..eecd1fd1664a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -263,6 +263,14 @@ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDi DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); } + /** + * Helper that marks the future as DONE and offers it back to the cache. + */ + private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { + future.done(txid, t); + syncFutureCache.offer(future); + } + private static boolean waitingRoll(int epochAndState) { return (epochAndState & 1) != 0; } @@ -397,7 +405,7 @@ private int finishSyncLowerThanTxid(long txid) { for (Iterator iter = syncFutures.iterator(); iter.hasNext();) { SyncFuture sync = iter.next(); if (sync.getTxid() <= txid) { - sync.done(txid, null); + markFutureDoneAndOffer(sync, txid, null); iter.remove(); finished++; } else { @@ -416,7 +424,7 @@ private int finishSync() { long maxSyncTxid = highestSyncedTxid.get(); for (SyncFuture sync : syncFutures) { maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); - sync.done(maxSyncTxid, null); + markFutureDoneAndOffer(sync, maxSyncTxid, null); } highestSyncedTxid.set(maxSyncTxid); int finished = syncFutures.size(); @@ -531,7 +539,7 @@ private void drainNonMarkerEditsAndFailSyncs() { for (Iterator syncIter = syncFutures.iterator(); syncIter.hasNext();) { SyncFuture future = syncIter.next(); if (future.getTxid() < txid) { - future.done(future.getTxid(), error); + markFutureDoneAndOffer(future, future.getTxid(), error); syncIter.remove(); } else { break; @@ -796,7 +804,7 @@ protected void doShutdown() throws IOException { } } // and fail them - syncFutures.forEach(f -> f.done(f.getTxid(), error)); + syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); if (!(consumeExecutor instanceof EventLoop)) { consumeExecutor.shutdown(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 3efadc1d479a..178c599cc416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -891,6 +891,14 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException, return syncFuture; } + /** + * @return if the safepoint has been attained. + */ + @InterfaceAudience.Private + boolean isSafePointAttained() { + return this.safePointAttainedLatch.getCount() == 0; + } + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread * A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called @@ -977,6 +985,16 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep for (int i = 0; i < this.syncFuturesCount.get(); i++) { this.syncFutures[i].done(sequence, e); } + offerDoneSyncsBackToCache(); + } + + /** + * Offers the finished syncs back to the cache for reuse. + */ + private void offerDoneSyncsBackToCache() { + for (int i = 0; i < this.syncFuturesCount.get(); i++) { + syncFutureCache.offer(syncFutures[i]); + } this.syncFuturesCount.set(0); } @@ -1089,7 +1107,10 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en ? this.exception : new DamagedWALException("On sync", this.exception)); } attainSafePoint(sequence); - this.syncFuturesCount.set(0); + // It is critical that we offer the futures back to the cache for reuse here after the + // safe point is attained and all the clean up has been done. There have been + // issues with reusing sync futures early causing WAL lockups, see HBASE-25984. + offerDoneSyncsBackToCache(); } catch (Throwable t) { LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 650f68c1363e..edba5df2aa34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -90,7 +90,8 @@ synchronized SyncFuture reset(long txid) { @Override public synchronized String toString() { - return "done=" + isDone() + ", txid=" + this.txid; + return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() + + " threadName=" + t.getName(); } synchronized long getTxid() { @@ -106,6 +107,15 @@ synchronized SyncFuture setForceSync(boolean forceSync) { return this; } + /** + * Returns the thread that owned this sync future, use with caution as we return the reference to + * the actual thread object. + * @return the associated thread instance. + */ + Thread getThread() { + return t; + } + /** * @param txid the transaction id at which this future 'completed'. * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java new file mode 100644 index 000000000000..de3188f08976 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFutureCache.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.cache.Cache; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; + +/** + * A cache of {@link SyncFuture}s. This class supports two methods + * {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer()}. + * + * Usage pattern: + * SyncFuture sf = syncFutureCache.getIfPresentOrNew(); + * sf.reset(...); + * // Use the sync future + * finally: syncFutureCache.offer(sf); + * + * Offering the sync future back to the cache makes it eligible for reuse within the same thread + * context. Cache keyed by the accessing thread instance and automatically invalidated if it remains + * unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes. + */ +@InterfaceAudience.Private +public final class SyncFutureCache { + + private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2; + + private final Cache syncFutureCache; + + public SyncFutureCache(final Configuration conf) { + final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount) + .expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build(); + } + + public SyncFuture getIfPresentOrNew() { + // Invalidate the entry if a mapping exists. We do not want it to be reused at the same time. + SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread()); + return (future == null) ? new SyncFuture() : future; + } + + /** + * Offers the sync future back to the cache for reuse. + */ + public void offer(SyncFuture syncFuture) { + // It is ok to overwrite an existing mapping. + syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture); + } + + public void clear() { + if (syncFutureCache != null) { + syncFutureCache.invalidateAll(); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index e763896d8df7..49fa1dffd50b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Field; @@ -27,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -49,8 +53,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFSHLog.class); + private static final long TEST_TIMEOUT_MS = 10000; + @Rule public TestName name = new TestName(); @@ -131,6 +139,89 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti } } + /** + * Test for WAL stall due to sync future overwrites. See HBASE-25984. + */ + @Test + public void testDeadlockWithSyncOverwrites() throws Exception { + final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1); + + class FailingWriter implements WALProvider.Writer { + @Override public void sync(boolean forceSync) throws IOException { + throw new IOException("Injected failure.."); + } + + @Override public void append(WAL.Entry entry) throws IOException { + } + + @Override public long getLength() { + return 0; + } + + @Override + public long getSyncedLength() { + return 0; + } + + @Override public void close() throws IOException { + } + } + + /* + * Custom FSHLog implementation with a conditional wait before attaining safe point. + */ + class CustomFSHLog extends FSHLog { + public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir, + Configuration conf, List listeners, boolean failIfWALExists, + String prefix, String suffix) throws IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + + @Override + protected void beforeWaitOnSafePoint() { + try { + assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public SyncFuture publishSyncOnRingBuffer() { + long sequence = getSequenceOnRingBuffer(); + return publishSyncOnRingBuffer(sequence, false); + } + } + + final String name = this.name.getMethodName(); + try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name, + HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) { + log.setWriter(new FailingWriter()); + Field ringBufferEventHandlerField = + FSHLog.class.getDeclaredField("ringBufferEventHandler"); + ringBufferEventHandlerField.setAccessible(true); + FSHLog.RingBufferEventHandler ringBufferEventHandler = + (FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log); + // Force a safe point + FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint(); + try { + SyncFuture future0 = log.publishSyncOnRingBuffer(); + // Wait for the sync to be done. + Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone); + // Publish another sync from the same thread, this should not overwrite the done sync. + SyncFuture future1 = log.publishSyncOnRingBuffer(); + assertFalse(future1.isDone()); + // Unblock the safe point trigger.. + blockBeforeSafePoint.countDown(); + // Wait for the safe point to be reached. + // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline. + Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained); + } finally { + // Force release the safe point, for the clean up. + latch.releaseSafePoint(); + } + } + } + /** * Test case for https://issues.apache.org/jira/browse/HBASE-16721 */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java new file mode 100644 index 000000000000..070aaf27a016 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSyncFutureCache.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestSyncFutureCache { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncFutureCache.class); + + @Test + public void testSyncFutureCacheLifeCycle() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + SyncFutureCache cache = new SyncFutureCache(conf); + try { + SyncFuture future0 = cache.getIfPresentOrNew().reset(0); + assertNotNull(future0); + // Get another future from the same thread, should be different one. + SyncFuture future1 = cache.getIfPresentOrNew().reset(1); + assertNotNull(future1); + assertNotSame(future0, future1); + cache.offer(future1); + // Should override. + cache.offer(future0); + SyncFuture future3 = cache.getIfPresentOrNew(); + assertEquals(future3, future0); + final SyncFuture[] future4 = new SyncFuture[1]; + // From a different thread + CompletableFuture.runAsync(() -> future4[0] = cache.getIfPresentOrNew().reset(4)).get(); + assertNotNull(future4[0]); + assertNotSame(future3, future4[0]); + // Clean up + cache.offer(future3); + cache.offer(future4[0]); + } finally { + cache.clear(); + } + } +}