Skip to content
Permalink
Browse files
keep only parallel merge code
  • Loading branch information
Ewocker committed Mar 4, 2022
1 parent b34e971 commit 2e726a84fac4348bd7fc8b8765d8fe0961cf2576
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 217 deletions.
@@ -608,7 +608,6 @@ public static <T> int mergeSortedFiles(List<File> files,
} catch (Exception e) {}
}
for (File f : files) {
System.out.printf("=====x deleting %s\n", f.getName());
f.delete();
}
}
@@ -160,7 +160,6 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
* Comparator used for comparing node states for creating sorted files.
*/
private final Comparator<NodeStateHolder> comparator;
private final Set<String> preferred;
private final ConcurrentLinkedQueue<File> sortedFiles;
private final ConcurrentLinkedQueue<Throwable> throwables;
/**
@@ -238,7 +237,6 @@ public int getValue() {
this.sortedFiles = new ConcurrentLinkedQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.preferred = pathComparator == null ? new HashSet<>() : pathComparator.getPreferred();
this.entryWriter = new NodeStateEntryWriter(blobStore);
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@@ -310,7 +308,7 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa

void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore,
ConcurrentLinkedQueue<String> completedTasks) throws IOException {
taskQueue.add(new TraverseAndSortTask(range, comparator, preferred, blobStore, storeDir,
taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
}

@@ -19,15 +19,10 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Joiner;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.json.JsonSerializer;
@@ -40,7 +35,6 @@
public class NodeStateEntryWriter {
private static final String OAK_CHILD_ORDER = ":childOrder";
private static final String DELIMITER = "|";
private static final String SLASH_REPLACEMENT = "\t";
private final JsopBuilder jw = new JsopBuilder();
private final JsonSerializer serializer;
private final Joiner pathJoiner = Joiner.on('/');
@@ -74,48 +68,6 @@ public String toString(List<String> pathElements, String nodeStateAsJson) {
return sb.toString();
}

// To Format: \t<prefer>path|{}
// /|{} => \t|{}
// /content|{} => \t1content|{}
// /content/dam/test|{} => \t1content\t1dam\t1test|{}
// /content/dam/jcr:content|{} => \t1content\t1dam\t0jcr:content|{} # ex. jcr:content is preferred
/**
 * Serializes a node path plus its JSON node state into a single sortable line.
 * Each path element is prefixed with SLASH_REPLACEMENT and a one-character
 * prefer flag: '0' when the element is in {@code preferred} (sorts first),
 * '1' otherwise. The empty (root) path serializes to a lone SLASH_REPLACEMENT.
 *
 * @param pathElements    path split into elements; empty list means the root path
 * @param nodeStateAsJson JSON representation of the node state, appended verbatim
 * @param preferred       element names that must sort before their siblings
 * @return the serialized line {@code <path-part>|<json>}
 */
public String serialize(List<String> pathElements, String nodeStateAsJson, Set<String> preferred) {
    int pathStringSize = pathElements.stream().mapToInt(String::length).sum();
    // Capacity estimate: one SLASH_REPLACEMENT char per element (one for the
    // root path), one prefer flag per element, plus the delimiter.
    // (Previous code had the ternary inverted: 0 slashes for the root path and
    // a single slash for N elements, under-sizing the builder.)
    int numOfSlashes = pathElements.isEmpty() ? 1 : pathElements.size();
    int serialLength = pathElements.size() + 1; // <prefer> flags, and delimiter
    int strSize = nodeStateAsJson.length() + pathStringSize + numOfSlashes + serialLength;
    StringBuilder sb = new StringBuilder(strSize);

    if (pathElements.isEmpty()) {
        sb.append(SLASH_REPLACEMENT);
    } else {
        for (String element : pathElements) {
            sb.append(SLASH_REPLACEMENT);
            sb.append(preferred.contains(element) ? '0' : '1');
            sb.append(element);
        }
    }

    sb.append(DELIMITER).append(nodeStateAsJson);
    return sb.toString();
}

/**
 * Reverses {@link #serialize}: strips the SLASH_REPLACEMENT markers and the
 * one-character prefer flags from the path part, then re-renders the entry
 * via {@link #toString(List, String)}.
 */
public String deserialize(String content) {
    String serializedPath = NodeStateEntryWriter.getPath(content);
    String nodeStateAsJson = NodeStateEntryWriter.getNodeState(content);

    // Drop the leading SLASH_REPLACEMENT; what remains is zero or more
    // "<prefer><element>" tokens separated by SLASH_REPLACEMENT.
    String remainder = serializedPath.substring(1);
    List<String> pathElements = new ArrayList<String>();
    if (!remainder.isEmpty()) {
        for (String token : remainder.split(SLASH_REPLACEMENT)) {
            pathElements.add(token.substring(1)); // strip the prefer flag
        }
    }
    return toString(pathElements, nodeStateAsJson);
}

public String asJson(NodeState nodeState) {
jw.resetWriter();
jw.object();
@@ -140,10 +92,6 @@ public static String getPath(String entryLine){
return entryLine.substring(0, getDelimiterPosition(entryLine));
}

/**
 * Returns the node-state JSON portion of an entry line, i.e. everything
 * after the path/state delimiter.
 */
public static String getNodeState(String entryLine){
    int delimiterPos = getDelimiterPosition(entryLine);
    return entryLine.substring(delimiterPos + 1);
}

public static String[] getParts(String line) {
int pos = getDelimiterPosition(line);
return new String[] {line.substring(0, pos), line.substring(pos + 1)};
@@ -37,10 +37,6 @@ public PathElementComparator(Iterable<String> preferredPathElements) {
this.preferred = ImmutableSet.copyOf(preferredPathElements);
}

/**
 * Returns the set of path element names that sort before their siblings.
 * The set is built with {@code ImmutableSet.copyOf} in the constructor,
 * so callers cannot mutate it through this reference.
 */
public Set<String> getPreferred() {
return preferred;
}

@Override
public int compare(Iterable<String> p1, Iterable<String> p2) {
Iterator<String> i1 = p1.iterator();
@@ -30,8 +30,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.*;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
@@ -62,7 +71,6 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
private final File storeDir;
private final boolean compressionEnabled;
private final Comparator<NodeStateHolder> comparator;
private final Set<String> preferred;
private long entryCount;
private long memoryUsed;
private final File sortWorkDir;
@@ -100,7 +108,7 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
private final long dumpThreshold;

TraverseAndSortTask(MongoDocumentTraverser.TraversingRange range, Comparator<NodeStateHolder> comparator,
Set<String> preferred, BlobStore blobStore, File storeDir, boolean compressionEnabled,
BlobStore blobStore, File storeDir, boolean compressionEnabled,
Queue<String> completedTasks, Queue<Callable<List<File>>> newTasksQueue,
Phaser phaser, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory,
MemoryManager memoryManager, long dumpThreshold, ConcurrentLinkedQueue parentSortedFiles) throws IOException {
@@ -113,7 +121,6 @@ class TraverseAndSortTask implements Callable<List<File>>, MemoryManagerClient {
this.storeDir = storeDir;
this.compressionEnabled = compressionEnabled;
this.comparator = comparator;
this.preferred = preferred;
this.completedTasks = completedTasks;
this.newTasksQueue = newTasksQueue;
this.phaser = phaser;
@@ -245,7 +252,7 @@ void addEntry(NodeStateEntry e) throws IOException {
log.info("Splitting task {}. New Upper limit for this task {}. New task range - {} to {}", taskID, splitPoint, splitPoint, this.lastModifiedUpperBound);
newTasksQueue.add(new TraverseAndSortTask(new MongoDocumentTraverser.TraversingRange(
new LastModifiedRange(splitPoint, this.lastModifiedUpperBound), null),
comparator, preferred, blobStore, storeDir, compressionEnabled, completedTasks,
comparator, blobStore, storeDir, compressionEnabled, completedTasks,
newTasksQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, parentSortedFiles));
this.lastModifiedUpperBound = splitPoint;
DirectoryHelper.setLastModifiedUpperLimit(sortWorkDir, lastModifiedUpperBound);
@@ -19,15 +19,18 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.*;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.nio.charset.Charset;
import java.util.*;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.IntStream;

import javax.management.Notification;
import javax.management.NotificationEmitter;
@@ -67,7 +70,6 @@ class TraverseWithSortStrategy implements SortStrategy {
private final boolean compressionEnabled;
private final Charset charset = UTF_8;
private final Comparator<NodeStateHolder> comparator;
private final Set<String> preferred;
private NotificationEmitter emitter;
private MemoryListener listener;
private final int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT);
@@ -97,7 +99,6 @@ class TraverseWithSortStrategy implements SortStrategy {
this.storeDir = storeDir;
this.compressionEnabled = compressionEnabled;
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.preferred = pathComparator == null ? new HashSet<>() : pathComparator.getPreferred();
}

@Override
@@ -121,71 +122,19 @@ private File sortStoreFile() throws IOException {
log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
Stopwatch w = Stopwatch.createStarted();
File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
File serializedSortedFile = new File(storeDir, String.format("serialized-%s", getSortedStoreFileName(false)));

// try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
// Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
// Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
// ExternalSort.mergeSortedFiles(sortedFiles,
// writer,
// comparator,
// charset,
// true, //distinct
// compressionEnabled, //useZip
// func2,
// func1
// );
// }

List<String> commands = new ArrayList<String>();
Collections.addAll(commands, "/usr/bin/sort");
Collections.addAll(commands, "-T", storeDir.getAbsolutePath());
Collections.addAll(commands, "-S", "2G");
Collections.addAll(commands, "-o", serializedSortedFile.getAbsolutePath());
Collections.addAll(commands, "-t", "/");
// Max depth of 100 level
IntStream.range(1, 50).forEach(i -> Collections.addAll(commands, String.format("-k%s,%s", i, i)));
if (compressionEnabled) {
Collections.addAll(commands, "--compress-program", "gzip");
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(String.format("<(gunzip -c %s)", f.getAbsolutePath())));
} else {
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(f.getAbsolutePath()));
}

ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", String.join(" ", commands));
log.info("Running merge command {}", pb.command());
Process p = pb.start();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader errReader = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String line;
p.waitFor();
log.info("Merging of sorted files completed in {}", w);
while ((line = reader.readLine()) != null) log.info(line);
Boolean hasError = false;
while ((line = errReader.readLine()) != null) {
log.error(line);
hasError = true;
}
if (hasError) throw new Exception("command execution fail");
log.info("Sort command executed successfully");
} catch (Exception e) {
throw new RuntimeException(String.format("Error while running command %s", pb.command()));
}

Stopwatch wDeserialize = Stopwatch.createStarted();
try (BufferedReader reader = FlatFileStoreUtils.createReader(serializedSortedFile, false);
BufferedWriter writer = FlatFileStoreUtils.createWriter(sortedFile, compressionEnabled)) {
String line = reader.readLine();
while (line != null) {
String deserializeLine = entryWriter.deserialize(line);
writer.write(deserializeLine);
writer.newLine();
line = reader.readLine();
}
try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
ExternalSort.mergeSortedFiles(sortedFiles,
writer,
comparator,
charset,
true, //distinct
compressionEnabled, //useZip
func2,
func1
);
}
log.info("Deserialize of sorted file completed in {}", wDeserialize);
log.info("Merging of sorted files completed in {}", w);
return sortedFile;
}
@@ -241,8 +190,7 @@ private void sortAndSaveBatch() throws IOException {
try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
for (NodeStateHolder h : entryBatch) {
//Here holder line only contains nodeState json
//String text = entryWriter.toString(h.getPathElements(), h.getLine());
String text = entryWriter.serialize(h.getPathElements(), h.getLine(), preferred);
String text = entryWriter.toString(h.getPathElements(), h.getLine());
writer.write(text);
writer.newLine();
textSize += text.length() + 1;

This file was deleted.

0 comments on commit 2e726a8

Please sign in to comment.