Skip to content
Permalink
Browse files
OAK-9747 Download resume needs to save progress and handle hidden node on exception

remove hidden node filter for traverser

add hidden node handle, test failing

fix test

add fix

OAK-9747 update resume merge test

OAK-9747 add hidden node test case

OAK-9747 address review comment
  • Loading branch information
Ewocker committed Apr 13, 2022
1 parent 983f82e commit 9b78dc8de1732745fd37f091a889f8f64ee95c8a
Showing 8 changed files with 210 additions and 86 deletions.
@@ -19,16 +19,6 @@

package org.apache.jackrabbit.oak.index.indexer.document;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Stopwatch;
import com.google.common.io.Closer;
@@ -66,6 +56,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
@@ -115,7 +115,8 @@ private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, Documen
}

private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer, Predicate<String> pathPredicate) {
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer,
Predicate<String> pathPredicate) {
this.rootRevision = rootRevision;
this.documentNodeStore = documentNodeStore;
this.documentStore = documentStore;
@@ -140,8 +141,7 @@ public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange tra
throw new RuntimeException(e);
}
traversalLogger.trace(id);
})
.withPathPredicate((pathPredicate != null) ? pathPredicate : indexer::shouldInclude);
});
}
}

@@ -19,12 +19,6 @@

package org.apache.jackrabbit.oak.index.indexer.document;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

import com.google.common.collect.FluentIterable;
import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -36,10 +30,14 @@
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.jetbrains.annotations.NotNull;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.transform;
import static java.util.Collections.emptyList;
@@ -58,7 +56,6 @@ public class NodeStateEntryTraverser implements Iterable<NodeStateEntry>, Closea
private final TraversingRange traversingRange;

private Consumer<String> progressReporter = id -> {};
private Predicate<String> pathPredicate = path -> true;

private final String id;

@@ -92,11 +89,6 @@ public NodeStateEntryTraverser withProgressCallback(Consumer<String> progressRep
return this;
}

public NodeStateEntryTraverser withPathPredicate(Predicate<String> pathPredicate) {
this.pathPredicate = pathPredicate;
return this;
}

@Override
public void close() throws IOException {
closer.close();
@@ -110,10 +102,7 @@ private Iterable<NodeStateEntry> getIncludedDocs() {
}

private boolean includeDoc(NodeDocument doc) {
String path = doc.getPath().toString();
return !doc.isSplitDocument()
&& !NodeStateUtils.isHiddenPath(path)
&& pathPredicate.test(path);
return !doc.isSplitDocument();
}

/**
@@ -157,31 +146,12 @@ private Iterable<NodeDocument> getDocsFilteredByPath() {

private CloseableIterable<NodeDocument> findAllDocuments() {
return new MongoDocumentTraverser(documentStore)
.getAllDocuments(Collection.NODES, traversingRange, this::includeId);
.getAllDocuments(Collection.NODES, traversingRange, this::reportProgress);
}

private boolean includeId(String id) {
private boolean reportProgress(String id) {
progressReporter.accept(id);
//Cannot interpret long paths as they are hashed. So let them
//be included
if (Utils.isIdFromLongPath(id)){
return true;
}

//Not easy to determine path for previous docs.
//Given their count is pretty low compared to others,
//include them all so that they become part of cache
if (Utils.isPreviousDocId(id)){
return true;
}

String path = Utils.getPathFromId(id);

//Exclude hidden nodes from index data
if (NodeStateUtils.isHiddenPath(path)){
return false;
}

return pathPredicate.test(path);
// Always return true here; the path predicate and hidden-node filtering are applied later, while iterating.
return true;
}
}
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import com.google.common.collect.Iterables;
import org.apache.commons.io.FileUtils;
@@ -101,6 +102,7 @@ public class FlatFileNodeStoreBuilder {
private File flatFileStoreDir;
private final MemoryManager memoryManager;
private long dumpThreshold = DEFAULT_DUMP_THRESHOLD;
private Predicate<String> pathPredicate = path -> true;

private final boolean useZip = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_ZIP, "true"));
private final boolean useTraverseWithSort = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "true"));
@@ -165,6 +167,11 @@ public FlatFileNodeStoreBuilder withNodeStateEntryTraverserFactory(NodeStateEntr
return this;
}

public FlatFileNodeStoreBuilder withPathPredicate(Predicate<String> pathPredicate) {
this.pathPredicate = pathPredicate;
return this;
}

public FlatFileStore build() throws IOException, CompositeException {
logFlags();
comparator = new PathElementComparator(preferredPathElements);
@@ -210,7 +217,7 @@ SortStrategy createSortStrategy(File dir) throws IOException {
case MULTITHREADED_TRAVERSE_WITH_SORT:
log.info("Using MultithreadedTraverseWithSortStrategy");
return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, comparator,
blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold);
blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold, pathPredicate);
}
throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
}
@@ -45,6 +45,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.*;
@@ -184,6 +185,8 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {

private final long dumpThreshold;

private Predicate<String> pathPredicate = path -> true;

/**
* Indicates the various phases of {@link #phaser}
*/
@@ -229,13 +232,15 @@ public int getValue() {
MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator,
BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs,
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold,
Predicate<String> pathPredicate) throws IOException {
this.storeDir = storeDir;
this.mergeDir = new File(storeDir, mergeDirName);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = new LinkedBlockingQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.pathPredicate = pathPredicate;
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@Override
@@ -314,7 +319,8 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
ConcurrentLinkedQueue<String> completedTasks) throws IOException {
taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory,
memoryManager, dumpThreshold, sortedFiles, pathPredicate));
}

@Override
@@ -26,6 +26,7 @@
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,6 +45,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf;
@@ -105,12 +107,14 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
private final MemoryManager memoryManager;
private String registrationID;
private final long dumpThreshold;
private Predicate<String> pathPredicate = path -> true;

TraverseAndSortTask(MongoDocumentTraverser.TraversingRange range, Comparator<NodeStateHolder> comparator,
BlobStore blobStore, File storeDir, boolean compressionEnabled,
Queue<String> completedTasks, Queue<Callable<List<File>>> newTasksQueue,
Phaser phaser, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
MemoryManager memoryManager, long dumpThreshold, BlockingQueue parentSortedFiles) throws IOException {
MemoryManager memoryManager, long dumpThreshold, BlockingQueue parentSortedFiles,
Predicate<String> pathPredicate) throws IOException {
this.nodeStates = nodeStateEntryTraverserFactory.create(range);
this.taskID = ID_PREFIX + nodeStates.getId();
this.lastModifiedLowerBound = nodeStates.getDocumentTraversalRange().getLastModifiedRange().getLastModifiedFrom();
@@ -127,6 +131,7 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
this.memoryManager = memoryManager;
this.dumpThreshold = dumpThreshold;
this.parentSortedFiles = parentSortedFiles;
this.pathPredicate = pathPredicate;
sortWorkDir = DirectoryHelper.createdSortWorkDir(storeDir, taskID, lastModifiedLowerBound, lastModifiedUpperBound);
if (range.getStartAfterDocumentID() != null) {
DirectoryHelper.markLastProcessedStatus(sortWorkDir, lastModifiedLowerBound, range.getStartAfterDocumentID());
@@ -176,7 +181,17 @@ public List<File> call() {

return sortedFiles;
} catch (IOException e) {
log.error(taskID + " could not complete download ", e);
log.error(taskID + " could not complete download with ", e);
} catch (Exception e) {
log.error(taskID + " dumping existing progress because it could not complete download with ", e);
try {
sortAndSaveBatch();
reset();
log.debug(taskID + " complete dumping existing progress");
} catch (Exception dumpErr) {
log.warn(taskID + " failed to dump existing progress with ", dumpErr);
}
throw e;
} finally {
phaser.arriveAndDeregister();
log.info("{} entered finally block.", taskID);
@@ -252,19 +267,25 @@ void addEntry(NodeStateEntry e) throws IOException {
newTasksQueue.add(new TraverseAndSortTask(new MongoDocumentTraverser.TraversingRange(
new LastModifiedRange(splitPoint, this.lastModifiedUpperBound), null),
comparator, blobStore, storeDir, compressionEnabled, completedTasks,
newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, parentSortedFiles));
newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager,
dumpThreshold, parentSortedFiles, pathPredicate));
this.lastModifiedUpperBound = splitPoint;
DirectoryHelper.setLastModifiedUpperLimit(sortWorkDir, lastModifiedUpperBound);
}
}

String jsonText = entryWriter.asJson(e.getNodeState());
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
entryBatch.add(h);
String path = e.getPath();
if (!NodeStateUtils.isHiddenPath(path) && pathPredicate.test(path)) {
log.debug("Adding to entry, path={} hidden={} predicate={}", path, NodeStateUtils.isHiddenPath(path), pathPredicate.test(path));
String jsonText = entryWriter.asJson(e.getNodeState());
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(path, jsonText);
entryBatch.add(h);
updateMemoryUsed(h);
}

lastSavedNodeStateEntry = e;
updateMemoryUsed(h);

}

0 comments on commit 9b78dc8

Please sign in to comment.