HBASE-26938 Compaction failures after StoreFileTracker integration (#4350)

Introduce a StoreFileWriterCreationTracker to track the store files being written

Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
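
The pattern this commit introduces, as a minimal illustrative sketch (the class and field names below are hypothetical, not the actual HBase types): each store keeps a concurrent set of in-flight store file paths, hands a Consumer<Path> to every flush or compaction that creates writers, and clears entries once those writers are committed or aborted.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch of the tracking pattern; not the actual HBase class.
class WriterCreationTrackingSketch {
  // Paths of store files currently being written.
  private final Set<Path> filesBeingWritten = ConcurrentHashMap.newKeySet();

  // Handed to flush/compaction code so each newly created writer path is recorded.
  Consumer<Path> writerCreationTracker() {
    return filesBeingWritten::add;
  }

  // Called once the flush/compaction commits or aborts its writers.
  void finished(Path path) {
    filesBeingWritten.remove(path);
  }

  // Consulted by cleanup (e.g. BrokenStoreFileCleaner) before deleting a file.
  boolean isBeingWritten(Path path) {
    return filesBeingWritten.contains(path);
  }
}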
Apache9 committed Apr 17, 2022
1 parent 267b2ed commit 48c4a4626e54724beb4ed46becbedbe292841dc4
Showing 27 changed files with 400 additions and 250 deletions.
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -146,10 +149,14 @@ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> s
     @Override
     public StoreFileWriter createWriter(InternalScanner scanner,
       org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-      boolean shouldDropBehind, boolean major) throws IOException {
+      boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
+      throws IOException {
       // make this writer with tags always because of possible new cells with tags.
-      return store.getStoreEngine().createWriter(
-        createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+      return store.getStoreEngine()
+        .createWriter(
+          createParams(fd, shouldDropBehind, major, writerCreationTracker)
+            .includeMVCCReadpoint(true)
+            .includesTag(true));
     }
   };
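
Note: the four-argument createParams overload is not shown in this diff; presumably it forwards the tracker into CreateStoreFileWriterParams. A hedged sketch, using only the builder method that appears elsewhere in this commit:

// Hypothetical sketch; the real createParams overload is not part of this hunk.
private CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
    boolean major, Consumer<Path> writerCreationTracker) {
  // The real helper presumably also applies fd/shouldDropBehind/major settings;
  // the tracker forwarding is the only point of this sketch.
  return CreateStoreFileWriterParams.create().writerCreationTracker(writerCreationTracker);
}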

@@ -285,17 +292,19 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    * </ol>
    * @param fd File details
    * @param scanner Where to read from.
+   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
    * @param major Is a major compaction.
    * @param numofFilesToCompact the number of files to compact
+   * @param progress Progress reporter.
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
     long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
-    boolean major, int numofFilesToCompact) throws IOException {
+    boolean major, int numofFilesToCompact, CompactionProgress progress) throws IOException {
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -661,9 +670,8 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
     }
   }
 
-
   @Override
-  protected List<Path> commitWriter(FileDetails fd,
+  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
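
Passing the StoreFileWriter into performCompaction and commitWriter means the caller now owns the writer lifecycle end to end. An illustrative outline of that flow, not the verbatim HBase Compactor code (writerFactory and abortWriter are assumptions; the other signatures match those visible in this diff):

// Illustrative flow only.
StoreFileWriter writer = writerFactory.createWriter(scanner, fd, shouldDropBehind, major,
  writerCreationTracker); // the tracker records the new path on creation
try {
  performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, throughputController,
    major, numofFilesToCompact, progress);
  return commitWriter(writer, fd, request);
} catch (IOException e) {
  abortWriter(writer); // hypothetical abort hook; drops the partial file and untracks it
  throw e;
}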
@@ -25,6 +25,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -112,7 +113,7 @@ public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOExcepti
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    FlushLifeCycleTracker tracker, Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     long cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -126,7 +127,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, true);
+      writer = createWriter(snapshot, true, writerCreationTracker);
       IOException e = null;
       try {
         // It's a mob store, flush the cells in a mob way. This is the difference of flushing
@@ -67,7 +67,7 @@ public void init(StoreScanner sourceScanner, WriterFactory factory) {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
-    return commitWriters(maxSeqId, majorCompaction, Collections.EMPTY_SET);
+    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
   }
 
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
@@ -110,11 +110,7 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  /**
-   * Returns all writers. This is used to prevent deleting currently writen storefiles
-   * during cleanup.
-   */
-  public abstract Collection<StoreFileWriter> writers();
+  protected abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
@@ -152,7 +152,7 @@ private void cleanFileIfNeeded(FileStatus file, HStore store,
   }
 
   private boolean isCompactionResultFile(FileStatus file, HStore store) {
-    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+    return store.getStoreFilesBeingWritten().contains(file.getPath());
   }
 
   // Compacted files can still have readers and are cleaned by a separate chore, so they have to
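
store.getStoreFilesBeingWritten() is new API surfaced by this commit; presumably it aggregates the paths recorded by every active writer creation tracker. A sketch of that aggregation (the storeFileWriterCreationTrackers field is an assumption, one tracker per running flush or compaction, each holding the paths it has created):

import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch of how HStore might expose in-flight writer paths.
public Set<Path> getStoreFilesBeingWritten() {
  return storeFileWriterCreationTrackers.stream()
    .flatMap(tracker -> tracker.get().stream())
    .collect(Collectors.toSet());
}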
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.function.Consumer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +42,8 @@ public final class CreateStoreFileWriterParams {
 
   private String fileStoragePolicy = HConstants.EMPTY_STRING;
 
+  private Consumer<Path> writerCreationTracker;
+
   private CreateStoreFileWriterParams() {
   }
 
@@ -127,8 +131,16 @@ public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
     return this;
   }
 
+  public Consumer<Path> writerCreationTracker() {
+    return writerCreationTracker;
+  }
+
+  public CreateStoreFileWriterParams writerCreationTracker(Consumer<Path> writerCreationTracker) {
+    this.writerCreationTracker = writerCreationTracker;
+    return this;
+  }
+
   public static CreateStoreFileWriterParams create() {
     return new CreateStoreFileWriterParams();
   }
 
 }
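
Example use of the extended builder (the builder calls shown all appear in this commit; the tracker simply records each created store file path):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;

// Record every store file path the new writer creates.
Set<Path> inFlight = ConcurrentHashMap.newKeySet();
CreateStoreFileWriterParams params = CreateStoreFileWriterParams.create()
  .includeMVCCReadpoint(true)
  .includesTag(true)
  .writerCreationTracker(inFlight::add);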
@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  public Collection<StoreFileWriter> writers() {
+  protected Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -44,8 +45,8 @@ public DefaultStoreFlusher(Configuration conf, HStore store) {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-    MonitoredTask status, ThroughputController throughputController,
-    FlushLifeCycleTracker tracker) throws IOException {
+    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
+    Consumer<Path> writerCreationTracker) throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -59,7 +60,7 @@ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
     synchronized (flushLock) {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
-      writer = createWriter(snapshot, false);
+      writer = createWriter(snapshot, false, writerCreationTracker);
       IOException e = null;
       try {
         performFlush(scanner, writer, throughputController);
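
On the flush side the tracker is now threaded through flushSnapshot; the caller (presumably HStore's flush context, which is not part of this diff) would supply it roughly like this (variable names are assumptions; the flushSnapshot signature matches the one added above):

// Illustrative call shape only.
Consumer<Path> writerCreationTracker = filesBeingWritten::add;
List<Path> flushedFiles = storeFlusher.flushSnapshot(snapshot, cacheFlushId, status,
  throughputController, tracker, writerCreationTracker);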
