Skip to content
Permalink
Browse files
OAK-9748 Parallel Merge does not handle download retry correctly
remove documentation typos

improve test case

OAK-9748 address review comment

OAK-9748 update javadoc and refactor

OAK-9748 address review comment
  • Loading branch information
Ewocker committed Apr 13, 2022
1 parent e38d861 commit 983f82ebe99229f69128d21d3fe5c47c3e523db5
Showing 3 changed files with 128 additions and 25 deletions.
@@ -39,6 +39,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.google.common.base.Charsets.UTF_8;
@@ -100,6 +101,20 @@
* </ol>
* </li>
* </ol>
*
* <h3>Force Stop Explanation -</h3>
* <ol>
* <li>On receiving MERGE_FORCE_STOP_POISON_PILL in SORTED_FILE_QUEUE from parent thread, all un-started tasks will be skipped.</li>
* <li>No more tasks will be created on MERGE_FORCE_STOP_POISON_PILL message is received.</li>
* <li>Running merge will continue to finish.</li>
* <li>Final merge will not be performed.</li>
* <li>This will not result in lose of data under conditions that -</li>
* <ol>
* <li>Files will not be removed until merge task is completed. Parent thread should add unmerged files to the SORTED_FILE_QUEUE on retry.</li>
* <li>Merged files will still be under the merge folder. Parent thread should add those files to the SORTED_FILE_QUEUE on retry.</li>
* </ol>
* </li>
* </ol>
*/
public class MergeRunner implements Runnable {
private static final Logger log = LoggerFactory.getLogger(MergeRunner.class);
@@ -136,10 +151,18 @@ public class MergeRunner implements Runnable {
private final Phaser phaser;

/**
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has completed.
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has completed successfully and
* merge should advance to the final merge stage.
*/
public static final File MERGE_POISON_PILL = new File("");

/**
* This poison pill is added to {@link #sortedFiles} to indicate that download phase has failed and
* merge should shut down immediately and not advance to the final merge stage.
*/
public static final File MERGE_FORCE_STOP_POISON_PILL = new File("merge-force-stop-poison-pill");
private final AtomicBoolean mergeCancelled;

/**
* Constructor.
* @param sortedFiles thread safe list containing files to be merged.
@@ -158,6 +181,7 @@ public class MergeRunner implements Runnable {
this.phaser = phaser;
this.batchMergeSize = batchMergeSize;
this.threadPoolSize = threadPoolSize;
this.mergeCancelled = new AtomicBoolean(false);
}

private boolean merge(List<File> files, File outputFile) {
@@ -236,7 +260,7 @@ public void run() {
File f = sortedFiles.take();
unmergedFiles.add(f);
log.debug("added sorted file {} to the unmerged list", f.getName());
if (f.equals(MERGE_POISON_PILL)) {
if (f.equals(MERGE_POISON_PILL) || f.equals(MERGE_FORCE_STOP_POISON_PILL)) {
break;
}
// add another batchMergeSize so that we choose the smallest of a larger range
@@ -254,6 +278,17 @@ public void run() {
}
}
log.info("Waiting for batch sorting tasks completion");

// Parent thread signals to stop immediately
if (unmergedFiles.contains(MERGE_FORCE_STOP_POISON_PILL)) {
log.info("Merger receives force stop signal, shutting down all merge tasks");
this.mergeCancelled.set(true);
mergeTaskPhaser.arriveAndAwaitAdvance();
executorService.shutdown();
mergeTaskPhaser.arrive();
phaser.arriveAndDeregister();
return;
}
mergeTaskPhaser.arriveAndAwaitAdvance();
executorService.shutdown();

@@ -297,14 +332,20 @@ private class Task implements Callable<File> {
mergeTaskPhaser.register();
}


@Override
public File call() {
public File call() throws Exception {
try {
if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFile.getName());
return mergedFile;
String mergedFileName = mergedFile.getName();
if (mergeCancelled.get()) {
log.debug("merge cancelled, skipping merge task");
throw new Exception("merge skipped for " + mergedFileName);
} else if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFileName);
} else {
log.error("merge failed for {}", mergedFileName);
throw new Exception("merge failed for " + mergedFileName);
}
log.error("merge failed for {}", mergedFile.getName());
} finally {
mergeTaskPhaser.arriveAndDeregister();
}
@@ -149,12 +149,7 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {

private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
private final boolean compressionEnabled;
/** File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, storeDir, comparator, mergePhaser, compressionEnabled);
Thread merger = new Thread(mergeRunner, mergerThreadName);
merger.setDaemon(true);
merger.start();
phaser.awaitAdvance(Phases.WAITING_FOR_TASK_SPLITS.value);
/**
* Directory where sorted files will be created.
*/
private final File storeDir;
@@ -274,6 +269,10 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
continue;
} else if (existingSortWorkDir.getName().equals(mergeDirName)) {
log.info("Intermediate Merge Directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
DirectoryHelper.getDataFiles(existingSortWorkDir).forEach(file -> {
log.debug("Including existing intermediate merged file {}", file.getPath());
sortedFiles.add(file);
});
continue;
}
boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
@@ -319,7 +318,7 @@ void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntr
}

@Override
public File createSortedStoreFile() throws IOException, CompositeException {
public File createSortedStoreFile() throws CompositeException {
String watcherThreadName = "watcher";
String mergerThreadName = "merger";
Thread watcher = new Thread(new TaskRunner(), watcherThreadName);
@@ -341,6 +340,8 @@ public File createSortedStoreFile() throws IOException, CompositeException {
for (Throwable throwable : throwables) {
exception.addSuppressed(throwable);
}
sortedFiles.add(MergeRunner.MERGE_FORCE_STOP_POISON_PILL);
mergePhaser.awaitAdvance(0);
throw exception;
}
log.debug("Result collection complete. Proceeding to final merge.");
@@ -19,17 +19,6 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
@@ -46,11 +35,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_SORT_STRATEGY_TYPE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@SuppressWarnings("StaticPseudoFunctionalStyleMethod")
public class FlatFileStoreTest {
@@ -212,6 +214,65 @@ public void resumePreviousUnfinishedDownload() throws Exception {
}
}

private void assertContainsMergeFolder(File dir, Boolean mustBeEmpty) {
Boolean mergeFolderExist = false;
for (File workDir : dir.listFiles()) {
if (workDir.getName().equals("merge") && workDir.isDirectory()) {
mergeFolderExist = true;
if (mustBeEmpty) {
assertTrue("merge directory should not be empty", workDir.listFiles().length == 0);
}
break;
}
}
assertTrue("merge directory should exist", mergeFolderExist);
}

@Test
public void resumePreviousUnfinishedDownloadAndMerge() throws Exception {
try {
System.setProperty(OAK_INDEXER_SORT_STRATEGY_TYPE, FlatFileNodeStoreBuilder.SortStrategyType.MULTITHREADED_TRAVERSE_WITH_SORT.toString());
System.setProperty(PROP_MERGE_TASK_BATCH_SIZE, "2");
List<TestMongoDoc> mongoDocs = getTestData();
List<Long> lmValues = mongoDocs.stream().map(md -> md.lastModified).distinct().sorted().collect(Collectors.toList());
List<Long> lastModifiedBreakpoints = DocumentStoreSplitter.simpleSplit(lmValues.get(0), lmValues.get(lmValues.size() - 1), 10);
TestMemoryManager memoryManager = new TestMemoryManager(true);
FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new FlatFileNodeStoreBuilder(folder.getRoot(), memoryManager));
TestNodeStateEntryTraverserFactory nsetf = new TestNodeStateEntryTraverserFactory(mongoDocs);
nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.50));
FlatFileStore flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true);
assertNull(flatStore);
File existingFlatFileStoreDir1 = spyBuilder.getFlatFileStoreDir();
assertContainsMergeFolder(existingFlatFileStoreDir1, false);
spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir1);
nsetf.setDeliveryBreakPoint((int)(mongoDocs.size() * 0.75));
flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, true);
assertNull(flatStore);
memoryManager.isMemoryLow = false;
List<String> entryPaths;
File existingFlatFileStoreDir2 = spyBuilder.getFlatFileStoreDir();
assertContainsMergeFolder(existingFlatFileStoreDir2, false);
spyBuilder.addExistingDataDumpDir(existingFlatFileStoreDir2);
nsetf.setDeliveryBreakPoint(Integer.MAX_VALUE);
flatStore = buildFlatFileStore(spyBuilder, lastModifiedBreakpoints, nsetf, false);
entryPaths = StreamSupport.stream(flatStore.spliterator(), false)
.map(NodeStateEntry::getPath)
.collect(Collectors.toList());

// Intermediate MergeFiles should be deleted after being merged
assertContainsMergeFolder(existingFlatFileStoreDir1, true);
assertContainsMergeFolder(existingFlatFileStoreDir2, true);
assertContainsMergeFolder(spyBuilder.getFlatFileStoreDir(), true);

List<String> sortedPaths = TestUtils.sortPaths(mongoDocs.stream().map(md -> md.path).collect(Collectors.toList()));
assertEquals(mongoDocs.size(), nsetf.getTotalProvidedDocCount());
assertEquals(sortedPaths, entryPaths);
} finally {
System.clearProperty(OAK_INDEXER_SORT_STRATEGY_TYPE);
System.clearProperty(PROP_MERGE_TASK_BATCH_SIZE);
}
}

private static class TestMemoryManager implements MemoryManager {

boolean isMemoryLow;

0 comments on commit 983f82e

Please sign in to comment.