Skip to content
Permalink
Browse files
Merge pull request #564 from Ewocker/fix-path-predicate
OAK-9769: PathPredicate not being used properly when building FlatFileStore
  • Loading branch information
thomasmueller committed May 12, 2022
2 parents 5ac5b17 + 1883718 commit ad592290b790cd12579208956f4efff81e520874
Showing 6 changed files with 66 additions and 63 deletions.
@@ -105,24 +105,14 @@ private static class MongoNodeStateEntryTraverserFactory implements NodeStateEnt
private final MongoDocumentStore documentStore;
private final Logger traversalLogger;
private final CompositeIndexer indexer;
private final Predicate<String> pathPredicate;


private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
MongoDocumentStore documentStore, Logger traversalLogger,
CompositeIndexer indexer) {
this(rootRevision, documentNodeStore, documentStore, traversalLogger, indexer, null);
}

private MongoNodeStateEntryTraverserFactory(RevisionVector rootRevision, DocumentNodeStore documentNodeStore,
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer,
Predicate<String> pathPredicate) {
MongoDocumentStore documentStore, Logger traversalLogger, CompositeIndexer indexer) {
this.rootRevision = rootRevision;
this.documentNodeStore = documentNodeStore;
this.documentStore = documentStore;
this.traversalLogger = traversalLogger;
this.indexer = indexer;
this.pathPredicate = pathPredicate;
}

@Override
@@ -145,10 +135,6 @@ public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange tra
}
}

private FlatFileStore buildFlatFileStore(NodeState checkpointedState, CompositeIndexer indexer) throws IOException {
return buildFlatFileStore(checkpointedState, indexer, null, null);
}

private FlatFileStore buildFlatFileStore(NodeState checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate, Set<String> preferredPathElements) throws IOException {

Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
@@ -172,8 +158,9 @@ private FlatFileStore buildFlatFileStore(NodeState checkpointedState, CompositeI
.withBlobStore(indexHelper.getGCBlobStore())
.withPreferredPathElements((preferredPathElements != null) ? preferredPathElements : indexer.getRelativeIndexedNodeNames())
.addExistingDataDumpDir(indexerSupport.getExistingDataDumpDir())
.withPathPredicate(pathPredicate)
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog, indexer, pathPredicate));
nodeStore, getMongoDocumentStore(), traversalLog, indexer));
for (File dir : previousDownloadDirs) {
builder.addExistingDataDumpDir(dir);
}
@@ -251,7 +238,7 @@ public void reindex() throws CommitFailedException, IOException {

closer.register(indexer);

FlatFileStore flatFileStore = buildFlatFileStore(checkpointedState, indexer);
FlatFileStore flatFileStore = buildFlatFileStore(checkpointedState, indexer, indexer::shouldInclude, null);

progressReporter.reset();
if (flatFileStore.getEntryCount() > 0){
@@ -211,10 +211,10 @@ SortStrategy createSortStrategy(File dir) throws IOException {
switch (sortStrategyType) {
case STORE_AND_SORT:
log.info("Using StoreAndSortStrategy");
return new StoreAndSortStrategy(nodeStateEntryTraverserFactory, comparator, entryWriter, dir, useZip);
return new StoreAndSortStrategy(nodeStateEntryTraverserFactory, comparator, entryWriter, dir, useZip, pathPredicate);
case TRAVERSE_WITH_SORT:
log.info("Using TraverseWithSortStrategy");
return new TraverseWithSortStrategy(nodeStateEntryTraverserFactory, comparator, entryWriter, dir, useZip);
return new TraverseWithSortStrategy(nodeStateEntryTraverserFactory, comparator, entryWriter, dir, useZip, pathPredicate);
case MULTITHREADED_TRAVERSE_WITH_SORT:
log.info("Using MultithreadedTraverseWithSortStrategy");
return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, comparator,
@@ -19,20 +19,22 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.util.function.Predicate;

import static com.google.common.base.StandardSystemProperty.LINE_SEPARATOR;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_MAX_SORT_MEMORY_IN_GB;
@@ -53,15 +55,17 @@ class StoreAndSortStrategy implements SortStrategy {
private boolean deleteOriginal = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_DELETE_ORIGINAL, "true"));
private int maxMemory = Integer.getInteger(OAK_INDEXER_MAX_SORT_MEMORY_IN_GB, OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT);
private long textSize;
private Predicate<String> pathPredicate;


public StoreAndSortStrategy(NodeStateEntryTraverserFactory nodeStatesFactory, PathElementComparator comparator,
NodeStateEntryWriter entryWriter, File storeDir, boolean compressionEnabled) {
NodeStateEntryWriter entryWriter, File storeDir, boolean compressionEnabled, Predicate<String> pathPredicate) {
this.nodeStatesFactory = nodeStatesFactory;
this.comparator = comparator;
this.entryWriter = entryWriter;
this.storeDir = storeDir;
this.compressionEnabled = compressionEnabled;
this.pathPredicate = pathPredicate;
}

@Override
@@ -101,11 +105,14 @@ private File writeToStore(NodeStateEntryTraverser nodeStates, File dir, String f
Stopwatch sw = Stopwatch.createStarted();
try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, compressionEnabled)) {
for (NodeStateEntry e : nodeStates) {
String line = entryWriter.toString(e);
w.append(line);
w.newLine();
textSize += line.length() + LINE_SEP_LENGTH;
entryCount++;
String path = e.getPath();
if (!NodeStateUtils.isHiddenPath(path) && pathPredicate.test(path)) {
String line = entryWriter.toString(e);
w.append(line);
w.newLine();
textSize += line.length() + LINE_SEP_LENGTH;
entryCount++;
}
}
}
String sizeStr = compressionEnabled ? String.format("compressed/%s actual size", humanReadableByteCount(textSize)) : "";
@@ -19,6 +19,22 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@@ -31,22 +47,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;

import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.index.indexer.document.LastModifiedRange;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Predicate;

import static com.google.common.base.Charsets.UTF_8;
import static java.lang.management.ManagementFactory.getMemoryMXBean;
@@ -90,15 +91,17 @@ class TraverseWithSortStrategy implements SortStrategy {
private File sortWorkDir;
private List<File> sortedFiles = new ArrayList<>();
private ArrayList<NodeStateHolder> entryBatch = new ArrayList<>();
private Predicate<String> pathPredicate;


TraverseWithSortStrategy(NodeStateEntryTraverserFactory nodeStatesFactory, PathElementComparator pathComparator,
NodeStateEntryWriter entryWriter, File storeDir, boolean compressionEnabled) {
NodeStateEntryWriter entryWriter, File storeDir, boolean compressionEnabled, Predicate<String> pathPredicate) {
this.nodeStatesFactory = nodeStatesFactory;
this.entryWriter = entryWriter;
this.storeDir = storeDir;
this.compressionEnabled = compressionEnabled;
this.comparator = (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
this.pathPredicate = pathPredicate;
}

@Override
@@ -164,13 +167,15 @@ private void addEntry(NodeStateEntry e) throws IOException {
reset();
}

String jsonText = entryWriter.asJson(e.getNodeState());
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(e.getPath(), jsonText);
entryBatch.add(h);
updateMemoryUsed(h);

String path = e.getPath();
if (!NodeStateUtils.isHiddenPath(path) && pathPredicate.test(path)) {
String jsonText = entryWriter.asJson(e.getNodeState());
//Here logic differs from NodeStateEntrySorter in sense that
//Holder line consist only of json and not 'path|json'
NodeStateHolder h = new StateInBytesHolder(path, jsonText);
entryBatch.add(h);
updateMemoryUsed(h);
}
}

private void reset() {
@@ -47,6 +47,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -73,6 +74,7 @@ public class FlatFileStoreTest {
public TemporaryFolder folder = new TemporaryFolder(new File(BUILD_TARGET_FOLDER));

private Set<String> preferred = singleton("jcr:content");
private Predicate<String> pathPredicate = path -> !path.equals("/remove");

private static final String EXCEPTION_MESSAGE = "Framed exception.";

@@ -82,6 +84,7 @@ private void runBasicTest() throws Exception {
FlatFileStore flatStore = spyBuilder.withBlobStore(new MemoryBlobStore())
.withPreferredPathElements(preferred)
.withLastModifiedBreakPoints(Collections.singletonList(0L))
.withPathPredicate(pathPredicate)
.withNodeStateEntryTraverserFactory(new NodeStateEntryTraverserFactory() {
@Override
public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange range) {
@@ -101,6 +104,7 @@ public NodeStateEntryTraverser create(MongoDocumentTraverser.TraversingRange ran
.collect(Collectors.toList());

List<String> sortedPaths = TestUtils.sortPaths(paths, preferred);
sortedPaths = TestUtils.extractPredicatePaths(sortedPaths, pathPredicate);

assertEquals(sortedPaths, entryPaths);
}
@@ -505,7 +509,7 @@ int getTotalProvidedDocCount() {
}

private List<String> createTestPaths() {
return asList("/a", "/b", "/c", "/a/b w", "/a/jcr:content", "/a/b", "/", "/b/l");
return asList("/a", "/b", "/c", "/a/b w", "/a/jcr:content", "/a/b", "/", "/b/l", "/remove");
}

static Iterable<NodeStateEntry> createEntriesFromMongoDocs(List<TestMongoDoc> mongoDocs) {
@@ -19,22 +19,18 @@

package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry.NodeStateEntryBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import static com.google.common.collect.ImmutableList.copyOf;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toList;
@@ -51,6 +47,10 @@ static List<String> sortPaths(List<String> paths, Set<String> preferredElements)
return sortPaths(paths, new PathElementComparator(preferredElements));
}

static List<String> extractPredicatePaths(List<String> paths, Predicate<String> pathPredicate) {
return paths.stream().filter(pathPredicate).collect(toList());
}

static List<String> sortPaths(List<String> paths, Comparator<Iterable<String>> comparator) {
List<Iterable<String>> copy = paths.stream()
.map(p -> copyOf(elements(p)))

0 comments on commit ad59229

Please sign in to comment.