OAK-6005: Add record id of the compacted root to the GC journal
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1789457 13f79535-47bb-0310-9956-ffa450edef68
mduerig committed Mar 30, 2017
1 parent e45ef1f commit af43913
Showing 3 changed files with 226 additions and 92 deletions.
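
In outline: compact() now returns a CompactionResult rather than a bare generation number, cleanup() takes that result and derives both its reclaim predicate and its gc-info string from it, and on success the record id of the compacted root is written to the GC journal. The following is a minimal standalone sketch of that flow; the types are simplified stand-ins for the classes in the diff below (Oak's RecordId, GCJournal and file-reaper plumbing are elided, and the root id string is a placeholder):

import java.util.Optional;
import java.util.function.IntPredicate;

// Stand-in for the new CompactionResult: every outcome knows whether it counts as a
// success, which generations cleanup() may reclaim and, on success, the compacted root.
final class Outcome {
    final boolean success;
    final IntPredicate reclaimer;
    final Optional<String> compactedRootId;

    Outcome(boolean success, IntPredicate reclaimer, Optional<String> compactedRootId) {
        this.success = success;
        this.reclaimer = reclaimer;
        this.compactedRootId = compactedRootId;
    }
}

class GcFlowSketch {

    // Mirrors the reworked GarbageCollector.run(): clean up with whatever predicate the
    // outcome prescribes; there is no longer a separate "cleanup after failure" method.
    static void run(Outcome outcome) {
        if (outcome.success) {
            System.out.println("compaction succeeded, persisting root "
                    + outcome.compactedRootId.orElse("?") + " to the GC journal");
        } else {
            System.out.println("cleaning up after failed compaction");
        }
        cleanup(outcome);
    }

    static void cleanup(Outcome outcome) {
        // The predicate decides which segment generations get reclaimed.
        for (int generation = 4; generation <= 7; generation++) {
            if (outcome.reclaimer.test(generation)) {
                System.out.println("reclaiming generation " + generation);
            }
        }
    }

    public static void main(String[] args) {
        int retained = 2;   // SegmentGCOptions.getRetainedGenerations()
        // Successful compaction to generation 7: generations <= 5 are reclaimable.
        run(new Outcome(true, g -> g <= 7 - retained, Optional.of("example-root-id")));
        // Aborted attempt at generation 7: only that generation is reclaimed.
        run(new Outcome(false, g -> g == 7, Optional.empty()));
    }
}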
@@ -80,7 +80,6 @@
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentId;
import org.apache.jackrabbit.oak.segment.SegmentIdTable;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
@@ -386,7 +385,7 @@ public GCEstimation estimateCompactionGain() {
* @return {@code true} on success, {@code false} otherwise.
*/
public boolean compact() {
return garbageCollector.compact() > 0;
return garbageCollector.compact().isSuccess();
}

/**
@@ -397,7 +396,9 @@ public boolean compact() {
* skipping the reclaimed segments.
*/
public void cleanup() throws IOException {
garbageCollector.cleanup();
CompactionResult compactionResult = CompactionResult.skipped(
getGcGeneration(), garbageCollector.gcOptions);
fileReaper.add(garbageCollector.cleanup(compactionResult));
}

/**
@@ -741,14 +742,13 @@ synchronized void run() throws IOException {

if (sufficientEstimatedGain) {
if (!gcOptions.isPaused()) {
int gen = compact();
if (gen > 0) {
fileReaper.add(cleanupOldGenerations(gen));
CompactionResult compactionResult = compact();
if (compactionResult.isSuccess()) {
lastSuccessfullGC = System.currentTimeMillis();
} else if (gen < 0) {
} else {
gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT);
fileReaper.add(cleanupGeneration(-gen));
}
fileReaper.add(cleanup(compactionResult));
} else {
gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
}
@@ -771,17 +771,20 @@ synchronized GCEstimation estimateCompactionGain(Supplier<Boolean> stop) {
stats.getApproximateSize());
}

private int compactionAborted(int generation) {
@Nonnull
private CompactionResult compactionAborted(int generation) {
gcListener.compactionFailed(generation);
return -generation;
return CompactionResult.aborted(getGcGeneration(), generation);
}

private int compactionSucceeded(int generation) {
@Nonnull
private CompactionResult compactionSucceeded(int generation, @Nonnull RecordId compactedRootId) {
gcListener.compactionSucceeded(generation);
return generation;
return CompactionResult.succeeded(generation, gcOptions, compactedRootId);
}

synchronized int compact() {
@Nonnull
synchronized CompactionResult compact() {
final int newGeneration = getGcGeneration() + 1;
try {
Stopwatch watch = Stopwatch.createStarted();
@@ -873,7 +876,7 @@ synchronized int compact() {
writer.flush();
gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
return compactionSucceeded(newGeneration);
return compactionSucceeded(newGeneration, after.getRecordId());
} else {
gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
@@ -962,43 +965,13 @@ public RecordId apply(RecordId base) {
: null;
}

synchronized void cleanup() throws IOException {
fileReaper.add(cleanupOldGenerations(getGcGeneration()));
}

/**
* Cleanup segments that are from an old generation. That is, segments whose generation
* is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older.
* @param gcGeneration
* Cleanup segments whose generation matches the {@link CompactionResult#reclaimer()} predicate.
* @return list of files to be removed
* @throws IOException
*/
private List<File> cleanupOldGenerations(int gcGeneration) throws IOException {
final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations();

Predicate<Integer> reclaimPredicate = new Predicate<Integer>() {
@Override
public boolean apply(Integer generation) {
return generation <= reclaimGeneration;
}
};
return cleanup(reclaimPredicate,
"gc-count=" + GC_COUNT +
",gc-status=success" +
",store-generation=" + gcGeneration +
",reclaim-predicate=(generation<=" + reclaimGeneration + ")");
}

/**
* Cleanup segments whose generation matches the {@code reclaimGeneration} predicate.
* @param reclaimGeneration
* @param gcInfo gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)}
* @return list of files to be removed
* @throws IOException
*/
private List<File> cleanup(
@Nonnull Predicate<Integer> reclaimGeneration,
@Nonnull String gcInfo)
@Nonnull
private List<File> cleanup(@Nonnull CompactionResult compactionResult)
throws IOException {
Stopwatch watch = Stopwatch.createStarted();
Set<UUID> bulkRefs = newHashSet();
@@ -1032,7 +1005,7 @@ private List<File> cleanup(

Set<UUID> reclaim = newHashSet();
for (TarReader reader : cleaned.keySet()) {
reader.mark(bulkRefs, reclaim, reclaimGeneration);
reader.mark(bulkRefs, reclaim, compactionResult.reclaimer());
log.info("{}: size of bulk references/reclaim set {}/{}",
reader, bulkRefs.size(), reclaim.size());
if (shutdown) {
@@ -1077,7 +1050,7 @@
} finally {
fileStoreLock.writeLock().unlock();
}
tracker.clearSegmentIdTables(reclaimed, gcInfo);
tracker.clearSegmentIdTables(reclaimed, compactionResult.gcInfo());

// Close old readers *after* setting readers to the new readers to avoid accessing
// a closed reader from readSegment()
@@ -1092,7 +1065,9 @@
long finalSize = size();
long reclaimedSize = initialSize - afterCleanupSize;
stats.reclaimed(reclaimedSize);
gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(), compactionMonitor.getCompactedNodes());
gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
compactionMonitor.getCompactedNodes(),
compactionResult.getCompactedRootId().toString10());
gcListener.cleaned(reclaimedSize, finalSize);
gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
" and space reclaimed {} ({} bytes).",
@@ -1123,26 +1098,6 @@ private void collectBulkReferences(Set<UUID> bulkRefs) {
}
}

/**
* Cleanup segments of the given generation {@code gcGeneration}.
* @param gcGeneration
* @return list of files to be removed
* @throws IOException
*/
private List<File> cleanupGeneration(final int gcGeneration) throws IOException {
Predicate<Integer> cleanupPredicate = new Predicate<Integer>() {
@Override
public boolean apply(Integer generation) {
return generation == gcGeneration;
}
};
return cleanup(cleanupPredicate,
"gc-count=" + GC_COUNT +
",gc-status=failed" +
",store-generation=" + (gcGeneration - 1) +
",reclaim-predicate=(generation==" + gcGeneration + ")");
}

/**
* Finds all external blob references that are currently accessible
* in this repository and adds them to the given collector. Useful
@@ -1222,4 +1177,147 @@ public Boolean get() {
}
}

/**
* Instances of this class represent the result from a compaction.
* Either {@link #succeeded(int, SegmentGCOptions, RecordId) succeeded},
* {@link #aborted(int, int) aborted} or {@link #skipped(int, SegmentGCOptions) skipped}.
*/
private abstract static class CompactionResult {
private final int currentGeneration;

protected CompactionResult(int currentGeneration) {
this.currentGeneration = currentGeneration;
}

/**
* Result of a succeeded compaction.
* @param newGeneration the generation successfully created by compaction
* @param gcOptions the current GC options used by compaction
* @param compactedRootId the record id of the root created by compaction
*/
static CompactionResult succeeded(
final int newGeneration,
@Nonnull final SegmentGCOptions gcOptions,
@Nonnull final RecordId compactedRootId) {
return new CompactionResult(newGeneration) {
int oldGeneration = newGeneration - gcOptions.getRetainedGenerations();

@Override
Predicate<Integer> reclaimer() {
return CompactionResult.newOldReclaimer(oldGeneration);
}

@Override
boolean isSuccess() {
return true;
}

@Override
RecordId getCompactedRootId() {
return compactedRootId;
}
};
}

/**
* Result of an aborted compaction.
* @param currentGeneration the current generation of the store
* @param failedGeneration the generation that compaction attempted to create
*/
static CompactionResult aborted(
int currentGeneration,
final int failedGeneration) {
return new CompactionResult(currentGeneration) {
@Override
Predicate<Integer> reclaimer() {
return CompactionResult.newFailedReclaimer(failedGeneration);
}

@Override
boolean isSuccess() {
return false;
}
};
}

/**
* Result serving as a placeholder for a compaction that was skipped.
* @param currentGeneration the current generation of the store
* @param gcOptions the current GC options used by compaction
*/
static CompactionResult skipped(
final int currentGeneration,
@Nonnull final SegmentGCOptions gcOptions) {
return new CompactionResult(currentGeneration) {
int oldGeneration = currentGeneration - gcOptions.getRetainedGenerations();
@Override
Predicate<Integer> reclaimer() {
return CompactionResult.newOldReclaimer(oldGeneration);
}

@Override
boolean isSuccess() {
return true;
}
};
}

/**
* @return a predicate determining which segments to
* {@link GarbageCollector#cleanup(CompactionResult) clean up} for
* the given compaction result.
*/
abstract Predicate<Integer> reclaimer();

/**
* @return {@code true} for {@link #succeeded(int, SegmentGCOptions, RecordId) succeeded}
* and {@link #skipped(int, SegmentGCOptions) skipped}, {@code false} otherwise.
*/
abstract boolean isSuccess();

/**
* @return the record id of the compacted root on {@link #isSuccess() success},
* {@link RecordId#NULL} otherwise.
*/
RecordId getCompactedRootId() {
return RecordId.NULL;
}

/**
* @return a diagnostic message describing the outcome of this compaction.
*/
String gcInfo() {
return "gc-count=" + GC_COUNT +
",gc-status=" + (isSuccess() ? "success" : "failed") +
",store-generation=" + currentGeneration +
",reclaim-predicate=" + reclaimer();
}

private static Predicate<Integer> newFailedReclaimer(final int failedGeneration) {
return new Predicate<Integer>() {
@Override
public boolean apply(Integer generation) {
return generation == failedGeneration;
}
@Override
public String toString() {
return "(generation==" + failedGeneration + ")";
}
};
}

private static Predicate<Integer> newOldReclaimer(final int oldGeneration) {
return new Predicate<Integer>() {
@Override
public boolean apply(Integer generation) {
return generation <= oldGeneration;
}
@Override
public String toString() {
return "(generation<=" + oldGeneration + ")";
}
};
}
}

}
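
For a concrete feel of what the new CompactionResult variants feed into cleanup and into SegmentIdTable.clearSegmentIdTables(), here is a standalone sketch of the reclaim predicates and the gc-info string they build. Plain JDK IntPredicate stands in for Guava's Predicate<Integer>, and the gc-count and generation numbers are made up for the example:

import java.util.function.IntPredicate;

// Standalone illustration of the reclaim predicates and the gc-info string built by the
// new CompactionResult variants above. IntPredicate replaces Guava's Predicate<Integer>;
// the gc-count and generation numbers are invented for the example.
public class GcInfoSketch {

    static String gcInfo(long gcCount, boolean success, int storeGeneration, String reclaimer) {
        return "gc-count=" + gcCount +
                ",gc-status=" + (success ? "success" : "failed") +
                ",store-generation=" + storeGeneration +
                ",reclaim-predicate=" + reclaimer;
    }

    public static void main(String[] args) {
        int retainedGenerations = 2;   // SegmentGCOptions.getRetainedGenerations()

        // Compaction succeeded, creating generation 7: everything at generation 5 or
        // below is reclaimable.
        IntPredicate onSuccess = generation -> generation <= 7 - retainedGenerations;
        System.out.println(gcInfo(3, true, 7, "(generation<=5)"));
        System.out.println("generation 5 reclaimable: " + onSuccess.test(5));   // true
        System.out.println("generation 6 reclaimable: " + onSuccess.test(6));   // false

        // Compaction towards generation 7 aborted while the store is still at
        // generation 6: only the partially written generation 7 is reclaimed.
        IntPredicate onAbort = generation -> generation == 7;
        System.out.println(gcInfo(3, false, 6, "(generation==7)"));
        System.out.println("generation 7 reclaimable: " + onAbort.test(7));     // true
    }
}

Note that on success the store-generation in the string is the newly created generation, whereas an aborted run reports the generation the store is still on and targets only the failed one for reclaim.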
