HBASE-27752: Update the list of prefetched files upon region movement… #5222

Merged: 1 commit, May 9, 2023

@@ -123,7 +123,7 @@ public static void request(Path path, Runnable runnable) {
public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchCompleted.put(path.getName(), true);
LOG.debug("Prefetch completed for {}", path);
LOG.debug("Prefetch completed for {}", path.getName());
}

public static void cancel(Path path) {
@@ -134,7 +134,8 @@ public static void cancel(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
- prefetchCompleted.remove(path.getName());
+ LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
+ removePrefetchedFileWhileEvict(path.getName());
}

public static boolean isCompleted(Path path) {
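
The removePrefetchedFileWhileEvict(...) call above replaces the direct prefetchCompleted.remove(...), but the helper's body falls outside the visible hunks. A minimal sketch of what such a helper needs to do, assuming prefetchCompleted (seen in complete() above) is the map that backs the persisted prefetch list:

// Sketch only -- the actual implementation is not shown in this diff view.
// Assumption: prefetchCompleted maps HFile names to a completed flag and is
// the structure serialized to the prefetch persistence file, so removing the
// entry here is what keeps the persisted list in sync on eviction.
public static void removePrefetchedFileWhileEvict(String hfileName) {
  prefetchCompleted.remove(hfileName);
}
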
@@ -416,6 +416,10 @@ private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String
}
}

+ public boolean isCachePersistenceEnabled() {
+ return (prefetchedFileListPath != null) && (persistencePath != null);
+ }

/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
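
isCachePersistenceEnabled() returns true only when both persistence paths are configured. A minimal configuration sketch, assuming the two keys used in the test setup further below are what back persistencePath and prefetchedFileListPath respectively (the file paths themselves are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class CachePersistenceConfigSketch {
  public static Configuration sketch() {
    Configuration conf = HBaseConfiguration.create();
    // Bucket cache persistence file (assumed to back persistencePath).
    conf.set("hbase.bucketcache.persistent.path", "/tmp/bucket.persistence");
    // Prefetched-file list (assumed to back prefetchedFileListPath).
    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, "/tmp/prefetch.persistence");
    return conf;
  }
}
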
@@ -140,7 +140,9 @@
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
+ import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
+ import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -1550,11 +1552,35 @@ public Map<byte[], List<HStoreFile>> close() throws IOException {
public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms";
public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds

+ public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
+ return close(abort, false);
+ }
+
+ /**
+ * Close this HRegion.
+ * @param abort true if server is aborting (only during testing)
+ * @param ignoreStatus true to ignore the status (won't be shown on the task list)
+ * @return Vector of all the storage files that the HRegion's component HStores make use of. It's
+ * a list of StoreFile objects. Can be null if we are not to close at this time, or we are
+ * already closed.
+ * @throws IOException e
+ * @throws DroppedSnapshotException Thrown when replay of wal is required because a Snapshot was
+ * not properly persisted. The region is put in closing mode, and
+ * the caller MUST abort after this.
+ */
+ public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus)
+ throws IOException {
+ return close(abort, ignoreStatus, false);
+ }
+
/**
* Close down this HRegion. Flush the cache unless the abort parameter is true, shut down each
* HStore, and don't service any more calls. This method could take some time to execute, so
* don't call it from a time-sensitive thread.
* @param abort true if server is aborting (only during testing)
+ * @param ignoreStatus true to ignore the status (won't be shown on the task list)
+ * @param isGracefulStop true if region is being closed during graceful stop and the blocks in the
+ * BucketCache should not be evicted.
* @return Vector of all the storage files that the HRegion's component HStores make use of. It's
* a list of StoreFile objects. Can be null if we are not to close at this time or we are
* already closed.
@@ -1563,7 +1589,8 @@ public Map<byte[], List<HStoreFile>> close() throws IOException {
* not properly persisted. The region is put in closing mode, and
* the caller MUST abort after this.
*/
- public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
+ public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus,
+ boolean isGracefulStop) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
// threads attempting to close will run up against each other.
MonitoredTask status = TaskMonitor.get().createStatus(
@@ -1572,6 +1599,22 @@ public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
status.setStatus("Waiting for close lock");
try {
synchronized (closeLock) {
+ if (isGracefulStop && rsServices != null) {
+ rsServices.getBlockCache().ifPresent(blockCache -> {
+ if (blockCache instanceof CombinedBlockCache) {
+ BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
+ if (l2 instanceof BucketCache) {
+ if (((BucketCache) l2).isCachePersistenceEnabled()) {
+ LOG.info(
+ "Closing region {} during a graceful stop, and cache persistence is on, "
+ + "so setting evict on close to false. ",
+ this.getRegionInfo().getRegionNameAsString());
+ this.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(false));
+ }
+ }
+ }
+ });
+ }
return doClose(abort, status);
}
} finally {
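
For reference, the overloads added above chain into one another as summarized below; region, abort and ignoreStatus are stand-ins, and the comments are editorial rather than part of the patch:

// Overload chain introduced by this change:
region.close(abort);               // delegates to close(abort, false)
region.close(abort, ignoreStatus); // delegates to close(abort, ignoreStatus, false)
region.close(abort, false, true);  // graceful stop: evict-on-close is disabled first,
                                   // when bucket cache persistence is enabled
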
@@ -102,7 +102,7 @@ public void process() throws IOException {
}

// Close the region
- if (region.close(abort) == null) {
+ if (region.close(abort, false, true) == null) {
// This region has already been closed. Should not happen (A unit test makes this
// happen as a side effect, TestRegionObserverInterface.testPreWALAppendNotCalledOnMetaEdit)
LOG.warn("Can't close {}; already closed", name);
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, MediumTests.class })
public class TestBlockEvictionOnRegionMovement {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockEvictionOnRegionMovement.class);

private static final Logger LOG =
LoggerFactory.getLogger(TestBlockEvictionOnRegionMovement.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private Configuration conf;
Path testDir;
MiniZooKeeperCluster zkCluster;
MiniHBaseCluster cluster;
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();

@Before
public void setup() throws Exception {
conf = TEST_UTIL.getConfiguration();
testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);

conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
conf.setInt("hbase.bucketcache.size", 400);
conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence");
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
zkCluster = TEST_UTIL.startMiniZKCluster();
cluster = TEST_UTIL.startMiniHBaseCluster(option);
cluster.setConf(conf);
}

@Test
public void testBlockEvictionOnRegionMove() throws Exception {
// Write to table and flush
TableName tableRegionMove = writeDataToTable();

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);
assertTrue(regionServingRS.getBlockCache().isPresent());
long oldUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());

Admin admin = TEST_UTIL.getAdmin();
RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo();
admin.move(regionToMove.getEncodedNameAsBytes(),
TEST_UTIL.getOtherRegionServer(regionServingRS).getServerName());
assertEquals(0, regionServingRS.getRegions(tableRegionMove).size());

long newUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertTrue(oldUsedCacheSize > newUsedCacheSize);
assertEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}

@Test
public void testBlockEvictionOnGracefulStop() throws Exception {
// Write to table and flush
TableName tableRegionClose = writeDataToTable();

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);

assertTrue(regionServingRS.getBlockCache().isPresent());
long oldUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());

cluster.stopRegionServer(regionServingRS.getServerName());
Thread.sleep(500);
cluster.startRegionServer();
Thread.sleep(500);

long newUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertEquals(oldUsedCacheSize, newUsedCacheSize);
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}
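
Both tests read the second-level cache through getBlockCaches()[1]. The index convention is implied by the assertions rather than stated, so spelling out the assumption:

// Assumed layout of the array returned by getBlockCaches() for a CombinedBlockCache:
BlockCache[] caches = regionServingRS.getBlockCache().get().getBlockCaches();
BlockCache l1 = caches[0]; // first-level, on-heap cache
BlockCache l2 = caches[1]; // second-level BucketCache, file-backed per setup() above
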

public TableName writeDataToTable() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf("table1");
byte[] row0 = Bytes.toBytes("row1");
byte[] row1 = Bytes.toBytes("row2");
byte[] family = Bytes.toBytes("family");
byte[] qf1 = Bytes.toBytes("qf1");
byte[] qf2 = Bytes.toBytes("qf2");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");

TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
Table table = TEST_UTIL.createTable(td, null);
try {
// put data
Put put0 = new Put(row0);
put0.addColumn(family, qf1, 1, value1);
table.put(put0);
Put put1 = new Put(row1);
put1.addColumn(family, qf2, 1, value2);
table.put(put1);
TEST_UTIL.flush(tableName);
} finally {
Thread.sleep(1000);
}
assertEquals(1, cluster.getRegions(tableName).size());
return tableName;
}

@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
if (zkCluster != null) {
zkCluster.shutdown();
}
}
}