Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, Set<StoredTabl
TabletFile compactTmpName, String compactorId, CompactionKind kind, short priority,
CompactionExecutorId ceid, boolean propagateDeletes, boolean initiallySelectedAll,
Long compactionId) {
if (!initiallySelectedAll && !propagateDeletes
&& (kind == CompactionKind.SELECTOR || kind == CompactionKind.USER)) {
throw new IllegalArgumentException(
"When user or selector compactions do not propagate deletes, it's expected that all "
+ "files were selected initially.");
}
this.jobFiles = Objects.requireNonNull(jobFiles);
this.nextFiles = Objects.requireNonNull(nextFiles);
this.compactTmpName = Objects.requireNonNull(compactTmpName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,9 +602,30 @@ public CompactableImpl(Tablet tablet, CompactionManager manager,

Map<ExternalCompactionId,String> extCompactionsToRemove = new HashMap<>();

var extSelInfo = initExternalSelection(extCompactions, tablet, extCompactionsToRemove);
// Memoize the supplier so it only calls tablet.getCompactionID() once, because the impl goes to
// zookeeper. It's a supplier because it may not be needed.
Supplier<Optional<Pair<Long,CompactionConfig>>> tabletCompactionId = Suppliers.memoize(() -> {
try {
return Optional.of(tablet.getCompactionID());
} catch (NoNodeException nne) {
return Optional.empty();
}
});

verifyExternalCompactions(extCompactions, dataFileSizes.keySet(), extCompactionsToRemove);
var extSelInfo =
processExternalMetadata(extCompactions, () -> tabletCompactionId.get().map(Pair::getFirst),
dataFileSizes.keySet(), extCompactionsToRemove);

if (extSelInfo.isPresent()) {
if (extSelInfo.get().selectKind == CompactionKind.USER) {
this.chelper = CompactableUtils.getHelper(extSelInfo.get().selectKind, tablet,
tabletCompactionId.get().get().getFirst(), tabletCompactionId.get().get().getSecond());
this.compactionConfig = tabletCompactionId.get().get().getSecond();
this.compactionId = tabletCompactionId.get().get().getFirst();
} else if (extSelInfo.get().selectKind == CompactionKind.SELECTOR) {
this.chelper = CompactableUtils.getHelper(extSelInfo.get().selectKind, tablet, null, null);
}
}

extCompactionsToRemove.forEach((ecid, reason) -> {
log.warn("Removing external compaction {} for {} because {} meta: {}", ecid,
Expand Down Expand Up @@ -669,30 +690,6 @@ protected long getNanoTime() {
};
}

private void verifyExternalCompactions(
Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions,
Set<StoredTabletFile> tabletFiles, Map<ExternalCompactionId,String> extCompactionsToRemove) {

Set<StoredTabletFile> seen = new HashSet<>();
boolean overlap = false;

for (var entry : extCompactions.entrySet()) {
ExternalCompactionMetadata ecMeta = entry.getValue();
if (!tabletFiles.containsAll(ecMeta.getJobFiles())) {
extCompactionsToRemove.putIfAbsent(entry.getKey(), "Has files outside of tablet files");
} else if (!Collections.disjoint(seen, ecMeta.getJobFiles())) {
overlap = true;
}
seen.addAll(ecMeta.getJobFiles());
}

if (overlap) {
extCompactions.keySet().forEach(ecid -> {
extCompactionsToRemove.putIfAbsent(ecid, "Some external compaction files overlap");
});
}
}

private synchronized boolean addJob(CompactionJob job) {
if (runningJobs.add(job)) {
compactionRunning = true;
Expand Down Expand Up @@ -815,16 +812,47 @@ private void checkIfUserCompactionCanceled() {
}

/**
* For user compactions a set of files is selected. Those files then get compacted by one or more
* compactions until the set is empty. This method attempts to reconstruct the selected set of
* files when a tablet is loaded with an external user compaction. It avoids repeating work and
* when a user compaction completes, files are verified against the selected set. Since the data
* is coming from persisted storage, lots of checks are done in this method rather than assuming
* the persisted data is correct.
* This method validates metadata about external compactions. It also extracts specific
* information needed for user and selector compactions.
*/
private Optional<SelectedInfo> initExternalSelection(
Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, Tablet tablet,
static Optional<SelectedInfo> processExternalMetadata(
Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions,
Supplier<Optional<Long>> tabletCompactionId, Set<StoredTabletFile> tabletFiles,
Map<ExternalCompactionId,String> externalCompactionsToRemove) {

// Check that external compactions have disjoint sets of files. Also check that each external
// compaction only has files inside a tablet.
Set<StoredTabletFile> seen = new HashSet<>();
boolean overlap = false;

for (var entry : extCompactions.entrySet()) {
ExternalCompactionMetadata ecMeta = entry.getValue();
if (!tabletFiles.containsAll(ecMeta.getJobFiles())) {
externalCompactionsToRemove.putIfAbsent(entry.getKey(),
"Has files outside of tablet files");
} else if (!Collections.disjoint(seen, ecMeta.getJobFiles())) {
overlap = true;
}
seen.addAll(ecMeta.getJobFiles());
}

if (overlap) {
extCompactions.keySet().forEach(ecid -> {
externalCompactionsToRemove.putIfAbsent(ecid, "Some external compaction files overlap");
});
return Optional.empty();
}

/*
* The rest of the code validates user compaction metadata and extracts needed information.
*
* For user compactions a set of files is selected. Those files then get compacted by one or
* more compactions until the set is empty. This method attempts to reconstruct the selected set
* of files when a tablet is loaded with an external user compaction. It avoids repeating work
* and when a user compaction completes, files are verified against the selected set. Since the
* data is coming from persisted storage, lots of checks are done in this method rather than
* assuming the persisted data is correct.
*/
CompactionKind extKind = null;
boolean unexpectedExternal = false;
Set<StoredTabletFile> tmpSelectedFiles = null;
Expand Down Expand Up @@ -902,17 +930,14 @@ private Optional<SelectedInfo> initExternalSelection(
reasons.add("Concurrent compactions not propagatingDeletes");
}

Pair<Long,CompactionConfig> idAndCfg = null;
if (extKind == CompactionKind.USER) {
try {
idAndCfg = tablet.getCompactionID();
if (!idAndCfg.getFirst().equals(cid)) {
unexpectedExternal = true;
reasons.add("Compaction id mismatch with zookeeper");
}
} catch (NoNodeException e) {
Optional<Long> compactionId = tabletCompactionId.get();
if (compactionId.isEmpty()) {
unexpectedExternal = true;
reasons.add("No compaction id in zookeeper");
} else if (!compactionId.get().equals(cid)) {
unexpectedExternal = true;
reasons.add("Compaction id mismatch with zookeeper");
}
}

Expand All @@ -925,17 +950,8 @@ private Optional<SelectedInfo> initExternalSelection(
return Optional.empty();
}

if (extKind != null) {
if (extKind == CompactionKind.USER) {
this.chelper = CompactableUtils.getHelper(extKind, tablet, cid, idAndCfg.getSecond());
this.compactionConfig = idAndCfg.getSecond();
this.compactionId = cid;
} else if (extKind == CompactionKind.SELECTOR) {
this.chelper = CompactableUtils.getHelper(extKind, tablet, null, null);
}

if (extKind != null)
return Optional.of(new SelectedInfo(initiallySelAll, tmpSelectedFiles, extKind));
}

return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,11 @@ Set<StoredTabletFile> getCompactingFiles() {

}

private static StoredTabletFile newFile(String f) {
static StoredTabletFile newFile(String f) {
return new StoredTabletFile("hdfs://nn1/accumulo/tables/1/t-0001/" + f);
}

private static Set<StoredTabletFile> newFiles(String... strings) {
static Set<StoredTabletFile> newFiles(String... strings) {
return Arrays.asList(strings).stream().map(s -> newFile(s)).collect(Collectors.toSet());
}

Expand Down
Loading