Skip to content
Permalink
Browse files
multithread parallel merge
  • Loading branch information
Ewocker committed Mar 10, 2022
1 parent ff097ac commit 2b82c893c25ab0f355b3b69ba59cf7eccb24b36c
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 76 deletions.
@@ -59,12 +59,30 @@ public class FlatFileNodeStoreBuilder {
/**
* Default value for {@link #PROP_THREAD_POOL_SIZE}
*/
static final String DEFAULT_NUMBER_OF_DATA_DUMP_THREADS = "4";
static final int DEFAULT_NUMBER_OF_DATA_DUMP_THREADS = 4;
/**
* System property for specifying number of threads for parallel download when using {@link MultithreadedTraverseWithSortStrategy}
*/
static final String PROP_THREAD_POOL_SIZE = "oak.indexer.dataDumpThreadPoolSize";

/**
* Default value for {@link #PROP_MERGE_THREAD_POOL_SIZE}
*/
static final int DEFAULT_NUMBER_OF_MERGE_TASK_THREADS = 4;
/**
* System property for specifying number of threads for parallel merge when using {@link MultithreadedTraverseWithSortStrategy}
*/
static final String PROP_MERGE_THREAD_POOL_SIZE = "oak.indexer.mergeTaskThreadPoolSize";

/**
* Default value for {@link #PROP_MERGE_TASK_BATCH_SIZE}
*/
static final int DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK = 64;
/**
* System property for specifying number of files for batch merge task when using {@link MultithreadedTraverseWithSortStrategy}
*/
static final String PROP_MERGE_TASK_BATCH_SIZE = "oak.indexer.mergeTaskBatchSize";

/**
* Value of this system property indicates max memory that should be used if jmx based memory monitoring is not available.
*/
@@ -20,6 +20,7 @@

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.comparator.SizeFileComparator;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
@@ -29,12 +30,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,14 +49,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_DATA_DUMP_THREADS;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_THREAD_POOL_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_MERGE_TASK_THREADS;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;
@@ -271,7 +279,7 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
log.info("Not a directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
continue;
} else if (existingSortWorkDir.getName().equals("merge")) {
log.info("Intermediate Merge Directory. Skipping it.", existingSortWorkDir.getAbsolutePath());
log.info("Intermediate Merge Directory {}. Skipping it.", existingSortWorkDir.getAbsolutePath());
continue;
}
boolean downloadCompleted = DirectoryHelper.hasCompleted(existingSortWorkDir);
@@ -336,7 +344,7 @@ public File createSortedStoreFile() throws IOException, CompositeException {
}
throw exception;
}
log.debug("Result collection complete. Proceeding to final merging.");
log.debug("Result collection complete. Proceeding to final merge.");
Stopwatch w = Stopwatch.createStarted();
sortedFiles.add(MERGE_POISON_PILL);
mergePhaser.awaitAdvance(0);
@@ -361,7 +369,7 @@ public long getEntryCount() {
private class TaskRunner implements Runnable {

private final ExecutorService executorService;
private final int threadPoolSize = Integer.parseInt(System.getProperty(PROP_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_DATA_DUMP_THREADS));
private final int threadPoolSize = Integer.getInteger(PROP_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_DATA_DUMP_THREADS);

public TaskRunner() {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
@@ -403,107 +411,167 @@ public void run() {
}
}

private boolean merge(List<File> files, File outputFile) {
try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
ExternalSort.mergeSortedFiles(files,
writer,
comparator,
charset,
true, //distinct
compressionEnabled, //useZip
func2,
func1
);
} catch (IOException e) {
log.error("Merge failed with IOException", e);
return false;
}
return true;
}

/**
* Class responsible for -
* <ol>
* <li>Watching {@link #taskQueue} for new tasks</li>
* <li>Submitting those tasks to an {@link ExecutorService}</li>
* <li>Collecting the results (sorted files) created by those tasks into one place</li>
* </ol>
*/
private class MergeTask implements Callable<File> {
private final Phaser mergeTaskPhaser;
private final List<File> mergeTarget;
private final File mergedFile;
private final int failureThreshold = 5;

MergeTask(List<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
this.mergeTarget = mergeTarget;
this.mergeTaskPhaser = mergeTaskPhaser;
this.mergedFile = mergedFile;
mergeTaskPhaser.register();
}

@Override
public File call() {
log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
try {
for (int mergeFailureCount = 0; mergeFailureCount <= failureThreshold; mergeFailureCount++) {
if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFile.getName());
return mergedFile;
}
}
log.error("merge failed for {}", mergedFile.getName());
} finally {
mergeTaskPhaser.arriveAndDeregister();
}

return mergedFile;
}
}

/**
* Class responsible for -
* <ol>
* <li>Watching {@link #sortedFiles} for new sorted files</li>
* <li>Submitting those files in batch to an {@link ExecutorService}</li>
* <li>Collecting the results (sorted files) created by those tasks</li>
* <li>Merge the result with any left over files to create a single sorted file</li>
* </ol>
* Strategy -
* <ol>
* <li>Wait for n files (compare with merged list)</li>
* <li>construct new list of files to be merged by checking if its already merged</li>
* and create intermediate merge file
* (if final merge) merge all intermediate merge files and create sorted file
* <li>add all merged files to merged list</li>
* </ol>
*/
private class MergeRunner implements Runnable {
private final ArrayList<File> intermediateMergedFiles = new ArrayList<File>();
private final ArrayList<File> mergedFiles = new ArrayList<File>();
private final File sortedFile;
private final int failureThreshold = 100;
private final int batchMergeSize = 64;
private int nextMergedLength = 0;
private final ExecutorService executorService;
private final int threadPoolSize = Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
private final int batchMergeSize = Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);


public MergeRunner(File sortedFile) throws IOException {
public MergeRunner(File sortedFile) {
this.sortedFile = sortedFile;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
}

private boolean merge(ArrayList<File> files, File outputFile) {
try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
ExternalSort.mergeSortedFiles(files,
writer,
comparator,
charset,
true, //distinct
compressionEnabled, //useZip
func2,
func1
);
} catch (IOException e) {
log.error("Merge failed with IOException", e);
return false;
}
return true;
}

private ArrayList<File> getUnmergedFiles(int size) {
private List<File> getSmallestUnmergedFiles(int size) {
ArrayList<File> unmergedFiles = new ArrayList<File>();
for (File f : sortedFiles) {
if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {
unmergedFiles.add(f);
}
if (unmergedFiles.size() == size) {
break;
}
}
return unmergedFiles;
unmergedFiles.sort(new SizeFileComparator());

return unmergedFiles.subList(0, (size - 1) > unmergedFiles.size() ? unmergedFiles.size() : (size - 1));
}

@Override
public void run() {
// 1. wait for n files (compare with merged list)
// 2. construct new list of files to be merged by checking if its already merged
// and create intermediate merge file
// (if final merge) merge all intermediate merge files and create sorted file
// 3. add all merged files to merged list
int mergeFailureCount = 0;
nextMergedLength += batchMergeSize;
Phaser mergeTaskPhaser = new Phaser(1);
List<Future<File>> results = new ArrayList<>();

while (true) {
if (mergeFailureCount >= failureThreshold) {
log.error("give up merging due to failure occurs more than %s times", failureThreshold);
break;
while (!sortedFiles.contains(MERGE_POISON_PILL) && sortedFiles.size() <= nextMergedLength) {
// waiting for n files to be merged in a batch
}

boolean isFinal = false;
File mergedFile = new File(mergeDir, String.format("intermediate-%s", nextMergedLength));
while (sortedFiles.size() <= nextMergedLength && !sortedFiles.contains(MERGE_POISON_PILL)) {}

ArrayList<File> mergeTarget = new ArrayList<File>();
if (sortedFiles.contains(MERGE_POISON_PILL)) {
isFinal = true;
mergedFile = sortedFile;
mergeTarget.addAll(intermediateMergedFiles);
break;
}
ArrayList<File> unmergedFiles = getUnmergedFiles(isFinal ? Integer.MAX_VALUE : batchMergeSize);
mergeTarget.addAll(unmergedFiles);

log.info("Performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
List<File> mergeTarget = getSmallestUnmergedFiles(batchMergeSize);
Callable<File> mergeTask = new MergeTask(mergeTarget, mergeTaskPhaser,
new File(mergeDir, String.format("intermediate-%s", nextMergedLength)));
results.add(executorService.submit(mergeTask));
nextMergedLength += batchMergeSize;
log.info("next merge length is {}", nextMergedLength);
mergedFiles.addAll(mergeTarget);
}

// for oak-indexing-benchmark to get result only
if (isFinal) {
log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
}
if (merge(mergeTarget, mergedFile)) {
mergedFiles.addAll(unmergedFiles);
nextMergedLength += unmergedFiles.size();
intermediateMergedFiles.add(mergedFile);
if (isFinal) {
break;
// final merge
log.info("Waiting for batch sorting tasks completion");
mergeTaskPhaser.arriveAndAwaitAdvance();
ArrayList<File> mergeTarget = new ArrayList<File>();
try {
boolean exceptionsCaught = false;
for (Future<File> result : results) {
try {
mergeTarget.add(result.get());
} catch (Throwable e) {
throwables.add(e);
exceptionsCaught = true;
}
}
log.debug("Completed merge result collection {}. Arriving at phaser now.", exceptionsCaught ? "partially" : "fully");
} finally {
mergeTaskPhaser.arrive();
}
mergeTarget.addAll(getSmallestUnmergedFiles(Integer.MAX_VALUE));

log.info("All batch sorting tasks have completed, total of {}", mergeTarget.size());
log.info("Proceeding to perform merge of {} sorted files", mergeTarget.size());
int finalMergeFailureThreshold = 5;
for (int finalMergeFailureCount = 0; finalMergeFailureCount <= finalMergeFailureThreshold; finalMergeFailureCount++) {
if (!merge(mergeTarget, sortedFile)) {
log.error("merge failed for {}", sortedFile.getName());
} else {
log.error("Merge failed for {}", mergedFile.getName());
mergeFailureCount += 1;
log.info("merge complete for {}", sortedFile.getName());
break;
}
}

log.info("intermediateMergedFiles size: {}", intermediateMergedFiles.size());
log.info("sortedFiles size: {}", sortedFiles.size());
log.warn("total merge failure {}", mergeFailureCount);
// MERGE_POISON_PILL does not count
log.info("Original sorted file length {}", sortedFiles.size()-1);

mergePhaser.arriveAndDeregister();
executorService.shutdown();
}
}

@@ -20,7 +20,6 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;

0 comments on commit 2b82c89

Please sign in to comment.