Skip to content
Permalink
Browse files
fix unit test
  • Loading branch information
Ewocker committed Mar 4, 2022
1 parent a2a0154 commit 602cffc52ef485dcfe74988fba2107f4afd832c5
Showing 4 changed files with 102 additions and 43 deletions.
@@ -49,6 +49,7 @@
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.google.common.base.Charsets.UTF_8;
@@ -237,7 +238,7 @@ public int getValue() {
this.sortedFiles = new ConcurrentLinkedQueue<>();
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.preferred = pathComparator.getPreferred();
this.preferred = pathComparator == null ? new HashSet<>() : pathComparator.getPreferred();
this.entryWriter = new NodeStateEntryWriter(blobStore);
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@@ -340,8 +341,7 @@ public long getEntryCount() {
private File sortStoreFile() throws IOException {
log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
Stopwatch w = Stopwatch.createStarted();
String sortedFileName = getSortedStoreFileName(compressionEnabled);
File sortedFile = new File(storeDir, sortedFileName);
File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
File serializedSortedFile = new File(storeDir, String.format("serialized-%s", getSortedStoreFileName(false)));

// List<File> inputSortedFilesToMerge = new ArrayList<>(sortedFiles);
@@ -363,9 +363,10 @@ private File sortStoreFile() throws IOException {
Collections.addAll(commands, "/usr/bin/sort");
Collections.addAll(commands, "-T", storeDir.getAbsolutePath());
Collections.addAll(commands, "-S", "2G");
Collections.addAll(commands, "-k", "1");
Collections.addAll(commands, "-t", "\\|");
Collections.addAll(commands, "-o", serializedSortedFile.getAbsolutePath());
Collections.addAll(commands, "-t", "/");
// Max depth of 100 level
IntStream.range(1, 50).forEach(i -> Collections.addAll(commands, String.format("-k%s,%s", i, i)));
if (compressionEnabled) {
Collections.addAll(commands, "--compress-program", "gzip");
Collections.addAll(commands, "-m");
@@ -374,24 +375,28 @@ private File sortStoreFile() throws IOException {
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(f.getAbsolutePath()));
}
System.out.println(commands.toString());

ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", String.join(" ", commands));
System.out.println(pb.command());
log.info("Running merge command {}", pb.command());
Process p = pb.start();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader errReader = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String line;
p.waitFor();
log.info("Merging of sorted files completed in {}", w);
while ((line = reader.readLine()) != null) log.info(line);
String cmdError = "";
while ((line = errReader.readLine()) != null) cmdError += line;
if (cmdError.length() != 0) throw new Exception("command execution fail");
Boolean hasError = false;
while ((line = errReader.readLine()) != null) {
log.error(line);
hasError = true;
}
if (hasError) throw new Exception("command execution fail");
log.info("Sort command executed successfully");
} catch (Exception e) {
throw new RuntimeException(String.format("Error while running command %s", pb.command()));
}

Stopwatch wDeserialize = Stopwatch.createStarted();
try (BufferedReader reader = FlatFileStoreUtils.createReader(serializedSortedFile, false);
BufferedWriter writer = FlatFileStoreUtils.createWriter(sortedFile, compressionEnabled)) {
String line = reader.readLine();
@@ -402,8 +407,9 @@ private File sortStoreFile() throws IOException {
line = reader.readLine();
}
}
log.info("Deserialize of sorted file completed in {}", wDeserialize);


log.info("Merging of sorted files completed in {}", w);
return sortedFile;
}

@@ -74,15 +74,15 @@ public String toString(List<String> pathElements, String nodeStateAsJson) {
return sb.toString();
}

// To Format: /<prefer>path|{}
// /|{} => /|{}
// /content|{} => /1content|{}
// /content/dam/test|{} => /1content/1dam/1test|{}
// /content/dam/jcr:content|{} => /1content/1dam/0jcr:content|{} # ex. jcr:content is preferred
// To Format: /<prefer>path//|{}
// /|{} => ///|{}
// /content|{} => /1content//|{}
// /content/dam/test|{} => /1content/1dam/1test//|{}
// /content/dam/jcr:content|{} => /1content/1dam/0jcr:content//|{} # ex. jcr:content is preferred
public String serialize(List<String> pathElements, String nodeStateAsJson, Set<String> preferred) {
int pathStringSize = pathElements.stream().mapToInt(String::length).sum();
int numOfSlashes = pathElements.size() == 0 ? pathElements.size() : 1;
int serialLength = pathElements.size() + 1; // <prefer> and delimiter
int serialLength = pathElements.size() + 2 + 1; // <prefer>, //, and delimiter
int strSize = nodeStateAsJson.length() + pathStringSize + numOfSlashes + serialLength;
StringBuilder sb = new StringBuilder(strSize);

@@ -96,6 +96,7 @@ public String serialize(List<String> pathElements, String nodeStateAsJson, Set<S
}
}

sb.append("//");
sb.append(DELIMITER).append(nodeStateAsJson);
return sb.toString();
}
@@ -104,7 +105,7 @@ public String deserialize(String content) {
String serializedStr = NodeStateEntryWriter.getPath(content);
String nodeStateAsJson = NodeStateEntryWriter.getNodeState(content);

String subStr = serializedStr.substring(1);
String subStr = serializedStr.substring(1, serializedStr.length()-2);
List<String> pathElements = new ArrayList<String>();
if (subStr.length() != 0) {
List<String> list = new ArrayList<String>(
@@ -19,18 +19,15 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.IntStream;

import javax.management.Notification;
import javax.management.NotificationEmitter;
@@ -70,6 +67,7 @@ class TraverseWithSortStrategy implements SortStrategy {
private final boolean compressionEnabled;
private final Charset charset = UTF_8;
private final Comparator<NodeStateHolder> comparator;
private final Set<String> preferred;
private NotificationEmitter emitter;
private MemoryListener listener;
private final int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT);
@@ -99,6 +97,7 @@ class TraverseWithSortStrategy implements SortStrategy {
this.storeDir = storeDir;
this.compressionEnabled = compressionEnabled;
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.preferred = pathComparator == null ? new HashSet<>() : pathComparator.getPreferred();
}

@Override
@@ -122,19 +121,71 @@ private File sortStoreFile() throws IOException {
log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
Stopwatch w = Stopwatch.createStarted();
File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
ExternalSort.mergeSortedFiles(sortedFiles,
writer,
comparator,
charset,
true, //distinct
compressionEnabled, //useZip
func2,
func1
);
File serializedSortedFile = new File(storeDir, String.format("serialized-%s", getSortedStoreFileName(false)));

// try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
// Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
// Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
// ExternalSort.mergeSortedFiles(sortedFiles,
// writer,
// comparator,
// charset,
// true, //distinct
// compressionEnabled, //useZip
// func2,
// func1
// );
// }

List<String> commands = new ArrayList<String>();
Collections.addAll(commands, "/usr/bin/sort");
Collections.addAll(commands, "-T", storeDir.getAbsolutePath());
Collections.addAll(commands, "-S", "2G");
Collections.addAll(commands, "-o", serializedSortedFile.getAbsolutePath());
Collections.addAll(commands, "-t", "/");
// Max depth of 100 level
IntStream.range(1, 50).forEach(i -> Collections.addAll(commands, String.format("-k%s,%s", i, i)));
if (compressionEnabled) {
Collections.addAll(commands, "--compress-program", "gzip");
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(String.format("<(gunzip -c %s)", f.getAbsolutePath())));
} else {
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(f.getAbsolutePath()));
}

ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", String.join(" ", commands));
log.info("Running merge command {}", pb.command());
Process p = pb.start();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader errReader = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String line;
p.waitFor();
log.info("Merging of sorted files completed in {}", w);
while ((line = reader.readLine()) != null) log.info(line);
Boolean hasError = false;
while ((line = errReader.readLine()) != null) {
log.error(line);
hasError = true;
}
if (hasError) throw new Exception("command execution fail");
log.info("Sort command executed successfully");
} catch (Exception e) {
throw new RuntimeException(String.format("Error while running command %s", pb.command()));
}

Stopwatch wDeserialize = Stopwatch.createStarted();
try (BufferedReader reader = FlatFileStoreUtils.createReader(serializedSortedFile, false);
BufferedWriter writer = FlatFileStoreUtils.createWriter(sortedFile, compressionEnabled)) {
String line = reader.readLine();
while (line != null) {
String deserializeLine = entryWriter.deserialize(line);
writer.write(deserializeLine);
writer.newLine();
line = reader.readLine();
}
}
log.info("Deserialize of sorted file completed in {}", wDeserialize);
log.info("Merging of sorted files completed in {}", w);
return sortedFile;
}
@@ -190,7 +241,8 @@ private void sortAndSaveBatch() throws IOException {
try (BufferedWriter writer = FlatFileStoreUtils.createWriter(newtmpfile, compressionEnabled)) {
for (NodeStateHolder h : entryBatch) {
//Here holder line only contains nodeState json
String text = entryWriter.toString(h.getPathElements(), h.getLine());
//String text = entryWriter.toString(h.getPathElements(), h.getLine());
String text = entryWriter.serialize(h.getPathElements(), h.getLine(), preferred);
writer.write(text);
writer.newLine();
textSize += text.length() + 1;
@@ -40,18 +40,18 @@ public NodeStateEntryWriterSerializeTest(String inputPath, String propKey, Strin
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
//inputPath propKey propVal preferredStr expectedSerialized expectedDeserialized
{ "/", "foo", "bar", "", "/|{\"foo\":\"bar\"}", "/|{\"foo\":\"bar\"}" },
{ "/test", "foo", "bar", "", "/1test|{\"foo\":\"bar\"}", "/test|{\"foo\":\"bar\"}" },
{ "/dir/asset", "key", "value", "", "/1dir/1asset|{\"key\":\"value\"}",
{ "/", "foo", "bar", "", "///|{\"foo\":\"bar\"}", "/|{\"foo\":\"bar\"}" },
{ "/test", "foo", "bar", "", "/1test//|{\"foo\":\"bar\"}", "/test|{\"foo\":\"bar\"}" },
{ "/dir/asset", "key", "value", "", "/1dir/1asset//|{\"key\":\"value\"}",
"/dir/asset|{\"key\":\"value\"}" },
{ "/content/dam/jcr:content", "foo", "bar", "jcr:content",
"/1content/1dam/0jcr:content|{\"foo\":\"bar\"}",
"/1content/1dam/0jcr:content//|{\"foo\":\"bar\"}",
"/content/dam/jcr:content|{\"foo\":\"bar\"}" },
{ "/content/dam/jcr:content/test", "foo", "bar", "jcr:content,dam",
"/1content/0dam/0jcr:content/1test|{\"foo\":\"bar\"}",
"/1content/0dam/0jcr:content/1test//|{\"foo\":\"bar\"}",
"/content/dam/jcr:content/test|{\"foo\":\"bar\"}" },
{ "/1/2/3/4/5/6/7/8/9/10/11/12", "12levels", "testcase", "jcr:content,dam",
"/11/12/13/14/15/16/17/18/19/110/111/112|{\"12levels\":\"testcase\"}",
"/11/12/13/14/15/16/17/18/19/110/111/112//|{\"12levels\":\"testcase\"}",
"/1/2/3/4/5/6/7/8/9/10/11/12|{\"12levels\":\"testcase\"}" },
});
}

0 comments on commit 602cffc

Please sign in to comment.