Skip to content
Permalink
Browse files
add test case for MergeRunner
  • Loading branch information
Ewocker committed Mar 18, 2022
1 parent ce0bd76 commit 0dc296d80d019d10310ccde1245cc254aefa017d
Showing 4 changed files with 238 additions and 35 deletions.
@@ -156,6 +156,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-commons</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -1,5 +1,6 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.comparator.SizeFileComparator;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
@@ -90,8 +91,8 @@ public class MergeRunner implements Runnable {
private static final Logger log = LoggerFactory.getLogger(MergeRunner.class);
private final Charset charset = UTF_8;
private final boolean compressionEnabled;
private final ArrayList<File> mergedFiles = new ArrayList<File>();
private final ArrayList<File> unmergedFiles = new ArrayList<File>();
private final ArrayList<File> mergedFiles = Lists.newArrayList();
private final ArrayList<File> unmergedFiles = Lists.newArrayList();
private final ExecutorService executorService;
private final int threadPoolSize = Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
private final int batchMergeSize = Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
@@ -106,7 +107,6 @@ public class MergeRunner implements Runnable {
* Directory where intermediate merged files will be created.
*/
private final File mergeDir;
private final String mergeDirName = "merge";

/**
* Comparator used for comparing node states for creating sorted files.
@@ -130,12 +130,12 @@ public class MergeRunner implements Runnable {
* Constructor.
* @param sortedFiles thread safe list containing files to be merged.
* @param comparator comparator used to help with sorting of node state entries.
* @param baseStoreDir base directory where sorted files will be created.
* @param mergeDir directory where sorted files will be created.
* @param compressionEnabled if true, the created files would be compressed
*/
MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File baseStoreDir, Comparator comparator,
MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File mergeDir, Comparator comparator,
Phaser phaser, boolean compressionEnabled) throws IOException {
this.mergeDir = new File(baseStoreDir, mergeDirName);
this.mergeDir = mergeDir;
FileUtils.forceMkdir(mergeDir);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = sortedFiles;
@@ -147,6 +147,7 @@ public class MergeRunner implements Runnable {
}

private boolean merge(List<File> files, File outputFile) {
log.debug("performing merge for {} with size {} {}", outputFile.getName(), files.size(), files);
try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
@@ -163,12 +164,36 @@ private boolean merge(List<File> files, File outputFile) {
log.error("Merge failed with IOException", e);
return false;
}
log.debug("merge complete for {} with {}", outputFile.getName(), files);
return true;
}

private boolean finalMerge() {
List<File> mergeTarget = new ArrayList<>();
int count = 0;
while(!unmergedFiles.isEmpty()) {
count++;
mergeTarget.clear();
mergeTarget.addAll(getSmallestUnmergedFiles(batchMergeSize));
markAsMerged(mergeTarget);
File outputFile = new File(mergeDir, String.format("final-%s", count));
if (unmergedFiles.isEmpty()) {
outputFile = sortedFile;
}

log.info("running final batch merge task for {} with {}", outputFile.getName(), mergeTarget);
if (!merge(mergeTarget, outputFile)) {
return false;
} else if (outputFile.equals(sortedFile)) {
return true;
}
unmergedFiles.add(outputFile);
}
return false;
}

private List<File> getSmallestUnmergedFiles(int size) {
ArrayList<File> result = new ArrayList<File>(unmergedFiles);
ArrayList<File> result = new ArrayList<>(unmergedFiles);
result.remove(MERGE_POISON_PILL);
result.sort(fileSizeComparator);
int endIdx = size > result.size() ? result.size() : size;
@@ -183,47 +208,44 @@ private void markAsMerged(List<File> target) {
@Override
public void run() {
Phaser mergeTaskPhaser = new Phaser(1);
List<Future<File>> results = new ArrayList<>();
List<File> mergeTarget = new ArrayList<>();
List<Future<File>> results = Lists.newArrayList();
int count = 0;

while (true) {
try {
unmergedFiles.add(sortedFiles.take());
if (sortedFiles.contains(MERGE_POISON_PILL) || unmergedFiles.contains(MERGE_POISON_PILL)) {
File f = sortedFiles.take();
unmergedFiles.add(f);
log.debug("added sorted file {} to the unmerged list", f.getName());
if (f.equals(MERGE_POISON_PILL)) {
break;
}
// add another batchMergeSize so that we choose the smallest of a larger range
if (unmergedFiles.size() >= 2*batchMergeSize) {
count++;
mergeTarget.clear();
mergeTarget = getSmallestUnmergedFiles(batchMergeSize);
Callable<File> mergeTask = new Task(mergeTarget, mergeTaskPhaser,
new File(mergeDir, String.format("intermediate-%s", count)));
List<File> mergeTarget = getSmallestUnmergedFiles(batchMergeSize);
File intermediateMergeFile = new File(mergeDir, String.format("intermediate-%s", count));
Callable<File> mergeTask = new Task(mergeTarget, mergeTaskPhaser, intermediateMergeFile);
markAsMerged(mergeTarget);
results.add(executorService.submit(mergeTask));
log.info("created merge task for {} with {}", intermediateMergeFile.getName(), mergeTarget);
}
} catch (InterruptedException e) {
log.error("Failed while draining from sortedFiles", e);
}
}

log.info("Waiting for batch sorting tasks completion");
mergeTaskPhaser.arriveAndAwaitAdvance();
executorService.shutdown();

// final merge
mergeTarget.clear();
// final merge in batch 64
sortedFiles.drainTo(unmergedFiles);
unmergedFiles.remove(MERGE_POISON_PILL);
mergeTarget.addAll(unmergedFiles);
markAsMerged(mergeTarget);
log.info("There are still {} sorted files not merged yet", mergeTarget.size());
log.info("There are still {} sorted files not merged yet", unmergedFiles.size());
try {
boolean exceptionsCaught = false;
for (Future<File> result : results) {
try {
mergeTarget.add(result.get());
unmergedFiles.add(result.get());
} catch (Throwable e) {
throwables.add(e);
exceptionsCaught = true;
@@ -234,14 +256,8 @@ public void run() {
mergeTaskPhaser.arrive();
}

log.info("All batch sorting tasks have completed, total of {}", count);
log.info("Proceeding to perform merge of {} sorted files", mergeTarget.size());
if (!merge(mergeTarget, sortedFile)) {
log.error("merge failed for {}", sortedFile.getName());
} else {
log.info("merge complete for {}", sortedFile.getName());
}
log.info("Total batch sorted file merged is {}", mergedFiles.size());
finalMerge();
log.info("Total batch sorted files length is {}", mergedFiles.size());

phaser.arriveAndDeregister();
}
@@ -263,7 +279,6 @@ private class Task implements Callable<File> {

@Override
public File call() {
log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
try {
if (merge(mergeTarget, mergedFile)) {
log.info("merge complete for {}", mergedFile.getName());
@@ -19,6 +19,7 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
@@ -33,7 +34,6 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -149,7 +149,12 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {

private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
private final boolean compressionEnabled;
/**
/** File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, storeDir, comparator, mergePhaser, compressionEnabled);
Thread merger = new Thread(mergeRunner, mergerThreadName);
merger.setDaemon(true);
merger.start();
phaser.awaitAdvance(Phases.WAITING_FOR_TASK_SPLITS.value);
* Directory where sorted files will be created.
*/
private final File storeDir;
@@ -234,7 +239,7 @@ public int getValue() {
this.mergeDir = new File(storeDir, mergeDirName);
FileUtils.forceMkdir(mergeDir);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = new LinkedBlockingQueue<File>();
this.sortedFiles = new LinkedBlockingQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
taskQueue = new LinkedBlockingQueue<>();
@@ -372,7 +377,7 @@ public TaskRunner() {
public void run() {
try {
log.info("Using a thread pool of size {}", threadPoolSize);
List<Future<List<File>>> results = new ArrayList<>();
List<Future<List<File>>> results = Lists.newArrayList();
while (true) {
Callable<List<File>> task = taskQueue.take();
if (task == POISON_PILL) {

0 comments on commit 0dc296d

Please sign in to comment.