From ad130b4ab532e2adb7fa68fd1cc4166462057c65 Mon Sep 17 00:00:00 2001
From: chenxu14
Date: Fri, 6 Dec 2019 12:07:14 +0800
Subject: [PATCH] HBASE-23355 Bypass the prefetch operation if HFiles are
 generated through flush or compaction

---
 .../hbase/io/hfile/HFilePreadReader.java      |   2 +-
 .../hadoop/hbase/io/hfile/ReaderContext.java  |   8 +-
 .../hbase/io/hfile/ReaderContextBuilder.java  |   9 +-
 .../hadoop/hbase/regionserver/HStore.java     |   7 +-
 .../hbase/regionserver/StoreFileInfo.java     |   7 +
 .../hadoop/hbase/io/hfile/TestPrefetch.java   |   6 +-
 .../hbase/io/hfile/TestPrefetchOnOpen.java    | 127 ++++++++++++++++++
 7 files changed, 161 insertions(+), 5 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchOnOpen.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 98fe885de516..099f1882698a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -35,7 +35,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo,
       CacheConfig cacheConf, Configuration conf) throws IOException {
     super(context, fileInfo, cacheConf, conf);
     // Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen()) {
+    if (context.shouldPrefetchOnOpen()) {
       PrefetchExecutor.request(path, new Runnable() {
         @Override
         public void run() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
index bd3d63dab0c6..a6fa46902884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContext.java
@@ -39,15 +39,17 @@ public enum ReaderType {
   private final HFileSystem hfs;
   private final boolean primaryReplicaReader;
   private final ReaderType type;
+  private final boolean prefetchOnOpen;
 
   public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
-      HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
+      HFileSystem hfs, boolean primaryReplicaReader, ReaderType type, boolean prefetchOnOpen) {
     this.filePath = filePath;
     this.fsdis = fsdis;
     this.fileSize = fileSize;
     this.hfs = hfs;
     this.primaryReplicaReader = primaryReplicaReader;
     this.type = type;
+    this.prefetchOnOpen = prefetchOnOpen;
   }
 
   public Path getFilePath() {
@@ -73,4 +75,8 @@ public boolean isPrimaryReplicaReader() {
   public ReaderType getReaderType() {
     return this.type;
   }
+
+  public boolean shouldPrefetchOnOpen() {
+    return this.prefetchOnOpen;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
index c58d5b8ce077..76abc5bd0bc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ReaderContextBuilder.java
@@ -40,6 +40,7 @@ public class ReaderContextBuilder {
   private HFileSystem hfs;
   private boolean primaryReplicaReader = true;
   private ReaderType type = ReaderType.PREAD;
+  private boolean prefetchOnOpen = false;
 
   public ReaderContextBuilder() {}
 
@@ -82,6 +83,11 @@ public ReaderContextBuilder withReaderType(ReaderType type) {
     return this;
   }
 
+  public ReaderContextBuilder withPrefetchOnOpen(boolean prefetchOnOpen) {
+    this.prefetchOnOpen = prefetchOnOpen;
+    return this;
+  }
+
   @VisibleForTesting
   public ReaderContextBuilder withFileSystemAndPath(FileSystem fs, Path filePath)
       throws IOException {
@@ -94,7 +100,8 @@ public ReaderContextBuilder withFileSystemAndPath(FileSystem fs, Path filePath)
 
   public ReaderContext build() {
     validateFields();
-    return new ReaderContext(filePath, fsdis, fileSize, hfs, primaryReplicaReader, type);
+    return new ReaderContext(filePath, fsdis, fileSize, hfs, primaryReplicaReader,
+        type, prefetchOnOpen);
   }
 
   private void validateFields() throws IllegalArgumentException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c7ecfca9682d..8169b3893abb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -575,6 +575,7 @@ private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean
     int totalValidStoreFile = 0;
     for (StoreFileInfo storeFileInfo : files) {
+      storeFileInfo.setPrefetchOnOpen(cacheConf.shouldPrefetchOnOpen());
       // open each store file in parallel
       completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
       totalValidStoreFile++;
     }
@@ -899,7 +900,10 @@ public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws
     LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
         + dstPath + " - updating store file list.");
 
-    HStoreFile sf = createStoreFileAndReader(dstPath);
+    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
+        dstPath, isPrimaryReplicaStore());
+    info.setPrefetchOnOpen(cacheConf.shouldPrefetchOnOpen());
+    HStoreFile sf = createStoreFileAndReader(info);
     bulkLoadHFile(sf);
 
     LOG.info("Successfully loaded store file {} into store {} (new location: {})",
@@ -909,6 +913,7 @@ public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws
   }
 
   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
+    fileInfo.setPrefetchOnOpen(cacheConf.shouldPrefetchOnOpen());
     HStoreFile sf = createStoreFileAndReader(fileInfo);
     bulkLoadHFile(sf);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 15ed359a8822..c29e89685e71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -107,6 +107,8 @@ public class StoreFileInfo {
 
   private RegionCoprocessorHost coprocessorHost;
 
+  private boolean prefetchOnOpen = false;
+
   // timestamp on when the file was created, is 0 and ignored for reference or link files
   private long createdTimestamp;
 
@@ -321,6 +323,7 @@ ReaderContext createReaderContext(boolean doDropBehind, long readahead, ReaderTy
       .withFileSize(length)
       .withPrimaryReplicaReader(this.primaryReplica)
       .withReaderType(type)
+      .withPrefetchOnOpen(prefetchOnOpen)
       .withFileSystem(fs);
     if (this.reference != null) {
       contextBuilder.withFilePath(this.getPath());
@@ -643,6 +646,10 @@ HFileInfo getHFileInfo() {
     return hfileInfo;
   }
 
+  void setPrefetchOnOpen(boolean prefetchOnOpen) {
+    this.prefetchOnOpen = prefetchOnOpen;
+  }
+
   void initHDFSBlocksDistribution() throws IOException {
     hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 136568006816..1fc1a828343a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -116,7 +116,11 @@ private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
 
   private void readStoreFile(Path storeFilePath) throws Exception {
     // Open the file
-    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
+    ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, storeFilePath)
+        .withPrefetchOnOpen(true).build();
+    HFileInfo hfile = new HFileInfo(context, conf);
+    HFile.Reader reader = HFile.createReader(context, hfile, cacheConf, conf);
+    hfile.initMetaAndIndex(reader);
 
     while (!reader.prefetchComplete()) {
       // Sleep for a bit
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchOnOpen.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchOnOpen.java
new file mode 100644
index 000000000000..1dbca295e185
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchOnOpen.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IOTests.class, MediumTests.class})
+public class TestPrefetchOnOpen {
+  private static final String FAMILYNAME = "fam";
+  private static final byte[] FAMILYBYTES = Bytes.toBytes(FAMILYNAME);
+  private static final byte[] COLBYTES = Bytes.toBytes("col");
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestPrefetchOnOpen.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testPrefetchWithRegionOpenOnly() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Table table = TEST_UTIL.createTable(tableName, FAMILYNAME);
+    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+    assertEquals(1, regions.size());
+    HRegion region = regions.get(0);
+    ServerName sn = TEST_UTIL.getMiniHBaseCluster().getServerHoldingRegion(tableName,
+        region.getRegionInfo().getRegionName());
+    BlockCache cache = TEST_UTIL.getHBaseCluster().getRegionServer(sn).getBlockCache().get();
+
+    writeSomeRecords(table);
+    TEST_UTIL.flush(tableName);
+
+    Path regionDir = FSUtils.getRegionDirFromRootDir(
+        TEST_UTIL.getDefaultRootDirPath(), region.getRegionInfo());
+    Path famDir = FSUtils.getFamilyDirs(TEST_UTIL.getTestFileSystem(), regionDir).get(0);
+    FileStatus[] files = TEST_UTIL.getTestFileSystem().listStatus(famDir);
+    waitUntilPrefetchComplete(files);
+    // no prefetch to do with flush
+    assertEquals(0, cache.getBlockCount());
+
+    writeSomeRecords(table);
+    TEST_UTIL.flush(tableName);
+    TEST_UTIL.compact(tableName, true);
+
+    files = TEST_UTIL.getTestFileSystem().listStatus(famDir);
+    waitUntilPrefetchComplete(files);
+    // no prefetch to do with compaction
+    assertEquals(0, cache.getBlockCount());
+
+    TEST_UTIL.getAdmin().disableTable(tableName);
+    TEST_UTIL.getAdmin().enableTable(tableName);
+    files = TEST_UTIL.getTestFileSystem().listStatus(famDir);
+    waitUntilPrefetchComplete(files);
+    // do prefetch with region open
+    assertEquals(1, cache.getBlockCount());
+  }
+
+  private void writeSomeRecords(Table table) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      Put put = new Put(Bytes.toBytes("row" + i));
+      put.addColumn(FAMILYBYTES, COLBYTES, Bytes.toBytes("value" + i));
+      table.put(put);
+    }
+  }
+
+  private void waitUntilPrefetchComplete(FileStatus[] files) throws InterruptedException {
+    for (FileStatus file : files) {
+      while (!PrefetchExecutor.isCompleted(file.getPath())) {
+        // Sleep for a bit
+        Thread.sleep(1000);
+      }
+    }
+  }
+}