Skip to content

Commit

Permalink
HBASE-23680 RegionProcedureStore missing cleaning of hfile archive
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jan 11, 2020
1 parent 4ed4669 commit e8c29cb
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,28 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,6 +82,8 @@ class RegionFlusherAndCompactor implements Closeable {

private static final int DEFAULT_COMPACT_MIN = 4;

private final Configuration conf;

private final Abortable abortable;

private final HRegion region;
Expand Down Expand Up @@ -109,6 +119,7 @@ class RegionFlusherAndCompactor implements Closeable {
private volatile boolean closed = false;

RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
this.conf = conf;
this.abortable = abortable;
this.region = region;
flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
Expand Down Expand Up @@ -136,9 +147,30 @@ static void setupConf(Configuration conf) {
flushPerChanges, flushIntervalMs);
}

private void deleteCompactedHFiles() throws IOException {
HStore store = Iterables.getOnlyElement(region.getStores());
store.closeAndArchiveCompactedFiles();
// for now we just deleted these HFiles, without moving them to the global archive directory.
// This is because that, our HFiles are on the WAL file system, but the global HFile archive
// directory is on the root(HFile) file system, we can not move between two different file
// systems.
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
store.getColumnFamilyDescriptor().getName());
FileSystem fs = archiveDir.getFileSystem(conf);
if (LOG.isDebugEnabled()) {
FileStatus[] toDelete = fs.listStatus(archiveDir);
if (toDelete != null && toDelete.length > 0) {
LOG.debug("Delete all archived HFiles under {}: {}", archiveDir, Stream.of(toDelete)
.map(s -> s.getPath().getName()).collect(Collectors.joining(", ", "[", "]")));
}
}
fs.delete(archiveDir, true);
}

private void compact() {
try {
region.compact(true);
deleteCompactedHFiles();
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public class RegionProcedureStore extends ProcedureStoreBase {
@VisibleForTesting
HRegion region;

private RegionFlusherAndCompactor flusherAndCompactor;
@VisibleForTesting
RegionFlusherAndCompactor flusherAndCompactor;

@VisibleForTesting
RegionProcedureStoreWALRoller walRoller;
Expand Down Expand Up @@ -361,8 +362,8 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
throw new IOException(
"Failed to delete the WALProcedureStore migrated proc wal directory " + procWALDir);
}
LOG.info("Migration of WALProcedureStore finished");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreCompaction {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreCompaction.class);

private HBaseCommonTestingUtility htu;

private FileSystem fs;

private RegionProcedureStore store;

private int compactMin = 4;

@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Path testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(conf);
CommonFSUtils.setWALRootDir(conf, testDir);
conf.setInt(RegionFlusherAndCompactor.COMPACT_MIN_KEY, compactMin);
store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
}

@After
public void tearDown() throws IOException {
store.stop(true);
htu.cleanupTestDir();
}

private int getStorefilesCount() {
return Iterables.getOnlyElement(store.region.getStores()).getStorefilesCount();
}

@Test
public void test() throws IOException {
for (int i = 0; i < compactMin - 1; i++) {
store.insert(new RegionProcedureStoreTestProcedure(), null);
store.region.flush(true);
}
assertEquals(compactMin - 1, getStorefilesCount());
store.insert(new RegionProcedureStoreTestProcedure(), null);
store.flusherAndCompactor.requestFlush();
htu.waitFor(15000, () -> getStorefilesCount() == 1);
Path tableArchivePath = HFileArchiveUtil.getTableArchivePath(
new Path(htu.getDataTestDir(), RegionProcedureStore.MASTER_PROCEDURE_DIR),
RegionProcedureStore.TABLE_NAME);
// we should have created this directory, when archiving compacted files
htu.waitFor(15000, () -> fs.exists(tableArchivePath));
// and then we deleted the store file directory, for deleting the archived compacted files
htu.waitFor(15000, () -> !fs.listFiles(tableArchivePath, true).hasNext());
}
}

0 comments on commit e8c29cb

Please sign in to comment.