Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27126 Support multi-threads cleaner for MOB files #5833

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -4503,4 +4503,8 @@ protected String getDescription() {
}
});
}

public MobFileCleanerChore getMobFileCleanerChore() {
return mobFileCleanerChore;
}
}
Expand Up @@ -144,6 +144,9 @@ public final class MobConstants {
public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max";
public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;

public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads";
public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1;

private MobConstants() {

}
Expand Down
Expand Up @@ -17,9 +17,18 @@
*/
package org.apache.hadoop.hbase.mob;

import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
Expand All @@ -31,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
* (files which have no active references to) mob files.
Expand All @@ -39,8 +50,10 @@
public class MobFileCleanerChore extends ScheduledChore {

private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);
public static final int TIME_OUT_VALUE = 30;
private final HMaster master;
private ExpiredMobFileCleaner cleaner;
private final ExecutorService threadPool;

public MobFileCleanerChore(HMaster master) {
super(master.getServerName() + "-MobFileCleanerChore", master,
Expand All @@ -52,6 +65,16 @@ public MobFileCleanerChore(HMaster master) {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
int threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);

ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();
if (threadCount == 1) {
threadPool = MoreExecutors.newDirectExecutorService();
} else {
threadPool = Executors.newFixedThreadPool(threadCount, threadFactory);
}
checkObsoleteConfigurations();
}

Expand Down Expand Up @@ -83,29 +106,43 @@ protected void chore() {
LOG.error("MobFileCleanerChore failed", e);
return;
}
List<Future> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
}
}
Future<?> future = threadPool.submit(() -> handleOneTable(htd));
futureList.add(future);
}

for (Future future : futureList) {
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
future.get(TIME_OUT_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Exception during the execution of MobFileCleanerChore", e);
}
}
}

private void handleOneTable(TableDescriptor htd) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd.getTableName().getNameAsString(), hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}

}
@@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UPPER_BOUND;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(MediumTests.class)
public class TestExpiredMobFileCleanerChore {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestExpiredMobFileCleanerChore.class);

private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private final static TableName tableName = TableName.valueOf("TestExpiredMobFileCleaner");
private final static TableName tableName2 = TableName.valueOf("TestExpiredMobFileCleaner2");
private final static String family = "family";
private final static byte[] row1 = Bytes.toBytes("row1");
private final static byte[] row2 = Bytes.toBytes("row2");
private final static byte[] row3 = Bytes.toBytes("row3");
private final static byte[] qf = Bytes.toBytes("qf");

private static BufferedMutator table;
private static Admin admin;
private BufferedMutator table2;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
TEST_UTIL.getConfiguration().setInt(MOB_CLEANER_BATCH_SIZE_UPPER_BOUND, 2);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {

}

@Before
public void setUp() throws Exception {
TEST_UTIL.startMiniCluster(1);
}

@After
public void tearDown() throws Exception {
admin.disableTable(tableName);
admin.deleteTable(tableName);
admin.disableTable(tableName2);
admin.deleteTable(tableName2);
admin.close();
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.getTestFileSystem().delete(TEST_UTIL.getDataTestDir(), true);
}

private void init() throws Exception {
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
.setMobThreshold(3L).setMaxVersions(4).build();
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);

admin = TEST_UTIL.getAdmin();
admin.createTable(tableDescriptorBuilder.build());

table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
.getBufferedMutator(tableName);

TableDescriptorBuilder tableDescriptorBuilder2 = TableDescriptorBuilder.newBuilder(tableName2);
ColumnFamilyDescriptor columnFamilyDescriptor2 =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).setMobEnabled(true)
.setMobThreshold(3L).setMaxVersions(4).build();
tableDescriptorBuilder2.setColumnFamily(columnFamilyDescriptor2);
admin.createTable(tableDescriptorBuilder2.build());

table2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
.getBufferedMutator(tableName2);
}

private void modifyColumnExpiryDays(int expireDays) throws Exception {

// change ttl as expire days to make some row expired
int timeToLive = expireDays * secondsOfDay();
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
columnFamilyDescriptorBuilder.setTimeToLive(timeToLive);

admin.modifyColumnFamily(tableName, columnFamilyDescriptorBuilder.build());

ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder2 = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(family)).setMobEnabled(true).setMobThreshold(3L);
columnFamilyDescriptorBuilder2.setTimeToLive(timeToLive);

admin.modifyColumnFamily(tableName2, columnFamilyDescriptorBuilder2.build());
}

private void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts,
TableName tableName) throws Exception {

Put put = new Put(row, ts);
put.addColumn(Bytes.toBytes(family), qf, value);
table.mutate(put);

table.flush();
admin.flush(tableName);
}

/**
* Creates a 3 day old hfile and an 1 day old hfile then sets expiry to 2 days. Verifies that the
* 3 day old hfile is removed but the 1 day one is still present after the expiry based cleaner is
* run.
*/
@Test
public void testCleaner() throws Exception {
init();

Path mobDirPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, family);

byte[] dummyData = makeDummyData(600);
long ts = EnvironmentEdgeManager.currentTime() - 3 * secondsOfDay() * 1000; // 3 days before
putKVAndFlush(table, row1, dummyData, ts, tableName);
FileStatus[] firstFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
// the first mob file
assertEquals("Before cleanup without delay 1", 1, firstFiles.length);
String firstFile = firstFiles[0].getPath().getName();

// 1.5 day before
ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
putKVAndFlush(table, row2, dummyData, ts, tableName);
FileStatus[] secondFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
// now there are 2 mob files
assertEquals("Before cleanup without delay 2", 2, secondFiles.length);
String f1 = secondFiles[0].getPath().getName();
String f2 = secondFiles[1].getPath().getName();
String secondFile = f1.equals(firstFile) ? f2 : f1;

ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
putKVAndFlush(table, row3, dummyData, ts, tableName);
ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
putKVAndFlush(table, row3, dummyData, ts, tableName);
FileStatus[] thirdFiles = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
// now there are 4 mob files
assertEquals("Before cleanup without delay 3", 4, thirdFiles.length);

// modifyColumnExpiryDays(2); // ttl = 2, make the first row expired

// for table 2
Path mobDirPath2 = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName2, family);

byte[] dummyData2 = makeDummyData(600);

putKVAndFlush(table2, row1, dummyData2, ts, tableName2);
FileStatus[] firstFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
// the first mob file
assertEquals("Before cleanup without delay 1", 1, firstFiles2.length);
String firstFile2 = firstFiles2[0].getPath().getName();

// 1.5 day before
ts = (long) (EnvironmentEdgeManager.currentTime() - 1.5 * secondsOfDay() * 1000);
putKVAndFlush(table2, row2, dummyData2, ts, tableName2);
FileStatus[] secondFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
// now there are 2 mob files
assertEquals("Before cleanup without delay 2", 2, secondFiles2.length);
String f1Second = secondFiles2[0].getPath().getName();
String f2Second = secondFiles2[1].getPath().getName();
String secondFile2 = f1Second.equals(firstFile2) ? f2Second : f1Second;
ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
ts = EnvironmentEdgeManager.currentTime() - 4 * secondsOfDay() * 1000; // 4 days before
putKVAndFlush(table2, row3, dummyData2, ts, tableName2);
FileStatus[] thirdFiles2 = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
// now there are 4 mob files
assertEquals("Before cleanup without delay 3", 4, thirdFiles2.length);

modifyColumnExpiryDays(2); // ttl = 2, make the first row expired

// run the cleaner chore
MobFileCleanerChore mobFileCleanerChore =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMobFileCleanerChore();
mobFileCleanerChore.chore();

FileStatus[] filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath);
String lastFile = filesAfterClean[0].getPath().getName();
// there are 4 mob files in total, but only 3 need to be cleaned
assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
assertEquals("After cleanup without delay 2", secondFile, lastFile);

filesAfterClean = TEST_UTIL.getTestFileSystem().listStatus(mobDirPath2);
lastFile = filesAfterClean[0].getPath().getName();
// there are 4 mob files in total, but only 3 need to be cleaned
assertEquals("After cleanup without delay 1", 1, filesAfterClean.length);
assertEquals("After cleanup without delay 2", secondFile2, lastFile);
}

private int secondsOfDay() {
return 24 * 3600;
}

private byte[] makeDummyData(int size) {
byte[] dummyData = new byte[size];
Bytes.random(dummyData);
return dummyData;
}
}