Skip to content

Commit

Permalink
HBASE-25972 Dual File Compaction (#5545)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
kadirozde committed May 17, 2024
1 parent 716adf5 commit 00f078a
Show file tree
Hide file tree
Showing 26 changed files with 1,272 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
"Run random seek scan with both start and stop row (max 10000 rows)");
addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
"Run sequential delete test");
addCommandDescriptor(MetaWriteTest.class, "metaWrite",
"Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
Expand Down Expand Up @@ -352,7 +355,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
boolean needsDelete = false, exists = admin.tableExists(tableName);
boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
|| opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
if (!exists && isReadCmd) {
boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
if (!exists && (isReadCmd || isDeleteCmd)) {
throw new IllegalStateException(
"Must specify an existing table for read commands. Run a write command first.");
}
Expand All @@ -367,7 +371,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
&& opts.presplitRegions != admin.getRegions(tableName).size())
|| (!isReadCmd && desc != null
&& !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
|| (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
|| (!(isReadCmd || isDeleteCmd) && desc != null
&& desc.getRegionReplication() != opts.replicas)
|| (desc != null && desc.getColumnFamilyCount() != opts.families)
) {
needsDelete = true;
Expand Down Expand Up @@ -2071,6 +2076,18 @@ protected byte[] generateRow(final int i) {

}

static class RandomDeleteTest extends SequentialDeleteTest {
RandomDeleteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}

@Override
protected byte[] generateRow(final int i) {
return getRandomRow(this.rand, opts.totalRows);
}

}

static class ScanTest extends TableTest {
private ResultScanner testScanner;

Expand Down Expand Up @@ -2406,6 +2423,34 @@ boolean testRow(final int i, final long startTime) throws IOException {
}
}

static class SequentialDeleteTest extends BufferedMutatorTest {

SequentialDeleteTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
}

protected byte[] generateRow(final int i) {
return format(i);
}

@Override
boolean testRow(final int i, final long startTime) throws IOException {
byte[] row = generateRow(i);
Delete delete = new Delete(row);
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
delete.addFamily(familyName);
}
delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
if (opts.autoFlush) {
table.delete(delete);
} else {
mutator.mutate(delete);
}
return true;
}
}

/*
* Insert fake regions into meta table with contiguous split keys.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo) throws
Bytes.equals(e.getKey(), HStoreFile.MAJOR_COMPACTION_KEY)
|| Bytes.equals(e.getKey(), HFileInfo.TAGS_COMPRESSED)
|| Bytes.equals(e.getKey(), HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY)
|| Bytes.equals(e.getKey(), HStoreFile.HISTORICAL_KEY)
) {
out.println(Bytes.toBoolean(e.getValue()));
} else if (Bytes.equals(e.getKey(), HFileInfo.LASTKEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private boolean isCompactedFile(FileStatus file, HStore store) {
}

private boolean isActiveStorefile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
return store.getStoreEngine().getStoreFileManager().getStoreFiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -34,4 +35,14 @@ public interface CellSink {
* @param cell the cell to be added
*/
void append(Cell cell) throws IOException;

/**
* Append the given (possibly partial) list of cells of a row
* @param cellList the cell list to be added
*/
default void appendAll(List<Cell> cellList) throws IOException {
for (Cell cell : cellList) {
append(cell);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {
@Override
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting);
return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting);
}

@Override
Expand All @@ -65,14 +65,14 @@ private final class DateTieredCompactionContext extends CompactionContext {

@Override
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
filesCompacting);
}

@Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher, RatioBa

@Override
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting);
return compactionPolicy.needsCompaction(this.storeFileManager.getStoreFiles(), filesCompacting);
}

@Override
Expand Down Expand Up @@ -111,7 +111,7 @@ private class DefaultCompactionContext extends CompactionContext {
@Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}
Expand All @@ -124,7 +124,7 @@ public List<Path> compact(ThroughputController throughputController, User user)

@Override
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
filesCompacting);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -48,36 +52,71 @@ class DefaultStoreFileManager implements StoreFileManager {
private final CompactionConfiguration comConf;
private final int blockingFileCount;
private final Comparator<HStoreFile> storeFileComparator;
/**
* List of store files inside this store. This is an immutable list that is atomically replaced
* when its contents change.
*/
private volatile ImmutableList<HStoreFile> storefiles = ImmutableList.of();

static class StoreFileList {
/**
* List of store files inside this store. This is an immutable list that is atomically replaced
* when its contents change.
*/
final ImmutableList<HStoreFile> all;
/**
* List of store files that include the latest cells inside this store. This is an immutable
* list that is atomically replaced when its contents change.
*/
@Nullable
final ImmutableList<HStoreFile> live;

StoreFileList(ImmutableList<HStoreFile> storeFiles, ImmutableList<HStoreFile> liveStoreFiles) {
this.all = storeFiles;
this.live = liveStoreFiles;
}
}

private volatile StoreFileList storeFiles;

/**
* List of compacted files inside this store that needs to be excluded in reads because further
* new reads will be using only the newly created files out of compaction. These compacted files
* will be deleted/cleared once all the existing readers on these compacted files are done.
*/
private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of();
private final boolean enableLiveFileTracking;

public DefaultStoreFileManager(CellComparator cellComparator,
Comparator<HStoreFile> storeFileComparator, Configuration conf,
CompactionConfiguration comConf) {
this.cellComparator = cellComparator;
this.storeFileComparator = storeFileComparator;
this.comConf = comConf;
this.blockingFileCount =
blockingFileCount =
conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf);
storeFiles =
new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
}

private List<HStoreFile> getLiveFiles(Collection<HStoreFile> storeFiles) throws IOException {
List<HStoreFile> liveFiles = new ArrayList<>(storeFiles.size());
for (HStoreFile file : storeFiles) {
file.initReader();
if (!file.isHistorical()) {
liveFiles.add(file);
}
}
return liveFiles;
}

@Override
public void loadFiles(List<HStoreFile> storeFiles) {
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
this.storeFiles = new StoreFileList(ImmutableList.sortedCopyOf(storeFileComparator, storeFiles),
enableLiveFileTracking
? ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles))
: null);
}

@Override
public final Collection<HStoreFile> getStorefiles() {
return storefiles;
public final Collection<HStoreFile> getStoreFiles() {
return storeFiles.all;
}

@Override
Expand All @@ -86,15 +125,20 @@ public Collection<HStoreFile> getCompactedfiles() {
}

@Override
public void insertNewFiles(Collection<HStoreFile> sfs) {
this.storefiles =
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
storeFiles = new StoreFileList(
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs)),
enableLiveFileTracking
? ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(storeFiles.live, getLiveFiles(sfs)))
: null);
}

@Override
public ImmutableCollection<HStoreFile> clearFiles() {
ImmutableList<HStoreFile> result = storefiles;
storefiles = ImmutableList.of();
ImmutableList<HStoreFile> result = storeFiles.all;
storeFiles =
new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
return result;
}

Expand All @@ -107,7 +151,7 @@ public Collection<HStoreFile> clearCompactedFiles() {

@Override
public final int getStorefileCount() {
return storefiles.size();
return storeFiles.all.size();
}

@Override
Expand All @@ -117,28 +161,38 @@ public final int getCompactedFilesCount() {

@Override
public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
Collection<HStoreFile> results) {
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables
.concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
Collection<HStoreFile> results) throws IOException {
ImmutableList<HStoreFile> liveStoreFiles = null;
if (enableLiveFileTracking) {
liveStoreFiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(Iterables.filter(storeFiles.live, sf -> !newCompactedfiles.contains(sf)),
getLiveFiles(results)));
}
storeFiles =
new StoreFileList(
ImmutableList
.sortedCopyOf(storeFileComparator,
Iterables.concat(
Iterables.filter(storeFiles.all, sf -> !newCompactedfiles.contains(sf)), results)),
liveStoreFiles);
// Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
// Let a background thread close the actual reader on these compacted files and also
// ensure to evict the blocks from block cache so that they are no longer in
// cache
newCompactedfiles.forEach(HStoreFile::markCompactedAway);
this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(this.compactedfiles, newCompactedfiles));
compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(compactedfiles, newCompactedfiles));
}

@Override
public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
this.compactedfiles =
this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
compactedfiles = compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
.sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
}

@Override
public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) {
return this.storefiles.reverse().iterator();
return storeFiles.all.reverse().iterator();
}

@Override
Expand All @@ -153,25 +207,28 @@ public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(

@Override
public final Optional<byte[]> getSplitPoint() throws IOException {
return StoreUtils.getSplitPoint(storefiles, cellComparator);
return StoreUtils.getSplitPoint(storeFiles.all, cellComparator);
}

@Override
public final Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow) {
public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
if (onlyLatestVersion && enableLiveFileTracking) {
return storeFiles.live;
}
// We cannot provide any useful input and already have the files sorted by seqNum.
return getStorefiles();
return getStoreFiles();
}

@Override
public int getStoreCompactionPriority() {
int priority = blockingFileCount - storefiles.size();
int priority = blockingFileCount - storeFiles.all.size();
return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

@Override
public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
ImmutableList<HStoreFile> files = storefiles;
ImmutableList<HStoreFile> files = storeFiles.all;
// 1) We can never get rid of the last file which has the maximum seqid.
// 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> {
Expand Down

0 comments on commit 00f078a

Please sign in to comment.