From 9b78dc8de1732745fd37f091a889f8f64ee95c8a Mon Sep 17 00:00:00 2001
From: Yu-An Lin
Date: Sat, 2 Apr 2022 12:22:08 -0700
Subject: [PATCH] OAK-9747 Download resume needs to save progress and handle
 hidden node on exception

remove hidden node filter for traverser
add hidden node handle, test failing
fix test
add fix
OAK-9747 update resume merge test
OAK-9747 add hidden node test case
OAK-9747 address review comment
---
 .../document/DocumentStoreIndexerBase.java    |  26 +--
 .../document/NodeStateEntryTraverser.java     |  52 ++----
 .../flatfile/FlatFileNodeStoreBuilder.java    |   9 +-
 ...MultithreadedTraverseWithSortStrategy.java |  10 +-
 .../flatfile/TraverseAndSortTask.java         |  39 ++++-
 .../document/flatfile/FlatFileStoreTest.java  | 154 ++++++++++++++++--
 ...ithreadedTraverseWithSortStrategyTest.java |   4 +-
 .../flatfile/TraverseAndSortTaskTest.java     |   2 +-
 8 files changed, 210 insertions(+), 86 deletions(-)

diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 4dfe938f89d..14ab422d757 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -19,16 +19,6 @@
 
 package org.apache.jackrabbit.oak.index.indexer.document;
 
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Predicate;
-
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Stopwatch;
 import com.google.common.io.Closer;
@@ -66,6 +56,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORTED_FILE_PATH;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
@@ -115,7 +115,8 @@ private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, Documen
         }
 
         private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
-                                                    MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer, Predicate<String> pathPredicate) {
+                                                    MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer,
+                                                    Predicate<String> pathPredicate) {
             this.rootRevision = rootRevision;
             this.documentNodeStore = documentNodeStore;
             this.documentStore = documentStore;
@@ -140,8 +141,7 @@ public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange tra
                             throw new RuntimeException(e);
                         }
                         traversalLogger.trace(id);
-                    })
-                    .withPathPredicate((pathPredicate != null) ? pathPredicate : indexer::shouldInclude);
+                    });
         }
     }
 
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
index 1157ef1cc13..05d4e7cbc6b 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/NodeStateEntryTraverser.java
@@ -19,12 +19,6 @@
 
 package org.apache.jackrabbit.oak.index.indexer.document;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
 import com.google.common.collect.FluentIterable;
 import com.google.common.io.Closer;
 import org.apache.jackrabbit.oak.plugins.document.Collection;
@@ -36,10 +30,14 @@
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
-import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.jetbrains.annotations.NotNull;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.transform;
 import static java.util.Collections.emptyList;
@@ -58,7 +56,6 @@ public class NodeStateEntryTraverser implements Iterable, Closea
     private final TraversingRange traversingRange;
 
     private Consumer<String> progressReporter = id -> {};
-    private Predicate<String> pathPredicate = path -> true;
 
     private final String id;
 
@@ -92,11 +89,6 @@ public NodeStateEntryTraverser withProgressCallback(Consumer progressRep
         return this;
     }
 
-    public NodeStateEntryTraverser withPathPredicate(Predicate<String> pathPredicate) {
-        this.pathPredicate = pathPredicate;
-        return this;
-    }
-
     @Override
     public void close() throws IOException {
         closer.close();
@@ -110,10 +102,7 @@ private Iterable getIncludedDocs() {
     }
 
     private boolean includeDoc(NodeDocument doc) {
-        String path = doc.getPath().toString();
-        return !doc.isSplitDocument()
-                && !NodeStateUtils.isHiddenPath(path)
-                && pathPredicate.test(path);
+        return !doc.isSplitDocument();
     }
 
     /**
@@ -157,31 +146,12 @@ private Iterable getDocsFilteredByPath() {
 
     private CloseableIterable<NodeDocument> findAllDocuments() {
         return new MongoDocumentTraverser(documentStore)
-                .getAllDocuments(Collection.NODES, traversingRange, this::includeId);
+                .getAllDocuments(Collection.NODES, traversingRange, this::reportProgress);
     }
 
-    private boolean includeId(String id) {
+    private boolean reportProgress(String id) {
         progressReporter.accept(id);
-        //Cannot interpret long paths as they are hashed. So let them
-        //be included
-        if (Utils.isIdFromLongPath(id)){
-            return true;
-        }
-
-        //Not easy to determine path for previous docs
-        //Given there count is pretty low compared to others
-        //include them all so that they become part of cache
-        if (Utils.isPreviousDocId(id)){
-            return true;
-        }
-
-        String path = Utils.getPathFromId(id);
-
-        //Exclude hidden nodes from index data
-        if (NodeStateUtils.isHiddenPath(path)){
-            return false;
-        }
-
-        return pathPredicate.test(path);
+        // always returning true here and do the path predicate and hidden node filter when iterating.
+        return true;
     }
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index a9de938b3e9..5d76f495d25 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import com.google.common.collect.Iterables;
 import org.apache.commons.io.FileUtils;
@@ -101,6 +102,7 @@ public class FlatFileNodeStoreBuilder {
     private File flatFileStoreDir;
     private final MemoryManager memoryManager;
     private long dumpThreshold = DEFAULT_DUMP_THRESHOLD;
+    private Predicate<String> pathPredicate = path -> true;
     private final boolean useZip = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_ZIP, "true"));
     private final boolean useTraverseWithSort = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "true"));
 
@@ -165,6 +167,11 @@ public FlatFileNodeStoreBuilder withNodeStateEntryTraverserFactory(NodeStateEntr
         return this;
     }
 
+    public FlatFileNodeStoreBuilder withPathPredicate(Predicate<String> pathPredicate) {
+        this.pathPredicate = pathPredicate;
+        return this;
+    }
+
     public FlatFileStore build() throws IOException, CompositeException {
         logFlags();
         comparator = new PathElementComparator(preferredPathElements);
@@ -210,7 +217,7 @@ SortStrategy createSortStrategy(File dir) throws IOException {
             case MULTITHREADED_TRAVERSE_WITH_SORT:
                 log.info("Using MultithreadedTraverseWithSortStrategy");
                 return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, comparator,
-                        blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold);
+                        blobStore, dir, existingDataDumpDirs, useZip, memoryManager, dumpThreshold, pathPredicate);
         }
         throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
     }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
index 2b07c90d33b..3f4bccb8097 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.*;
@@ -184,6 +185,8 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
 
     private final long dumpThreshold;
 
+    private Predicate<String> pathPredicate = path -> true;
+
     /**
      * Indicates the various phases of {@link #phaser}
      */
@@ -229,13 +232,15 @@ public int getValue() {
     MultithreadedTraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
                                           List<Long> lastModifiedBreakPoints, PathElementComparator pathComparator,
                                           BlobStore blobStore, File storeDir, List<File> existingDataDumpDirs,
-                                          boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
+                                          boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold,
+                                          Predicate<String> pathPredicate) throws IOException {
         this.storeDir = storeDir;
         this.mergeDir = new File(storeDir, mergeDirName);
         this.compressionEnabled = compressionEnabled;
         this.sortedFiles = new LinkedBlockingQueue<>();
         this.throwables = new ConcurrentLinkedQueue<>();
         this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+        this.pathPredicate = pathPredicate;
         taskQueue = new LinkedBlockingQueue<>();
         phaser = new Phaser() {
             @Override
@@ -314,7 +319,8 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
     void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
                  ConcurrentLinkedQueue<String> completedTasks) throws IOException {
         taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
-                compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
+                compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory,
+                memoryManager, dumpThreshold, sortedFiles, pathPredicate));
     }
 
     @Override
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
index 32eef968408..aecfc057b15 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTask.java
@@ -26,6 +26,7 @@
 import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf;
@@ -105,12 +107,14 @@ class TraverseAndSortTask implements Callable>, MemoryManagerClient {
     private final MemoryManager memoryManager;
     private String registrationID;
     private final long dumpThreshold;
+    private Predicate<String> pathPredicate = path -> true;
 
     TraverseAndSortTask(MongoDocumentTraverser.TraversingRange range, Comparator<NodeStateHolder> comparator,
                         BlobStore blobStore, File storeDir, boolean compressionEnabled,
                         Queue<String> completedTasks, Queue<Callable<List<File>>> newTasksQueue, Phaser phaser,
                         NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
-                        MemoryManager memoryManager, long dumpThreshold, BlockingQueue<File> parentSortedFiles) throws IOException {
+                        MemoryManager memoryManager, long dumpThreshold, BlockingQueue<File> parentSortedFiles,
+                        Predicate<String> pathPredicate) throws IOException {
         this.nodeStates = nodeStateEntryTraverserFactory.create(range);
         this.taskID = ID_PREFIX + nodeStates.getId();
         this.lastModifiedLowerBound = nodeStates.getDocumentTraversalRange().getLastModifiedRange().getLastModifiedFrom();
@@ -127,6 +131,7 @@ class TraverseAndSortTask implements Callable>, MemoryManagerClient {
         this.memoryManager = memoryManager;
         this.dumpThreshold = dumpThreshold;
         this.parentSortedFiles = parentSortedFiles;
+        this.pathPredicate = pathPredicate;
         sortWorkDir = DirectoryHelper.createdSortWorkDir(storeDir, taskID, lastModifiedLowerBound, lastModifiedUpperBound);
         if (range.getStartAfterDocumentID() != null) {
             DirectoryHelper.markLastProcessedStatus(sortWorkDir, lastModifiedLowerBound, range.getStartAfterDocumentID());
@@ -176,7 +181,17 @@ public List call() {
 
             return sortedFiles;
         } catch (IOException e) {
-            log.error(taskID + " could not complete download ", e);
+            log.error(taskID + " could not complete download with ", e);
+        } catch (Exception e) {
+            log.error(taskID + " dumping existing progress because it could not complete download with ", e);
+            try {
+                sortAndSaveBatch();
+                reset();
+                log.debug(taskID + " complete dumping existing progress");
+            } catch (Exception dumpErr) {
+                log.warn(taskID + " failed to dump existing progress with ", dumpErr);
+            }
+            throw e;
         } finally {
             phaser.arriveAndDeregister();
             log.info("{} entered finally block.", taskID);
@@ -252,19 +267,25 @@ void addEntry(NodeStateEntry e) throws IOException {
                 newTasksQueue.add(new TraverseAndSortTask(new MongoDocumentTraverser.TraversingRange(
                         new LastModifiedRange(splitPoint, this.lastModifiedUpperBound), null),
                         comparator, blobStore, storeDir, compressionEnabled, completedTasks,
-                        newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, parentSortedFiles));
+                        newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager,
+                        dumpThreshold, parentSortedFiles, pathPredicate));
                 this.lastModifiedUpperBound = splitPoint;
                 DirectoryHelper.setLastModifiedUpperLimit(sortWorkDir, lastModifiedUpperBound);
             }
         }
 
-        String jsonText = entryWriter.asJson(e.getNodeState());
-        //Here logic differs from NodeStateEntrySorter in sense that
-        //Holder line consist only of json and not 'path|json'
-        NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
-        entryBatch.add(h);
+        String path = e.getPath();
+        if (!NodeStateUtils.isHiddenPath(path) && pathPredicate.test(path)) {
+            log.debug("Adding to entry, path={} hidden={} predicate={}", path, NodeStateUtils.isHiddenPath(path), pathPredicate.test(path));
+            String jsonText = entryWriter.asJson(e.getNodeState());
+            //Here logic differs from NodeStateEntrySorter in sense that
+            //Holder line consist only of json and not 'path|json'
+            NodeStateHolder h = new StateInBytesHolder(path, jsonText);
+            entryBatch.add(h);
+            updateMemoryUsed(h);
+        }
+
         lastSavedNodeStateEntry = e;
-        updateMemoryUsed(h);
 
     }
 
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
index
1bb8173b118..66d9a1afa01 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java @@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.index.indexer.document.flatfile; import com.google.common.collect.Iterables; +import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.index.indexer.document.CompositeException; import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry; import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser; @@ -27,6 +28,7 @@ import org.apache.jackrabbit.oak.plugins.document.mongo.DocumentStoreSplitter; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser; import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStateUtils; import org.jetbrains.annotations.NotNull; import org.junit.Rule; import org.junit.Test; @@ -36,23 +38,29 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORT_STRATEGY_TYPE; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_USE_ZIP; import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE; +import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_THREAD_POOL_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings("StaticPseudoFunctionalStyleMethod") public class FlatFileStoreTest { @@ -159,7 +167,7 @@ public void parallelDownload() throws Exception { } private FlatFileStore buildFlatFileStore(FlatFileNodeStoreBuilder spyBuilder, List lastModifiedBreakpoints, - TestNodeStateEntryTraverserFactory nsetf, boolean expectException) throws Exception { + TestNodeStateEntryTraverserFactory nsetf, boolean expectException, Long dumpThreshold) throws Exception { boolean exceptionCaught = false; FlatFileStore flatFileStore = null; try { @@ -167,7 +175,7 @@ private FlatFileStore buildFlatFileStore(FlatFileNodeStoreBuilder spyBuilder, Li .withPreferredPathElements(preferred) .withLastModifiedBreakPoints(lastModifiedBreakpoints) .withNodeStateEntryTraverserFactory(nsetf) - .withDumpThreshold(0) + .withDumpThreshold(dumpThreshold) .build(); } catch (CompositeException e) { exceptionCaught = true; @@ -182,6 +190,7 @@ private FlatFileStore buildFlatFileStore(FlatFileNodeStoreBuilder spyBuilder, Li @Test public void resumePreviousUnfinishedDownload() throws Exception { + Long dumpThreshold = 0L; try { System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString()); List mongoDocs = getTestData(); @@ -191,17 +200,17 
@@ public void resumePreviousUnfinishedDownload() throws Exception { FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager)); TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs); nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.25)); - FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true); + FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); assertNull(flatStore); spyBuilder.addExistingDataDumpDir(spyBuilder.getFlatFileStoreDir()); nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.50)); - flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); assertNull(flatStore); memoryManager.isMemoryLow = false; List entryPaths; spyBuilder.addExistingDataDumpDir(spyBuilder.getFlatFileStoreDir()); nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE); - flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false, dumpThreshold); entryPaths = StreamSupport.stream(flatStore.spliterator(), false) .map(NodeStateEntry::getPath) .collect(Collectors.toList()); @@ -214,6 +223,124 @@ public void resumePreviousUnfinishedDownload() throws Exception { } } + private boolean flatFileStoreMatchCondition(File dir, String filenamePattern, String entry) { + Pattern pattern = Pattern.compile(filenamePattern, Pattern.CASE_INSENSITIVE); + for (File innerDir : Objects.requireNonNull(dir.listFiles())) { + if (innerDir.isDirectory()) { + for (File innerFile : Objects.requireNonNull(innerDir.listFiles())) { + if (!pattern.matcher(innerFile.getName()).find()) { + continue; + } + List lines = null; + try { + lines = FileUtils.readLines(innerFile, StandardCharsets.UTF_8); + } catch (Exception e) { + fail("failed to read FlatFileStore"); + } + for (String line : lines) { + if (line.contains(entry)) { + return true; + } + } + } + } + } + return false; + } + + // with larger size of dump threshold (which result in almost never dump), + // fail in the middle, check that hidden node progress are saved + @Test + public void resumePreviousUnfinishedDownloadWithHiddenNode() throws Exception { + Long dumpThreshold = FileUtils.ONE_MB; + try { + System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString()); + System.setProperty(PROP_THREAD_POOL_SIZE, "1"); + System.setProperty(OAK_INDEXER_USE_ZIP, "false"); + List mongoDocs = new ArrayList() {{ + add(new TestMongoDoc("/10-0", 10)); + add(new TestMongoDoc("/10-0/:hidden1", 10)); + add(new TestMongoDoc("/10-0/:hidden2", 10)); + add(new TestMongoDoc("/10-0/:hidden3", 10)); + add(new TestMongoDoc("/10-0/:hidden4", 10)); + add(new TestMongoDoc("/10-1/end", 10)); + }}; + List lmValues = mongoDocs.stream().map(md -> md.lastModified).distinct().sorted().collect(Collectors.toList()); + List lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lmValues.get(0), lmValues.get(lmValues.size() - 1), 1); + TestMemoryManager memoryManager = new TestMemoryManager(true); + FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager)); + TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs); + List entryPaths; + 
nsetf.setDeliveryBreakPoint(4); + FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); + assertNull(flatStore); + File existingFlatFileStoreDir1 = spyBuilder.getFlatFileStoreDir(); + assertTrue("flatFileStore should dump entry even if exception caught", + flatFileStoreMatchCondition(existingFlatFileStoreDir1, "flatfile", "/10-0")); + assertTrue("flatFileStore should save hidden node progress on exception caught", + flatFileStoreMatchCondition(existingFlatFileStoreDir1, "last-saved","/10-0/:hidden3")); + nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE); + spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir1); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false, dumpThreshold); + entryPaths = StreamSupport.stream(flatStore.spliterator(), false) + .map(NodeStateEntry::getPath) + .collect(Collectors.toList()); + + List sortedPaths = TestUtils.sortPaths(mongoDocs.stream() + .map(md -> md.path) + .filter(path -> !NodeStateUtils.isHiddenPath(path)) + .collect(Collectors.toList())); + assertEquals(mongoDocs.size(), nsetf.getTotalProvidedDocCount()); + assertEquals(sortedPaths, entryPaths); + } finally { + System.clearProperty(OAK_INDEXER_SORT_STRATEGY_TYPE); + System.clearProperty(PROP_THREAD_POOL_SIZE); + System.clearProperty(OAK_INDEXER_USE_ZIP); + } + } + + // with larger size of dump threshold (which result in almost never dump), + // fail in the middle, check that there are data being dumped + @Test + public void resumePreviousUnfinishedDownloadWithGracefulDump() throws Exception { + Long dumpThreshold = FileUtils.ONE_MB; + try { + System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString()); + System.setProperty(PROP_THREAD_POOL_SIZE, "1"); + System.setProperty(OAK_INDEXER_USE_ZIP, "false"); + List mongoDocs = new ArrayList() {{ + add(new TestMongoDoc("/10-0", 10)); + add(new TestMongoDoc("/10-1", 10)); + }}; + List lmValues = mongoDocs.stream().map(md -> md.lastModified).distinct().sorted().collect(Collectors.toList()); + List lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lmValues.get(0), lmValues.get(lmValues.size() - 1), 1); + TestMemoryManager memoryManager = new TestMemoryManager(true); + FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager)); + TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs); + List entryPaths; + nsetf.setDeliveryBreakPoint(1); + FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); + assertNull(flatStore); + File existingFlatFileStoreDir1 = spyBuilder.getFlatFileStoreDir(); + assertTrue("flatFileStore should dump entry even if exception caught", + flatFileStoreMatchCondition(existingFlatFileStoreDir1, "flatfile", "/10-0")); + nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE); + spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir1); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false, dumpThreshold); + entryPaths = StreamSupport.stream(flatStore.spliterator(), false) + .map(NodeStateEntry::getPath) + .collect(Collectors.toList()); + + List sortedPaths = TestUtils.sortPaths(mongoDocs.stream().map(md -> md.path).collect(Collectors.toList())); + assertEquals(mongoDocs.size(), nsetf.getTotalProvidedDocCount()); + assertEquals(sortedPaths, entryPaths); + } finally { + 
System.clearProperty(OAK_INDEXER_SORT_STRATEGY_TYPE); + System.clearProperty(PROP_THREAD_POOL_SIZE); + System.clearProperty(OAK_INDEXER_USE_ZIP); + } + } + private void assertContainsMergeFolder(File dir, Boolean mustBeEmpty) { Boolean mergeFolderExist = false; for (File workDir : dir.listFiles()) { @@ -230,6 +357,7 @@ private void assertContainsMergeFolder(File dir, Boolean mustBeEmpty) { @Test public void resumePreviousUnfinishedDownloadAndMerge() throws Exception { + Long dumpThreshold = 0L; try { System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString()); System.setProperty(PROP_MERGE_TASK_BATCH_SIZE, "2"); @@ -240,13 +368,13 @@ public void resumePreviousUnfinishedDownloadAndMerge() throws Exception { FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager)); TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs); nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.50)); - FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true); + FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); assertNull(flatStore); File existingFlatFileStoreDir1 = spyBuilder.getFlatFileStoreDir(); assertContainsMergeFolder(existingFlatFileStoreDir1, false); spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir1); nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.75)); - flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true, dumpThreshold); assertNull(flatStore); memoryManager.isMemoryLow = false; List entryPaths; @@ -254,7 +382,7 @@ public void resumePreviousUnfinishedDownloadAndMerge() throws Exception { assertContainsMergeFolder(existingFlatFileStoreDir2, false); spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir2); nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE); - flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false); + flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false, dumpThreshold); entryPaths = StreamSupport.stream(flatStore.spliterator(), false) .map(NodeStateEntry::getPath) .collect(Collectors.toList()); @@ -320,16 +448,11 @@ private static class TestNodeStateEntryTraverserFactory implements NodeStateEntr * factory has created till now. 
*/ final AtomicInteger providedDocuments; - /** - * Keeps count of documents which have already been returned in the past - */ - final AtomicInteger duplicateCount; public TestNodeStateEntryTraverserFactory(List mongoDocs) { this.mongoDocs = mongoDocs; this.breakAfterDelivering = new AtomicInteger(Integer.MAX_VALUE); this.providedDocuments = new AtomicInteger(0); - this.duplicateCount = new AtomicInteger(0); } void setDeliveryBreakPoint(int value) { @@ -363,7 +486,6 @@ public boolean hasNext() { public NodeStateEntry next() { if (providedDocuments.get() == breakAfterDelivering.get()) { logger.debug("{} Breaking after getting docs with id {}", traverserId, lastReturnedDoc.getId()); - duplicateCount.incrementAndGet(); throw new IllegalStateException(EXCEPTION_MESSAGE); } providedDocuments.incrementAndGet(); @@ -378,9 +500,8 @@ public NodeStateEntry next() { } int getTotalProvidedDocCount() { - return providedDocuments.get() - duplicateCount.get(); + return providedDocuments.get(); } - } private List createTestPaths() { @@ -467,5 +588,4 @@ private List getTestData() { }}; } - } \ No newline at end of file diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java index f5e9ef96ff3..5530f8a2b83 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategyTest.java @@ -44,7 +44,7 @@ public void initialRanges() throws IOException { List ranges = new ArrayList<>(); MultithreadedTraverseWithSortStrategy mtws = new MultithreadedTraverseWithSortStrategy(null, lastModifiedBreakpoints, null, null, null, null, true, null, - FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD) { + FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD, path -> true) { @Override void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue completedTasks) throws IOException { @@ -104,7 +104,7 @@ public void rangesDuringResume() throws IOException { List ranges = new ArrayList<>(); MultithreadedTraverseWithSortStrategy mtws = new MultithreadedTraverseWithSortStrategy(null, null, null, null, null, workDirs, true, null, - FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD) { + FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD, path -> true) { @Override void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue completedTasks) throws IOException { diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java index 3d0132fef02..9030614aa18 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java @@ -70,7 +70,7 @@ public void taskSplit() throws IOException { File store = new File("target/" + this.getClass().getSimpleName() + "-" + System.currentTimeMillis()); 
TraverseAndSortTask tst = new TraverseAndSortTask(traversingRange, null, null, store, true, new LinkedList<>(Collections.singletonList("1")), newTaskQueue, phaser, new NodeStateEntryTraverserFactoryImpl(), mockMemManager, - FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD, new LinkedBlockingQueue()); + FlatFileNodeStoreBuilder.DEFAULT_DUMP_THRESHOLD, new LinkedBlockingQueue(), path -> true); NodeStateEntry mockEntry = Mockito.mock(NodeStateEntry.class); long lastModified = (lmRange.getLastModifiedFrom() + lmRange.getLastModifiedTo())/2;
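
Note: the behavioural core of the patch is easier to see outside the diff. The sketch below is illustrative only -- the class and method names are hypothetical stand-ins, not the Oak types (TraverseAndSortTask, NodeStateEntry, DirectoryHelper) -- but it mirrors the two behaviours the patch gives the download task: hidden-node and path-predicate filtering are applied while iterating over entries (instead of when the traverser selects documents), and on failure the partially collected batch is flushed and the last processed path is remembered so a later resume can continue from it rather than start over.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Illustrative sketch only; names are hypothetical, not the Oak classes.
class DownloadTaskSketch {

    private final Predicate<String> pathPredicate;
    private final List<String> batch = new ArrayList<>();
    private final List<List<String>> dumpedBatches = new ArrayList<>();
    private String lastProcessedPath;

    DownloadTaskSketch(Predicate<String> pathPredicate) {
        this.pathPredicate = pathPredicate;
    }

    // A path is hidden when any segment starts with ':', e.g. /a/:hidden/b.
    private static boolean isHidden(String path) {
        for (String segment : path.split("/")) {
            if (segment.startsWith(":")) {
                return true;
            }
        }
        return false;
    }

    void run(Iterator<String> paths) {
        try {
            while (paths.hasNext()) {
                String path = paths.next();      // may fail mid-traversal
                if (!isHidden(path) && pathPredicate.test(path)) {
                    batch.add(path);             // only visible, matching paths are written out
                }
                lastProcessedPath = path;        // progress advances even for filtered entries
            }
            flush();
        } catch (RuntimeException e) {
            flush();                             // dump whatever was downloaded before the failure
            // a real task would also persist lastProcessedPath as the resume marker here
            throw e;
        }
    }

    private void flush() {
        if (!batch.isEmpty()) {
            dumpedBatches.add(new ArrayList<>(batch));
            batch.clear();
        }
    }
}

Recording progress for every entry, including the hidden ones that are filtered out, is what the new FlatFileStoreTest case relies on: after the injected failure the saved "last-saved" marker points at a hidden path (/10-0/:hidden3), while the completed flat file store still contains only the non-hidden paths.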