fixes user compaction stuck when producing no output (#3013)

This commit fixes a bug where:

 * user compactions take multiple compaction steps (because the tablet has many files), and
 * the intermediate steps produce no output.

When this happened, CompactableImpl and Tablet would disagree about which files the tablet had.
The Tablet code would ignore the empty file produced by an intermediate compaction, while
CompactableImpl expected Tablet to know about that file. The user compaction would then hang
until the tserver was restarted.

Ran into this bug while continually running the Bulk random walk test, trying to reproduce #2667.
keith-turner committed Oct 12, 2022
1 parent da50454 commit 2c37144
Showing 6 changed files with 105 additions and 36 deletions.
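
To make the shape of the fix easier to see before reading the diff below: a compaction pass now
reports its output as Optional<StoredTabletFile>, where Optional.empty() means the pass wrote zero
entries and no file was created, and every consumer branches on that same Optional instead of
mixing null checks with getNumEntries() > 0 checks. What follows is a minimal, self-contained
sketch of that pattern only; the class and method names (FileBookkeepingSketch, finishStep,
canReserveNextPass) are illustrative and are not Accumulo APIs.

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Toy model of the invariant the fix restores: the tablet's view of files and the
// compaction code's view are updated from the same Optional result, so a pass that
// produced no output can never be known to one side but not the other.
class FileBookkeepingSketch {

  private final Set<String> tabletFiles = new HashSet<>(); // stand-in for Tablet's file map
  private final Set<String> selectedFiles = new HashSet<>(); // stand-in for CompactableImpl's selected set

  void select(Set<String> files) {
    tabletFiles.addAll(files);
    selectedFiles.addAll(files);
  }

  // One pass of a multi-pass user compaction. entriesWritten == 0 models an
  // intermediate pass whose iterators filtered out every key/value.
  void finishStep(Set<String> inputs, long entriesWritten, String outputFileName) {
    Optional<String> newFile =
        entriesWritten > 0 ? Optional.of(outputFileName) : Optional.empty();

    tabletFiles.removeAll(inputs);
    selectedFiles.removeAll(inputs);

    // Both views consult the same Optional. With the old nullable-file-plus-size-check
    // style, the tablet skipped the empty file while the selected set still recorded it.
    newFile.ifPresent(tabletFiles::add);
    newFile.ifPresent(selectedFiles::add);
  }

  // The reservation check that was hanging: the next pass can only be reserved if the
  // tablet knows every file the compaction code still wants to compact.
  boolean canReserveNextPass() {
    return tabletFiles.containsAll(selectedFiles);
  }
}

With the pre-fix behavior, the selected-file set could hold a file name the tablet never learned
about, so a check equivalent to canReserveNextPass() failed on every retry and the user compaction
never finished. The diff below threads Optional<StoredTabletFile> through replaceDatafiles,
CompactableImpl, CompactableUtils.bringOnline, and DatafileManager.bringMajorCompactionOnline so
that case cannot arise.
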
@@ -182,9 +182,9 @@ private static TServerInstance getTServerInstance(String address, ServiceLock zo
   }
 
   public static void replaceDatafiles(ServerContext context, KeyExtent extent,
-      Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path,
-      Long compactionId, DataFileValue size, String address, TServerInstance lastLocation,
-      ServiceLock zooLock, Optional<ExternalCompactionId> ecid) {
+      Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles,
+      Optional<StoredTabletFile> path, Long compactionId, DataFileValue size, String address,
+      TServerInstance lastLocation, ServiceLock zooLock, Optional<ExternalCompactionId> ecid) {
 
     context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete);
 
@@ -193,8 +193,8 @@ public static void replaceDatafiles(ServerContext context, KeyExtent extent,
     datafilesToDelete.forEach(tablet::deleteFile);
     scanFiles.forEach(tablet::putScan);
 
-    if (size.getNumEntries() > 0)
-      tablet.putFile(path, size);
+    if (path.isPresent())
+      tablet.putFile(path.get(), size);
 
     if (compactionId != null)
       tablet.putCompactionId(compactionId);
@@ -447,7 +447,8 @@ private Set<StoredTabletFile> handleUserSelectorCompaction(Set<StoredTabletFile>
       Set<StoredTabletFile> candidates = Sets.difference(selectedFiles, allCompactingFiles);
       // verify that candidates are still around and fail quietly if not
       if (!currFiles.containsAll(candidates)) {
-        log.debug("Selected files not in all files {} {}", candidates, currFiles);
+        log.debug("Selected files not in all files {} {} {}",
+            Sets.difference(candidates, currFiles), candidates, currFiles);
         return Set.of();
       }
       // must create a copy because the sets passed to Sets.difference could change after this
@@ -575,21 +576,21 @@ private KeyExtent getExtent() {
      * @param newFile
      *          The file produced by a compaction. If the compaction failed, this can be null.
      */
-    void completed(CompactionJob job, Set<StoredTabletFile> jobFiles, StoredTabletFile newFile) {
+    void completed(CompactionJob job, Set<StoredTabletFile> jobFiles,
+        Optional<StoredTabletFile> newFile) {
       Preconditions.checkArgument(!jobFiles.isEmpty());
       Preconditions.checkState(allCompactingFiles.removeAll(jobFiles));
-      if (newFile != null) {
-        choppedFiles.add(newFile);
+      if (newFile.isPresent()) {
+        choppedFiles.add(newFile.get());
       }
 
-      if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR)
-          && newFile != null) {
+      if ((job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR)) {
         selectedCompactionCompleted(job, jobFiles, newFile);
       }
     }
 
     private void selectedCompactionCompleted(CompactionJob job, Set<StoredTabletFile> jobFiles,
-        StoredTabletFile newFile) {
+        Optional<StoredTabletFile> newFile) {
       Preconditions.checkArgument(
           job.getKind() == CompactionKind.USER || job.getKind() == CompactionKind.SELECTOR);
       Preconditions.checkState(selectedFiles.containsAll(jobFiles));
@@ -603,9 +604,11 @@ private void selectedCompactionCompleted(CompactionJob job, Set<StoredTabletFile
         selectStatus = FileSelectionStatus.NOT_ACTIVE;
         log.trace("Selected compaction status changed {} {}", getExtent(), selectStatus);
       } else if (selectStatus == FileSelectionStatus.RESERVED) {
-        selectedFiles.add(newFile);
+        if (newFile.isPresent()) {
+          selectedFiles.add(newFile.get());
+        }
         log.trace("Compacted subset of selected files {} {} -> {}", getExtent(),
-            asFileNames(jobFiles), newFile.getFileName());
+            asFileNames(jobFiles), newFile.orElse(null));
       } else {
         log.debug("Canceled selected compaction completed {} but others still running ",
             getExtent());
@@ -1226,18 +1229,20 @@ private Optional<CompactionInfo> reserveFilesForCompaction(CompactionServiceId s
     // check is done after the file are exclusively reserved in this class to avoid race conditions.
     if (!tablet.getDatafiles().keySet().containsAll(cInfo.jobFiles)) {
       // The tablet does not know of all these files, so unreserve them.
-      completeCompaction(job, cInfo.jobFiles, null);
+      completeCompaction(job, cInfo.jobFiles, Optional.empty(), true);
       return Optional.empty();
     }
 
     return Optional.of(cInfo);
   }
 
   private void completeCompaction(CompactionJob job, Set<StoredTabletFile> jobFiles,
-      StoredTabletFile metaFile) {
+      Optional<StoredTabletFile> metaFile, boolean successful) {
     synchronized (this) {
       Preconditions.checkState(removeJob(job));
-      fileMgr.completed(job, jobFiles, metaFile);
+      if (successful) {
+        fileMgr.completed(job, jobFiles, metaFile);
+      }
 
       if (!compactionRunning) {
         notifyAll();
@@ -1257,11 +1262,14 @@ public void compact(CompactionServiceId service, CompactionJob job, RateLimiter
       return;
 
     var cInfo = ocInfo.get();
-    StoredTabletFile newFile = null;
+    Optional<StoredTabletFile> newFile = Optional.empty();
    long startTime = System.currentTimeMillis();
    CompactionKind kind = job.getKind();
 
    CompactionStats stats = new CompactionStats();
+
+    boolean successful = false;
+
    try {
      TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
      tablet.incrementStatusMajor();
@@ -1278,14 +1286,16 @@ public void compact(CompactionServiceId service, CompactionJob job, RateLimiter
       newFile = CompactableUtils.bringOnline(tablet.getDatafileManager(), cInfo, stats,
           compactFiles, allFiles, kind, tmpFileName);
 
-      TabletLogger.compacted(getExtent(), job, newFile);
+      TabletLogger.compacted(getExtent(), job, newFile.orElse(null));
+
+      successful = true;
     } catch (CompactionCanceledException cce) {
       log.debug("Compaction canceled {} ", getExtent());
     } catch (Exception e) {
-      newFile = null;
+      newFile = Optional.empty();
       throw new RuntimeException(e);
     } finally {
-      completeCompaction(job, cInfo.jobFiles, newFile);
+      completeCompaction(job, cInfo.jobFiles, newFile, successful);
       tablet.updateTimer(MAJOR, queuedTime, startTime, stats.getEntriesRead(), newFile == null);
     }
   }
@@ -1334,7 +1344,7 @@ public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId servi
 
     } catch (Exception e) {
       externalCompactions.remove(externalCompactionId);
-      completeCompaction(job, cInfo.jobFiles, null);
+      completeCompaction(job, cInfo.jobFiles, Optional.empty(), false);
       throw new RuntimeException(e);
     }
   }
@@ -1357,22 +1367,25 @@ public void commitExternalCompaction(ExternalCompactionId extCompactionId, long
 
     ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
 
+    boolean successful = false;
+
     if (ecInfo != null) {
       log.debug("Attempting to commit external compaction {}", extCompactionId);
-      StoredTabletFile metaFile = null;
+      Optional<StoredTabletFile> metaFile = Optional.empty();
       try {
         metaFile =
             tablet.getDatafileManager().bringMajorCompactionOnline(ecInfo.meta.getJobFiles(),
                 ecInfo.meta.getCompactTmpName(), ecInfo.meta.getCompactionId(),
                 Sets.union(ecInfo.meta.getJobFiles(), ecInfo.meta.getNextFiles()),
                 new DataFileValue(fileSize, entries), Optional.of(extCompactionId));
-        TabletLogger.compacted(getExtent(), ecInfo.job, metaFile);
+        TabletLogger.compacted(getExtent(), ecInfo.job, metaFile.orElse(null));
+        successful = true;
       } catch (Exception e) {
-        metaFile = null;
+        metaFile = Optional.empty();
         log.error("Error committing external compaction {}", extCompactionId, e);
         throw new RuntimeException(e);
       } finally {
-        completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile);
+        completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile, successful);
         externalCompactions.remove(extCompactionId);
         log.debug("Completed commit of external compaction {}", extCompactionId);
       }
@@ -1408,7 +1421,7 @@ public void externalCompactionFailed(ExternalCompactionId ecid) {
     if (ecInfo != null) {
       tablet.getContext().getAmple().mutateTablet(getExtent()).deleteExternalCompaction(ecid)
           .mutate();
-      completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), null);
+      completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), Optional.empty(), false);
       externalCompactions.remove(ecid);
       log.debug("Processed external compaction failure {}", ecid);
     } else {
@@ -570,7 +570,7 @@ static CompactionStats compact(Tablet tablet, CompactionJob job,
   /**
    * Finish major compaction by bringing the new file online and returning the completed file.
    */
-  static StoredTabletFile bringOnline(DatafileManager datafileManager,
+  static Optional<StoredTabletFile> bringOnline(DatafileManager datafileManager,
       CompactableImpl.CompactionInfo cInfo, CompactionStats stats,
       Map<StoredTabletFile,DataFileValue> compactFiles,
       SortedMap<StoredTabletFile,DataFileValue> allFiles, CompactionKind kind,
@@ -426,7 +426,7 @@ Optional<StoredTabletFile> bringMinorCompactionOnline(TabletFile tmpDatafile,
     return newFile;
   }
 
-  StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
+  Optional<StoredTabletFile> bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
       TabletFile tmpDatafile, Long compactionId, Set<StoredTabletFile> selectedFiles,
       DataFileValue dfv, Optional<ExternalCompactionId> ecid) throws IOException {
     final KeyExtent extent = tablet.getExtent();
@@ -449,9 +449,14 @@ StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
     }
 
     TServerInstance lastLocation = null;
-    // calling insert to get the new file before inserting into the metadata
-    StoredTabletFile newFile = newDatafile.insert();
+    Optional<StoredTabletFile> newFile;
 
+    if (dfv.getNumEntries() > 0) {
+      // calling insert to get the new file before inserting into the metadata
+      newFile = Optional.of(newDatafile.insert());
+    } else {
+      newFile = Optional.empty();
+    }
     Long compactionIdToWrite = null;
 
     // increment start count before metadata update AND updating in memory map of files
@@ -465,8 +470,8 @@ StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
       Preconditions.checkState(datafileSizes.keySet().containsAll(oldDatafiles),
           "Compacted files %s are not a subset of tablet files %s", oldDatafiles,
           datafileSizes.keySet());
-      if (dfv.getNumEntries() > 0) {
-        Preconditions.checkState(!datafileSizes.containsKey(newFile),
+      if (newFile.isPresent()) {
+        Preconditions.checkState(!datafileSizes.containsKey(newFile.get()),
             "New compaction file %s already exist in tablet files %s", newFile,
             datafileSizes.keySet());
       }
@@ -475,8 +480,8 @@ StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles,
 
       datafileSizes.keySet().removeAll(oldDatafiles);
 
-      if (dfv.getNumEntries() > 0) {
-        datafileSizes.put(newFile, dfv);
+      if (newFile.isPresent()) {
+        datafileSizes.put(newFile.get(), dfv);
         // could be used by a follow on compaction in a multipass compaction
       }
 
@@ -428,7 +428,7 @@ boolean reserveFiles(TestCompactionJob job) {
     }
 
     void completed(TestCompactionJob job, StoredTabletFile newFile) {
-      super.completed(job, job.getSTFiles(), newFile);
+      super.completed(job, job.getSTFiles(), Optional.ofNullable(newFile));
     }
 
     @Override
@@ -61,6 +61,7 @@
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -494,6 +495,56 @@ public void testSuccessfulCompaction() throws Exception {
     }
   }
 
+  @Test
+  public void testMultiStepCompactionThatDeletesAll() throws Exception {
+
+    // There was a bug where user compactions would never complete when : the tablet had to be
+    // compacted in multiple passes AND the intermediate passes produced no output.
+
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      final String tableName = getUniqueNames(1)[0];
+      c.tableOperations().create(tableName);
+      c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100.0");
+
+      var beforeCount = countFiles(c);
+
+      final int NUM_ENTRIES_AND_FILES = 60;
+
+      try (var writer = c.createBatchWriter(tableName)) {
+        for (int i = 0; i < NUM_ENTRIES_AND_FILES; i++) {
+          Mutation m = new Mutation("r" + i);
+          m.put("f1", "q1", "v" + i);
+          writer.addMutation(m);
+          writer.flush();
+          c.tableOperations().flush(tableName, null, null, true);
+        }
+      }
+
+      try (var scanner = c.createScanner(tableName)) {
+        assertEquals(NUM_ENTRIES_AND_FILES, scanner.stream().count());
+      }
+
+      var afterCount = countFiles(c);
+
+      assertTrue(afterCount >= beforeCount + NUM_ENTRIES_AND_FILES);
+
+      CompactionConfig comactionConfig = new CompactionConfig();
+      // configure an iterator that drops all data
+      IteratorSetting iter = new IteratorSetting(100, GrepIterator.class);
+      GrepIterator.setTerm(iter, "keep");
+      comactionConfig.setIterators(List.of(iter));
+      comactionConfig.setWait(true);
+      c.tableOperations().compact(tableName, comactionConfig);
+
+      try (var scanner = c.createScanner(tableName)) {
+        assertEquals(0, scanner.stream().count());
+      }
+
+      var finalCount = countFiles(c);
+      assertTrue(finalCount <= beforeCount);
+    }
+  }
+
   private int countFiles(AccumuloClient c) throws Exception {
     try (Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
       s.fetchColumnFamily(new Text(TabletColumnFamily.NAME));
