Skip to content
Permalink
Browse files
address review comment
  • Loading branch information
Ewocker committed Mar 18, 2022
1 parent 0dc296d commit 5b06983cf7378e0341832d2c1b428bea7a6c4768
Showing 3 changed files with 61 additions and 49 deletions.
@@ -93,9 +93,9 @@ public class MergeRunner implements Runnable {
private final boolean compressionEnabled;
private final ArrayList<File> mergedFiles = Lists.newArrayList();
private final ArrayList<File> unmergedFiles = Lists.newArrayList();
private final ExecutorService executorService;
private final int threadPoolSize = Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
private final int batchMergeSize = Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
private ExecutorService executorService;
private final int threadPoolSize;
private final int batchMergeSize;
private final Comparator fileSizeComparator = new SizeFileComparator();

/**
@@ -134,16 +134,16 @@ public class MergeRunner implements Runnable {
* @param compressionEnabled if true, the created files would be compressed
*/
MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File mergeDir, Comparator comparator,
Phaser phaser, boolean compressionEnabled) throws IOException {
Phaser phaser, int batchMergeSize, int threadPoolSize, boolean compressionEnabled) {
this.mergeDir = mergeDir;
FileUtils.forceMkdir(mergeDir);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = sortedFiles;
this.sortedFile = sortedFile;
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = comparator;
this.phaser = phaser;
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.batchMergeSize = batchMergeSize;
this.threadPoolSize = threadPoolSize;
}

private boolean merge(List<File> files, File outputFile) {
@@ -207,6 +207,12 @@ private void markAsMerged(List<File> target) {

@Override
public void run() {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
try {
FileUtils.forceMkdir(mergeDir);
} catch (IOException e) {
log.error("failed to create merged directory {}", mergeDir.getAbsolutePath());
}
Phaser mergeTaskPhaser = new Phaser(1);
List<Future<File>> results = Lists.newArrayList();
int count = 0;
@@ -47,8 +47,8 @@
import java.util.concurrent.Phaser;
import java.util.stream.Stream;

import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_DATA_DUMP_THREADS;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_THREAD_POOL_SIZE;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.*;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_MERGE_TASK_THREADS;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser.TraversingRange;

@@ -237,7 +237,6 @@ public int getValue() {
boolean compressionEnabled, MemoryManager memoryManager, long dumpThreshold) throws IOException {
this.storeDir = storeDir;
this.mergeDir = new File(storeDir, mergeDirName);
FileUtils.forceMkdir(mergeDir);
this.compressionEnabled = compressionEnabled;
this.sortedFiles = new LinkedBlockingQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
@@ -327,7 +326,9 @@ public File createSortedStoreFile() throws IOException, CompositeException {
watcher.setDaemon(true);
watcher.start();
File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, storeDir, comparator, mergePhaser, compressionEnabled);
int threadPoolSize = Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
int batchMergeSize = Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, mergeDir, comparator, mergePhaser, batchMergeSize, threadPoolSize, compressionEnabled);
Thread merger = new Thread(mergeRunner, mergerThreadName);
merger.setDaemon(true);
merger.start();
@@ -1,6 +1,8 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.collect.Lists;
import jnr.ffi.annotations.In;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -11,11 +13,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
@@ -30,6 +36,11 @@ public class MergeRunnerTest {
private final List<File> testFiles = Lists.newArrayList();
private String expectedSortedFileStr = "",
actualSortedFileStr = "";
private final int threadPoolSize = 1,
batchMergeSize = 3;
private final PathElementComparator pathComparator = new PathElementComparator();
private final Comparator<NodeStateHolder> comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
private final NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(new MemoryBlobStore());

@Before
public void setup(){
@@ -44,23 +55,20 @@ public void after() {
@Test
public void test() throws IOException {
lc.starting();
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");

File tmpDir = new File(FileUtils.getTempDirectory(), Long.toString(System.nanoTime())),
mergeDir = new File(tmpDir, "merge"),
sortedFile = new File(tmpDir, "sorted-file.json");
List<String> expectedLogOutput = Lists.newArrayList(),
actualLogOutput = Lists.newArrayList();

generateTestFiles(tmpDir);
assertEquals("expected 13 generated test files", 13, testFiles.size());
int testFileCount = 13;
generateTestFiles(tmpDir, testFileCount);
assertEquals("expected generated test files number does not match", testFileCount, testFiles.size());

PathElementComparator pathComparator = new PathElementComparator();
Comparator<NodeStateHolder> comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
Phaser mergePhaser = new Phaser(1);
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, mergeDir, comparator, mergePhaser, false);
Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles, mergeDir, comparator, mergePhaser, batchMergeSize, threadPoolSize, false);
Thread merger = new Thread(mergeRunner, "test-merger");
merger.setDaemon(true);

@@ -76,13 +84,10 @@ public void test() throws IOException {
sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
mergePhaser.awaitAdvance(0);

actualLogOutput = lc.getLogs();
actualSortedFileStr = FileUtils.readFileToString(sortedFile, UTF_8);
String text = String.join(newline, lc.getLogs());
System.out.println(text);

assertEquals("sorted-file content should be expected", expectedSortedFileStr, actualSortedFileStr);

actualLogOutput = lc.getLogs();
expectedLogOutput.add("created merge task for intermediate-1 with " + (new ArrayList<File>(){
{
add(testFiles.get(0));
@@ -142,35 +147,35 @@ public void test() throws IOException {
assertEquals("final merge log output should be expected", String.join(newline, expectedLogOutput), String.join(newline, actualLogOutput.subList(7, 12)));
}

private void generateTestFiles(File tmpDir) throws IOException {
int nextFileSize = 1;
File testFile = new File(tmpDir, Integer.toString(nextFileSize));
List<String> lineBuffer = Lists.newArrayList();
List<String> resultList = Lists.newArrayList();
for (int i = 0; i <= 90; i++) {
String line = String.format("/%05d|{}", i);
lineBuffer.add(line);
resultList.add(line);
if (lineBuffer.size() == nextFileSize) {
String text = String.join(newline, lineBuffer);
FileUtils.writeStringToFile(testFile, text, UTF_8);
testFiles.add(testFile);
nextFileSize += 1;
testFile = new File(tmpDir, Integer.toString(nextFileSize));
lineBuffer.clear();
// The function will generate <fileCount> files where the filename equals number of lines in the file.
private void generateTestFiles(File tmpDir, int fileCount) throws IOException {
LinkedList<NodeStateHolder> resultNodeState = new LinkedList<>();
List<Integer> input = IntStream.rangeClosed(1, (1+fileCount)*fileCount/2).boxed().collect(Collectors.toList());
Collections.shuffle(input);
for (int fname = 1; fname <= fileCount; fname++) {
LinkedList<NodeStateHolder> tmpNodeState = new LinkedList<>();
for (int line = 0; line < fname; line++) {
NodeStateHolder nodeState = new StateInBytesHolder(String.format("/%08d", input.remove(0)), "{}");
tmpNodeState.add(nodeState);
}
tmpNodeState.sort(comparator);
resultNodeState.addAll(tmpNodeState);
String testFileContent = "";
while (!tmpNodeState.isEmpty()) {
NodeStateHolder h = tmpNodeState.removeFirst();
String text = entryWriter.toString(h.getPathElements(), h.getLine());
testFileContent += text + newline;
}
File testFile = new File(tmpDir, Integer.toString(fname));
FileUtils.writeStringToFile(testFile, testFileContent, UTF_8);
testFiles.add(testFile);
}
expectedSortedFileStr = String.join(newline, resultList) + newline;
}

// private static List<File> getResourceFolderFiles (String folder) {
// ClassLoader loader = Thread.currentThread().getContextClassLoader();
// URL url = loader.getResource(folder);
// String path = url.getPath();
// return Arrays.asList(new File(path).listFiles());
// }


// String log = runIndexingTest(IndexFieldProviderImpl.class, true);
// assertEquals("[" + "Added augmented fields: jcr:content/metadata/predictedTags/[my, a, my:a], 10.0" + "]", log);
resultNodeState.sort(comparator);
while (!resultNodeState.isEmpty()) {
NodeStateHolder h = resultNodeState.removeFirst();
String text = entryWriter.toString(h.getPathElements(), h.getLine());
expectedSortedFileStr += text + newline;
}
}
}

0 comments on commit 5b06983

Please sign in to comment.