Skip to content
Permalink
Browse files
Merge pull request #536 from Ewocker/GRANITE-38710
OAK-9747 Download resume needs to save progress and handle hidden node on exception
  • Loading branch information
thomasmueller committed Apr 20, 2022
2 parents 13b66d0 + 5f0f173 commit 6ca4dd75064e34a87a4a8069d7e4af027155653e
Showing 9 changed files with 338 additions and 109 deletions.
@@ -19,16 +19,6 @@

package org.apache.jackrabbit.oak.index.indexer.document;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Stopwatch;
import com.google.common.io.Closer;
@@ -66,6 +56,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
@@ -115,7 +115,8 @@ private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, Documen
}

private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer, Predicate<String> pathPredicate) {
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer,
Predicate<String> pathPredicate) {
this.rootRevision = rootRevision;
this.documentNodeStore = documentNodeStore;
this.documentStore = documentStore;
@@ -140,8 +141,7 @@ public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange tra
throw new RuntimeException(e);
}
traversalLogger.trace(id);
})
.withPathPredicate((pathPredicate != null) ? pathPredicate : indexer::shouldInclude);
});
}
}

@@ -19,12 +19,6 @@

package org.apache.jackrabbit.oak.index.indexer.document;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

import com.google.common.collect.FluentIterable;
import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -36,10 +30,14 @@
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.jetbrains.annotations.NotNull;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.transform;
import static java.util.Collections.emptyList;
@@ -58,7 +56,6 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
private final TraversingRange traversingRange;

private Consumer<String> progressReporter = id -> {};
private Predicate<String> pathPredicate = path -> true;

private final String id;

@@ -92,11 +89,6 @@ public NodeStateEntryTraverser withProgressCallback(Consumer<String> progressRep
return this;
}

public NodeStateEntryTraverser withPathPredicate(Predicate<String> pathPredicate) {
this.pathPredicate = pathPredicate;
return this;
}

@Override
public void close() throws IOException {
closer.close();
@@ -110,10 +102,7 @@ private Iterable<NodeStateEntry> getIncludedDocs() {
}

private boolean includeDoc(NodeDocument doc) {
String path = doc.getPath().toString();
return !doc.isSplitDocument()
&& !NodeStateUtils.isHiddenPath(path)
&& pathPredicate.test(path);
return !doc.isSplitDocument();
}

/**
@@ -157,31 +146,12 @@ private Iterable<NodeDocument> getDocsFilteredByPath() {

private CloseableIterable<NodeDocument> findAllDocuments() {
return new MongoDocumentTraverser(documentStore)
.getAllDocuments(Collection.NODES, traversingRange, this::includeId);
.getAllDocuments(Collection.NODES, traversingRange, this::reportProgress);
}

private boolean includeId(String id) {
private boolean reportProgress(String id) {
progressReporter.accept(id);
//Cannot interpret long paths as they are hashed. So let them
//be included
if (Utils.isIdFromLongPath(id)){
return true;
}

//Not easy to determine path for previous docs
//Given there count is pretty low compared to others
//include them all so that they become part of cache
if (Utils.isPreviousDocId(id)){
return true;
}

String path = Utils.getPathFromId(id);

//Exclude hidden nodes from index data
if (NodeStateUtils.isHiddenPath(path)){
return false;
}

return pathPredicate.test(path);
// always returning true here and do the path predicate and hidden node filter when iterating.
return true;
}
}
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import com.google.common.collect.Iterables;
import org.apache.commons.io.FileUtils;
@@ -101,6 +102,7 @@ public class FlatFileNodeStoreBuilder {
private File flatFileStoreDir;
private final MemoryManager memoryManager;
private long dumpThreshold = DEFAULT_DUMP_THRESHOLD;
private Predicate<String> pathPredicate = path -> true;

private final boolean useZip = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_ZIP, "true"));
private final boolean useTraverseWithSort = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "true"));
@@ -165,6 +167,11 @@ public FlatFileNodeStoreBuilder withNodeStateEntryTraverserFactory(NodeStateEntr
return this;
}

public FlatFileNodeStoreBuilder withPathPredicate(Predicate<String> pathPredicate) {
this.pathPredicate = pathPredicate;
return this;
}

public FlatFileStore build() throws IOException, CompositeException {
logFlags();
comparator = new PathElementComparator(preferredPathElements);
@@ -210,7 +217,7 @@ SortStrategy createSortStrategy(File dir) throws IOException {
case MULTITHREADED_TRAVERSE_WITH_SORT:
log.info("Using MultithreadedTraverseWithSortStrategy");
return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, comparator,
blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold);
blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold, pathPredicate);
}
throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
}
@@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -39,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.google.common.base.Charsets.UTF_8;
@@ -100,6 +102,20 @@
* </ol>
* </li>
* </ol>
*
* <h3>Force Stop Explanation -</h3>
* <ol>
* <li>On receiving MERGE_FORCE_STOP_POISON_PILL in SORTED_FILE_QUEUE from parent thread, all un-started tasks will be skipped.</li>
 * <li>No more tasks will be created once the MERGE_FORCE_STOP_POISON_PILL message is received.</li>
* <li>Running merge will continue to finish.</li>
* <li>Final merge will not be performed.</li>
 * <li>This will not result in loss of data under the conditions that -</li>
* <ol>
* <li>Files will not be removed until merge task is completed. Parent thread should add unmerged files to the SORTED_FILE_QUEUE on retry.</li>
* <li>Merged files will still be under the merge folder. Parent thread should add those files to the SORTED_FILE_QUEUE on retry.</li>
* </ol>
* </li>
* </ol>
*/
public class MergeRunner implements Runnable {
private static final Logger log = LoggerFactory.getLogger(MergeRunner.class);
@@ -136,10 +152,18 @@ public class MergeRunner implements Runnable {
private final Phaser phaser;

/**
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has completed.
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has completed successfully and
* merge should advance to the final merge stage.
*/
public static final File MERGE_POISON_PILL = new File("");

/**
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has failed and
* merge should shut down immediately and not advance to the final merge stage.
*/
public static final File MERGE_FORCE_STOP_POISON_PILL = new File("merge-force-stop-poison-pill");
private final AtomicBoolean mergeCancelled;

/**
* Constructor.
* @param sortedFiles thread safe list containing files to be merged.
@@ -158,6 +182,7 @@ public class MergeRunner implements Runnable {
this.phaser = phaser;
this.batchMergeSize = batchMergeSize;
this.threadPoolSize = threadPoolSize;
this.mergeCancelled = new AtomicBoolean(false);
}

private boolean merge(List<File> files, File outputFile) {
@@ -236,7 +261,7 @@ public void run() {
File f = sortedFiles.take();
unmergedFiles.add(f);
log.debug("added sorted file {} to the unmerged list", f.getName());
if (f.equals(MERGE_POISON_PILL)) {
if (f.equals(MERGE_POISON_PILL) || f.equals(MERGE_FORCE_STOP_POISON_PILL)) {
break;
}
// add another batchMergeSize so that we choose the smallest of a larger range
@@ -254,6 +279,17 @@ public void run() {
}
}
log.info("Waiting for batch sorting tasks completion");

// Parent thread signals to stop immediately
if (unmergedFiles.contains(MERGE_FORCE_STOP_POISON_PILL)) {
log.info("Merger receives force stop signal, shutting down all merge tasks");
this.mergeCancelled.set(true);
mergeTaskPhaser.arriveAndAwaitAdvance();
executorService.shutdown();
mergeTaskPhaser.arrive();
phaser.arriveAndDeregister();
return;
}
mergeTaskPhaser.arriveAndAwaitAdvance();
executorService.shutdown();

@@ -297,14 +333,20 @@ private class Task implements Callable<File> {
mergeTaskPhaser.register();
}


@Override
public File call() {
public File call() throws Exception {
try {
if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFile.getName());
return mergedFile;
String mergedFileName = mergedFile.getName();
if (mergeCancelled.get()) {
log.debug("merge cancelled, skipping merge task");
throw new EOFException("merge skipped for " + mergedFileName);
} else if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFileName);
} else {
log.error("merge failed for {}", mergedFileName);
throw new RuntimeException("merge failed for " + mergedFileName);
}
log.error("merge failed for {}", mergedFile.getName());
} finally {
mergeTaskPhaser.arriveAndDeregister();
}
@@ -45,6 +45,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.*;
@@ -149,12 +150,7 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {

private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
private final boolean compressionEnabled;
/** File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, storeDir, comparator, mergePhaser, compressionEnabled);
Thread merger = new Thread(mergeRunner, mergerThreadName);
merger.setDaemon(true);
merger.start();
phaser.awaitAdvance(Phases.WAITING_FOR_TASK_SPLITS.value);
/**
* Directory where sorted files will be created.
*/
private final File storeDir;
@@ -189,6 +185,8 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {

private final long dumpThreshold;

private Predicate<String> pathPredicate = path -> true;

/**
* Indicates the various phases of {@link #phaser}
*/
@@ -234,13 +232,15 @@ public int getValue() {
MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator,
BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs,
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold,
Predicate<String> pathPredicate) throws IOException {
this.storeDir = storeDir;
this.mergeDir = new File(storeDir, mergeDirName);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = new LinkedBlockingQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.pathPredicate = pathPredicate;
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@Override
@@ -274,6 +274,10 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
continue;
} else if (existingSortWorkDir.getName().equals(mergeDirName)) {
log.info("Intermediate Merge Directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
DirectoryHelper.getDataFiles(existingSortWorkDir).forEach(file -> {
log.debug("Including existing intermediate merged file {}", file.getPath());
sortedFiles.add(file);
});
continue;
}
boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
@@ -315,11 +319,12 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
ConcurrentLinkedQueue<String> completedTasks) throws IOException {
taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory,
memoryManager, dumpThreshold, sortedFiles, pathPredicate));
}

@Override
public File createSortedStoreFile() throws IOException, CompositeException {
public File createSortedStoreFile() throws CompositeException {
String watcherThreadName = "watcher";
String mergerThreadName = "merger";
Thread watcher = new Thread(new TaskRunner(), watcherThreadName);
@@ -341,6 +346,8 @@ public File createSortedStoreFile() throws IOException, CompositeException {
for (Throwable throwable : throwables) {
exception.addSuppressed(throwable);
}
sortedFiles.add(MergeRunner.MERGE_FORCE_STOP_POISON_PILL);
mergePhaser.awaitAdvance(0);
throw exception;
}
log.debug("Result collection complete. Proceeding to final merge.");

0 comments on commit 6ca4dd7

Please sign in to comment.