Skip to content
Permalink
Browse files
a good state with uncompressed can be merged
  • Loading branch information
Ewocker committed Mar 4, 2022
1 parent ea1e15e commit 2c845ba788b9a88942b7f7fa0241a60355ebb085
Showing 5 changed files with 226 additions and 47 deletions.
@@ -20,6 +20,8 @@

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
@@ -29,11 +31,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
@@ -151,6 +153,7 @@ public class MultithreadedTraverseWithSortStrategy implements SortStrategy {
private static final Logger log = LoggerFactory.getLogger(MultithreadedTraverseWithSortStrategy.class);
private final Charset charset = UTF_8;
private final boolean compressionEnabled;
private final NodeStateEntryWriter entryWriter;
/**
* Directory where sorted files will be created.
*/
@@ -234,6 +237,7 @@ public int getValue() {
this.throwables = new ConcurrentLinkedQueue<>();
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.preferred = pathComparator.getPreferred();
this.entryWriter = new NodeStateEntryWriter(blobStore);
taskQueue = new LinkedBlockingQueue<>();
phaser = new Phaser() {
@Override
@@ -335,21 +339,103 @@ public long getEntryCount() {
private File sortStoreFile() throws IOException {
log.info("Proceeding to perform merge of {} sorted files", sortedFiles.size());
Stopwatch w = Stopwatch.createStarted();
File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
System.out.println("!!!======================= A");
String sortedFileName = getSortedStoreFileName(compressionEnabled);
System.out.println("!!!======================= B");
File sortedFile = new File(storeDir, sortedFileName);
System.out.println("!!!======================= C");
File serializedSortedFile = new File(storeDir, String.format("serialized-%s", sortedFileName));

List<File> inputSortedFilesToMerge = new ArrayList<>(sortedFiles);
try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
ExternalSort.mergeSortedFiles(inputSortedFilesToMerge,
writer,
comparator,
charset,
true, //distinct
compressionEnabled, //useZip
func2,
func1
);
// inputSortedFilesToMerge.forEach(f -> {
// System.out.println(f.toString());
// try (BufferedReader br = new BufferedReader(new FileReader(f.getAbsolutePath()))) {
// String line;
// while ((line = br.readLine()) != null) {
// System.out.println(line);
// }
// } catch (FileNotFoundException e) {
// e.printStackTrace();
// } catch (IOException e) {
// e.printStackTrace();
// }
// });

//
// try(BufferedWriter writer = createWriter(sortedFile, compressionEnabled)) {
// Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
// Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
// ExternalSort.mergeSortedFiles(inputSortedFilesToMerge,
// writer,
// comparator,
// charset,
// true, //distinct
// compressionEnabled, //useZip
// func2,
// func1
// );
// }


// // ==========
// Path mergeDirPath = Paths.get(storeDir.getAbsolutePath(), "merge");
// Files.createDirectories(mergeDirPath);
//
// Path deserializeSortedPath = mergeDirPath.resolve("serialized-store-sorted.json");
// Path serializeSortedPath = mergeDirPath.resolve("store-sorted.json");

System.out.println("!!!======================= D");
List<String> commands = new ArrayList<String>();
Collections.addAll(commands, "/usr/bin/sort");
if (compressionEnabled) Collections.addAll(commands, "--compress-program", "gzip");
Collections.addAll(commands, "-T", storeDir.getAbsolutePath());
Collections.addAll(commands, "-S", "2G");
Collections.addAll(commands, "-k", "1");
Collections.addAll(commands, "-t", "|");
Collections.addAll(commands, "-o", serializedSortedFile.getAbsolutePath());
// PathUtils.concat(storeDir.getAbsolutePath(), "serialized-store-sorted.json"));
Collections.addAll(commands, "-m");
sortedFiles.forEach(f -> commands.add(f.getAbsolutePath()));
System.out.println("!!!======================= Running");
System.out.println(commands.toString());

System.out.println("!!!======================= E");
try {
ProcessBuilder pb = new ProcessBuilder(commands);
Process p = pb.start();
p.waitFor();
System.out.println("Script executed successfully");
} catch (Exception e) {
e.printStackTrace();
}

// read file line by line and deserialize https://www.baeldung.com/java-read-lines-large-file#commonsio
String UTF_8 = StandardCharsets.UTF_8.name();

LineIterator it = FileUtils.lineIterator(serializedSortedFile, UTF_8);
final String newLine = System.getProperty("line.separator");
System.out.println("!!!======================= F");
try {
while (it.hasNext()) {
String serializeLine = it.nextLine();

// deserialize
String deserializeLine = entryWriter.deserialize(serializeLine);

System.out.println(String.format("original: %s\ndeserial: %s\n", serializeLine, deserializeLine));

// write line by line
FileUtils.writeStringToFile(sortedFile, deserializeLine+newLine, UTF_8, true);
}
} catch (Exception e) {
System.out.println("something went wrong");
System.out.println(e);
} finally {
LineIterator.closeQuietly(it);
}

System.out.println("!!!======================= G");

log.info("Merging of sorted files completed in {}", w);
return sortedFile;
}
@@ -19,11 +19,15 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Joiner;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.json.JsonSerializer;
@@ -36,6 +40,7 @@
public class NodeStateEntryWriter {
private static final String OAK_CHILD_ORDER = ":childOrder";
private static final String DELIMITER = "|";
private static final String SERIALIZE_DELIMITER = "]";
private final JsopBuilder jw = new JsopBuilder();
private final JsonSerializer serializer;
private final Joiner pathJoiner = Joiner.on('/');
@@ -70,14 +75,13 @@ public String toString(List<String> pathElements, String nodeStateAsJson) {
}

// To Format: <depth>/<prefer>path|{}
// /|{} => 0/1]|{}
// /content|{} => 1/1]content|{}
// /content/dam/test|{} => 3/1]content/1]dam/1]test|{}
// /content/dam/jcr:content|{} => 3/1]content/1]dam/0]jcr:content|{} # ex. jcr:content is preferred
public String toSerializedString(List<String> pathElements, String nodeStateAsJson, Set<String> preferred) {
// /|{} => 000/1]|{}
// /content|{} => 001/1]content|{}
// /content/dam/test|{} => 003/1]content/1]dam/1]test|{}
// /content/dam/jcr:content|{} => 003/1]content/1]dam/0]jcr:content|{} # ex. jcr:content is preferred
public String serialize(List<String> pathElements, String nodeStateAsJson, Set<String> preferred) {
int pathStringSize = pathElements.stream().mapToInt(String::length).sum();
String depth = String.valueOf(pathElements.size());
// 1]content/1]dam/1]test
String depth = String.format("%03d", pathElements.size());;
int serialLength = depth.length() + (2 * pathElements.size()) + pathElements.size() - 1;
int strSize = nodeStateAsJson.length() + pathStringSize + pathElements.size() + serialLength + 1;
StringBuilder sb = new StringBuilder(strSize);
@@ -89,16 +93,27 @@ public String toSerializedString(List<String> pathElements, String nodeStateAsJs
for (String element : pathElements) {
sb.append('/');
sb.append(preferred.contains(element) ? '0' : '1');
sb.append(']');
sb.append(SERIALIZE_DELIMITER);
sb.append(element);
}
}

pathJoiner.appendTo(sb, pathElements);
sb.append(DELIMITER).append(nodeStateAsJson);
return sb.toString();
}

public String deserialize(String content) {
String serializedStr = NodeStateEntryWriter.getPath(content);
String nodeStateAsJson = NodeStateEntryWriter.getNodeState(content);

PathUtils.elements(serializedStr);
List<String> list = new ArrayList<String>(Arrays.asList(serializedStr.split("/")));
list.remove(0);
List<String> pathElements = list.stream().map(
str -> str.split(SERIALIZE_DELIMITER)[1]).collect(Collectors.toList());
return toString(pathElements, nodeStateAsJson);
}

public String asJson(NodeState nodeState) {
jw.resetWriter();
jw.object();
@@ -123,6 +138,10 @@ public static String getPath(String entryLine){
return entryLine.substring(0, getDelimiterPosition(entryLine));
}

public static String getNodeState(String entryLine){
return entryLine.substring(getDelimiterPosition(entryLine) + 1);
}

public static String[] getParts(String line) {
int pos = getDelimiterPosition(line);
return new String[] {line.substring(0, pos), line.substring(pos + 1)};
@@ -30,9 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;
@@ -292,12 +290,28 @@ private void sortAndSaveBatch() throws IOException {
NodeStateHolder h = entryBatch.removeFirst();
//Here holder line only contains nodeState json
//String text = entryWriter.toString(h.getPathElements(), h.getLine());
String text = entryWriter.toSerializedString(h.getPathElements(), h.getLine(), preferred);
String text = entryWriter.serialize(h.getPathElements(), h.getLine(), preferred);
writer.write(text);
writer.newLine();
textSize += text.length() + 1;
}
}


System.out.println("!!! ================= ");
System.out.println(newtmpfile.getPath());
try (BufferedReader br = new BufferedReader(new FileReader(newtmpfile.getAbsolutePath()))) {
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}


log.info("{} Sorted and stored batch of size {} (uncompressed {}) with {} entries in {}. Last entry id = {}", taskID,
humanReadableByteCount(newtmpfile.length()), humanReadableByteCount(textSize), size, w,
lastSavedNodeStateEntry.getId());
@@ -0,0 +1,78 @@
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.util.*;

import static com.google.common.collect.ImmutableList.copyOf;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class NodeStateEntryWriterSerializeTest {
private BlobStore blobStore = new MemoryBlobStore();
private String inputPath;
private String propKey;
private String propVal;
private String preferredStr;
private String expectedSerialized;
private String expectedDeserialized;

public NodeStateEntryWriterSerializeTest(String inputPath, String propKey, String propVal, String preferredStr,
String expectedSerialized, String expectedDeserialized) {
this.inputPath = inputPath;
this.propKey = propKey;
this.propVal = propVal;
this.expectedSerialized = expectedSerialized;
this.preferredStr = preferredStr;
this.expectedDeserialized = expectedDeserialized;
}

@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
//inputPath propKey propVal preferredStr expectedSerialized expectedDeserialized
{ "/", "foo", "bar", "", "000/|{\"foo\":\"bar\"}", "/|{\"foo\":\"bar\"}" },
{ "/test", "foo", "bar", "", "001/1]test|{\"foo\":\"bar\"}", "/test|{\"foo\":\"bar\"}" },
{ "/dir/asset", "key", "value", "", "002/1]dir/1]asset|{\"key\":\"value\"}",
"/dir/asset|{\"key\":\"value\"}" },
{ "/content/dam/jcr:content", "foo", "bar", "jcr:content",
"003/1]content/1]dam/0]jcr:content|{\"foo\":\"bar\"}",
"/content/dam/jcr:content|{\"foo\":\"bar\"}" },
{ "/content/dam/jcr:content/test", "foo", "bar", "jcr:content,dam",
"004/1]content/0]dam/0]jcr:content/1]test|{\"foo\":\"bar\"}",
"/content/dam/jcr:content/test|{\"foo\":\"bar\"}" },
{ "/1/2/3/4/5/6/7/8/9/10/11/12", "12levels", "testcase", "jcr:content,dam",
"012/1]1/1]2/1]3/1]4/1]5/1]6/1]7/1]8/1]9/1]10/1]11/1]12|{\"12levels\":\"testcase\"}",
"/1/2/3/4/5/6/7/8/9/10/11/12|{\"12levels\":\"testcase\"}" },
});
}

@Test
public void test() {
NodeStateEntryWriter nw = new NodeStateEntryWriter(blobStore);
NodeBuilder b1 = EMPTY_NODE.builder();
b1.setProperty(propKey, propVal);

NodeStateEntry e1 = new NodeStateEntry.NodeStateEntryBuilder(b1.getNodeState(), inputPath).build();

String json = nw.asJson(e1.getNodeState());
List<String> pathElements = copyOf(elements(e1.getPath()));
Set<String> preferred = new HashSet<>(Arrays.asList(preferredStr.split(",")));

String serialized = nw.serialize(pathElements, json, preferred);
assertEquals(expectedSerialized, serialized);

String deserialized = nw.deserialize(serialized);
System.out.println(deserialized);
assertEquals(expectedDeserialized, deserialized);
}
}
@@ -42,7 +42,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class NodeStateEntryWriterTest {
private BlobStore blobStore = new MemoryBlobStore();
private NodeBuilder builder = EMPTY_NODE.builder();
@@ -171,21 +170,4 @@ public void memUsage() {
assertTrue("Mem usage should increase with longer path", size2 > size1);
assertTrue("Mem usage should increase with bigger node state", size3 > size2);
}

@Test
public void toSerializedString() {
NodeStateEntryWriter nw = new NodeStateEntryWriter(blobStore);
NodeBuilder b1 = EMPTY_NODE.builder();
b1.setProperty("foo", "bar");

NodeStateEntry e1 = new NodeStateEntryBuilder(b1.getNodeState(), "/").build();

String json = nw.asJson(e1.getNodeState());
List<String> pathElements = copyOf(elements(e1.getPath()));
Set<String> preferred = new HashSet<String>();

String line = nw.toSerializedString(pathElements, json, preferred);

assertEquals("0/|{\"foo\":\"bar\"}", line );
}
}

0 comments on commit 2c845ba

Please sign in to comment.