From 2a77aa0f6ff6a4f68d6efab4577c933476a08a37 Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Sat, 10 Dec 2022 12:39:51 +0800
Subject: [PATCH] HBASE-27519 Another case for FNFE on StoreFileScanner after
 a flush followed by a compaction

---
 .../regionserver/ChangedReadersObserver.java  |   8 +-
 .../hadoop/hbase/regionserver/HStore.java     |  25 ++++-
 .../hadoop/hbase/regionserver/HStoreFile.java |  25 +++++
 .../hadoop/hbase/regionserver/TestHStore.java | 102 ++++++++++++++++++
 4 files changed, 155 insertions(+), 5 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index d45a80468738..9ad93395a7e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
   long getReadPoint();
 
   /**
-   * Notify observers.
+   * Notify observers.
+   * NOTE: Before this method is invoked, {@link HStoreFile#increaseRefCount} is called for every
+   * {@link HStoreFile} in the 'sfs' input parameter to keep them from being archived by a
+   * concurrent compaction, and {@link HStoreFile#decreaseRefCount} is called after this method
+   * returns. So if you open the {@link StoreFileReader} or {@link StoreFileScanner} asynchronously
+   * in this method, you may need to invoke {@link HStoreFile#increaseRefCount} and
+   * {@link HStoreFile#decreaseRefCount} yourself to keep the {@link HStoreFile}s from being archived.
    * @param sfs The new files
   * @param memStoreScanners scanner of current memstore
   * @throws IOException e
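For an observer that opens its readers asynchronously, the note above means the observer has to pin the files itself before handing them to another thread. A minimal sketch of that, assuming a hypothetical AsyncReadersObserver with its own single-threaded executor (none of these names are part of this patch; the class sits in the same package so the package-private helpers are visible):

  package org.apache.hadoop.hbase.regionserver; // same package as the helpers

  import java.io.IOException;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Sketch only: an observer that defers opening StoreFileScanners must take
  // its own references, because the caller's increase/decrease pair only
  // brackets the synchronous call to updateReaders.
  class AsyncReadersObserver implements ChangedReadersObserver {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public long getReadPoint() {
      return 0; // hypothetical; a real observer returns its scanner's read point
    }

    @Override
    public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners)
      throws IOException {
      // Pin the files before this method returns and the caller unpins them.
      HStoreFile.increaseStoreFilesRefCount(sfs);
      executor.submit(() -> {
        try {
          // ... open a StoreFileReader/StoreFileScanner for each file in sfs ...
        } finally {
          // Unpin once the readers have been opened.
          HStoreFile.decreaseStoreFilesRefCount(sfs);
        }
      });
    }
  }
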
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index fc3dfd340ac5..b132a1531633 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1195,6 +1195,13 @@ private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws I
       if (snapshotId > 0) {
         this.memstore.clearSnapshot(snapshotId);
       }
+      // NOTE: here we must increase the refCount for storeFiles because we would open the
+      // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
+      // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
+      // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction.
+      // Since HStore.requestCompaction runs under the storeEngine lock, we also increase the
+      // refCount under the storeEngine lock. See HBASE-27519 for more details.
+      HStoreFile.increaseStoreFilesRefCount(sfs);
     } finally {
       // We need the lock, as long as we are updating the storeFiles
       // or changing the memstore. Let us release it before calling
@@ -1204,8 +1211,12 @@ private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws I
       this.lock.writeLock().unlock();
     }
 
-    // notify to be called here - only in case of flushes
-    notifyChangedReadersObservers(sfs);
+    try {
+      // notify to be called here - only in case of flushes
+      notifyChangedReadersObservers(sfs);
+    } finally {
+      HStoreFile.decreaseStoreFilesRefCount(sfs);
+    }
     if (LOG.isTraceEnabled()) {
       long totalSize = getTotalSize(sfs);
       String traceMessage = "FLUSH time,count,size,store size,store files ["
@@ -1273,7 +1284,13 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
-      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
+      // NOTE: here we must increase the refCount for storeFiles because we would open the
+      // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount
+      // here, HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may
+      // archive the storeFiles after a concurrent compaction. Since HStore.requestCompaction
+      // runs under the storeEngine lock, we also increase the refCount under the storeEngine
+      // lock. See HBASE-27484 for more details.
+      HStoreFile.increaseStoreFilesRefCount(storeFilesToScan);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1294,7 +1311,7 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
       clearAndClose(memStoreScanners);
       throw t instanceof IOException ?
         (IOException) t : new IOException(t);
     } finally {
-      storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
+      HStoreFile.decreaseStoreFilesRefCount(storeFilesToScan);
     }
   }
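Both HStore call sites now follow the same shape: select the files and pin them while still under the storeEngine lock, use them, then unpin in a finally block. Extracted as a sketch, where selectFiles()/useFiles() are hypothetical stand-ins for the real selection and scanner-opening logic and 'lock' is the store's ReentrantReadWriteLock:

  private void pinnedUse() throws IOException {
    List<HStoreFile> files;
    lock.readLock().lock();
    try {
      // Selecting and pinning happen under the same lock that
      // HStore.requestCompaction takes, so a compaction cannot slip in between.
      files = selectFiles();
      HStoreFile.increaseStoreFilesRefCount(files);
    } finally {
      lock.readLock().unlock();
    }
    try {
      // Opening readers may now race with a compaction, but pinned files
      // (refCount > 0) are not archived by CompactedHFilesDischarger.
      useFiles(files);
    } finally {
      HStoreFile.decreaseStoreFilesRefCount(files);
    }
  }
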
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 29203b26e779..984b081f7743 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -46,6 +47,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -609,4 +612,26 @@ public OptionalLong getMaximumTimestamp() {
   Set<String> getCompactedStoreFiles() {
     return Collections.unmodifiableSet(this.compactedStoreFiles);
   }
+
+  long increaseRefCount() {
+    return this.fileInfo.refCount.incrementAndGet();
+  }
+
+  long decreaseRefCount() {
+    return this.fileInfo.refCount.decrementAndGet();
+  }
+
+  static void increaseStoreFilesRefCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::increaseRefCount);
+  }
+
+  static void decreaseStoreFilesRefCount(Collection<HStoreFile> storeFiles) {
+    if (CollectionUtils.isEmpty(storeFiles)) {
+      return;
+    }
+    storeFiles.forEach(HStoreFile::decreaseRefCount);
+  }
 }
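Pinning only helps because the archiving side consults the same counter: HStoreFile already exposes isReferencedInReads() on top of fileInfo.refCount, and compacted files that are still referenced are left for a later chore run. Roughly, as a simplified sketch (pickArchivable is illustrative, not the literal discharger code):

  // Simplified sketch of why pinning works: the archiving side checks the same
  // counter and skips files that still have readers (refCount > 0).
  List<HStoreFile> pickArchivable(List<HStoreFile> compactedFiles) {
    List<HStoreFile> archivable = new ArrayList<>();
    for (HStoreFile file : compactedFiles) {
      if (file.isReferencedInReads()) {
        continue; // a concurrent flush or scan still holds a pinned reference
      }
      archivable.add(file);
    }
    return archivable;
  }
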
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index d2e9ac944da8..f8009f85358a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -31,6 +31,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -44,6 +45,7 @@
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -1444,6 +1446,106 @@ public void getScanners(MyStore store) throws IOException {
     }
   }
 
+  /**
+   * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
+   * compaction execute concurrently, and the compaction compacts and archives the flushed
+   * {@link HStoreFile} used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
+   * {@link StoreScanner#updateReaders} would throw a {@link FileNotFoundException}.
+   */
+  @Test
+  public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(WALFactory.WAL_ENABLED, false);
+    conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] r1 = Bytes.toBytes("row1");
+    final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+    final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
+    // Initialize region
+    final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
+      @Override
+      public void getScanners(MyStore store) throws IOException {
+        try {
+          // This method is called by StoreScanner.updateReaders, which is invoked by the
+          // TestHStore.flushStore call below.
+          if (shouldWaitRef.get()) {
+            // Wait for the compactionTask below to start.
+            cyclicBarrier.await();
+            // Wait for HStore.closeAndArchiveCompactedFiles below to finish.
+            cyclicBarrier.await();
+          }
+        } catch (BrokenBarrierException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
+    Runnable compactionTask = () -> {
+      try {
+        // The compactionTask may only start once the StoreScanner.updateReaders invoked by
+        // TestHStore.flushStore is about to enter MyStore.getScanners.
+        cyclicBarrier.await();
+        region.compactStore(family, new NoLimitThroughputController());
+        myStore.closeAndArchiveCompactedFiles();
+        // Notify StoreScanner.updateReaders that it may now enter MyStore.getScanners.
+        cyclicBarrier.await();
+      } catch (Throwable e) {
+        compactionExceptionRef.set(e);
+      }
+    };
+
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // Older data which shouldn't be "seen" by the client
+    myStore.add(createCell(r0, qf1, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf2, ts, seqId, value), null);
+    flushStore(myStore, id++);
+    myStore.add(createCell(r0, qf3, ts, seqId, value), null);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+
+    myStore.add(createCell(r1, qf1, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf2, ts, seqId, value), null);
+    myStore.add(createCell(r1, qf3, ts, seqId, value), null);
+
+    Thread.currentThread()
+      .setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
+    Scan scan = new Scan();
+    scan.withStartRow(r0, true);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
+      List<Cell> results = new MyList<>(size -> {
+        switch (size) {
+          case 1:
+            shouldWaitRef.set(true);
+            Thread thread = new Thread(compactionTask);
+            thread.setName("MyCompacting Thread.");
+            thread.start();
+            try {
+              flushStore(myStore, id++);
+              thread.join();
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+            shouldWaitRef.set(false);
+            break;
+          default:
+            break;
+        }
+      });
+      // Before HBASE-27519, this would throw java.io.FileNotFoundException because the storeFile
+      // used by StoreScanner.updateReaders had been deleted by the compactionTask.
+      scanner.next(results);
+      // The results are the cells of row r0.
+      assertEquals(3, results.size());
+      assertTrue(compactionExceptionRef.get() == null);
+    }
+  }
+
   @Test
   public void testReclaimChunkWhenScaning() throws IOException {
     init("testReclaimChunkWhenScaning");