OAK-10423 - Improve logging of metrics in the indexing job #1093

Merged
merged 12 commits on Sep 8, 2023
@@ -0,0 +1,19 @@
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Stopwatch;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

public class FormatingUtils {
    public static String formatToSeconds(Stopwatch stopwatch) {
        LocalTime seconds = LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS));
        return DateTimeFormatter.ISO_TIME.format(seconds);
    }

    public static String formatToMillis(Stopwatch stopwatch) {
        LocalTime nanoSeconds = LocalTime.ofNanoOfDay(stopwatch.elapsed(TimeUnit.MILLISECONDS) * 1_000_000);
        return DateTimeFormatter.ISO_TIME.format(nanoSeconds);
    }
}
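
For illustration only (this usage sketch is not part of the PR): the helpers above render a Guava Stopwatch as an ISO-8601 local-time string. The elapsed times in the comments are assumed values, not measurements.

    // Hypothetical usage sketch; elapsed times are assumed.
    Stopwatch watch = Stopwatch.createStarted();
    // ... indexing work runs ...
    // After 65 elapsed seconds, formatToSeconds(watch) returns "00:01:05".
    String coarse = FormatingUtils.formatToSeconds(watch);
    // formatToMillis keeps the sub-second part, e.g. roughly "00:01:05.25" after 65250 ms.
    String fine = FormatingUtils.formatToMillis(watch);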
@@ -0,0 +1,48 @@
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;

public class MetricsFormatter {
    private final JsopBuilder jsopBuilder = new JsopBuilder();
    private boolean isWritable = true;

    public static MetricsFormatter newBuilder() {
        return new MetricsFormatter();
    }

    private MetricsFormatter() {
        jsopBuilder.object();
    }

    public MetricsFormatter add(String key, String value) {
        Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
        jsopBuilder.key(key).value(value);
        return this;
    }

    public MetricsFormatter add(String key, int value) {
        Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
        jsopBuilder.key(key).value(value);
        return this;
    }

    public MetricsFormatter add(String key, long value) {
        Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
        jsopBuilder.key(key).value(value);
        return this;
    }

    public MetricsFormatter add(String key, boolean value) {
        Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
        jsopBuilder.key(key).value(value);
        return this;
    }

    public String build() {
        if (isWritable) {
            jsopBuilder.endObject();
            isWritable = false;
        }
        return jsopBuilder.toString();
    }
}
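
As a hedged illustration (not part of the diff), the builder above emits a single-line JSON object and becomes read-only after build(). The keys and values below are invented:

    // Hypothetical usage sketch of MetricsFormatter; keys and values are invented.
    String metrics = MetricsFormatter.newBuilder()
            .add("duration", "00:01:05")
            .add("durationSeconds", 65L)
            .add("success", true)
            .build();
    // metrics == {"duration":"00:01:05","durationSeconds":65,"success":true}
    // Any further add(...) after build() throws IllegalStateException.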
@@ -26,17 +26,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
import org.apache.jackrabbit.guava.common.collect.ListMultimap;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.FormatingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
import org.apache.jackrabbit.oak.plugins.index.upgrade.IndexDisabler;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
@@ -60,8 +64,8 @@ public class IndexImporter {
*/
static final String ASYNC_LANE_SYNC = "sync";
/*
* System property name for the flag to preserve the checkpoint. If set to true, checkpoint cleanup will be skipped.
* Default is false.
*/
public static final String OAK_INDEX_IMPORTER_PRESERVE_CHECKPOINT = "oak.index.importer.preserveCheckpoint";

@@ -230,7 +234,7 @@ void importIndexData() throws CommitFailedException, IOException {

private void bringIndexUpToDate() throws CommitFailedException {
for (String laneName : asyncLaneToIndexMapping.keySet()) {
if (ASYNC_LANE_SYNC.equals(laneName)) {
continue; //TODO Handle sync indexes
}
bringAsyncIndexUpToDate(laneName, asyncLaneToIndexMapping.get(laneName));
@@ -247,7 +251,7 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
//TODO Support case where checkpoint got lost or complete reindexing is done

NodeState after = nodeStore.retrieve(checkpoint);
checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName);
LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
indexInfos, checkpoint, laneName);

@@ -399,12 +403,11 @@ private static void copy(String propName, NodeState existing, NodeBuilder indexB
*
* @param indexPath path of index. Mostly used in reporting exception
* @param indexState nodeState for index at given path
*
* @return async lane name or null which would be the case for sync indexes
*/
static String getAsyncLaneName(String indexPath, NodeState indexState) {
PropertyState asyncPrevious = indexState.getProperty(AsyncLaneSwitcher.ASYNC_PREVIOUS);
if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)) {
return IndexUtils.getAsyncLaneName(indexState, indexPath, asyncPrevious);
}
return IndexUtils.getAsyncLaneName(indexState, indexPath);
@@ -426,7 +429,7 @@ private void releaseCheckpoint() throws CommitFailedException {

private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
@@ -463,10 +466,18 @@ interface IndexImporterStepExecutor {

void runWithRetry(int maxRetries, IndexImportState indexImportState, IndexImporterStepExecutor step) throws CommitFailedException, IOException {
int count = 1;
Stopwatch start = Stopwatch.createStarted();
while (count <= maxRetries) {
LOG.info("IndexImporterStepExecutor:{} ,count:{}", indexImportState, count);
LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportState, count);
LOG.info("[TASK:{}:START]", indexImportState.name());
try {
step.execute();
LOG.info("[TASK:{}:END] Metrics: {}", indexImportState.name(),
MetricsFormatter.newBuilder()
.add("duration", FormatingUtils.formatToSeconds(start))
.add("durationSeconds", start.elapsed(TimeUnit.SECONDS))
.build()
);
break;
} catch (CommitFailedException | IOException e) {
LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e);
@@ -0,0 +1,21 @@
package org.apache.jackrabbit.oak.plugins.index;

import org.junit.Test;

import static org.junit.Assert.*;

public class MetricsFormatterTest {

    @Test
    public void testAdd() {
        MetricsFormatter metricsFormatter = MetricsFormatter.newBuilder();
        metricsFormatter.add("key", "value");
        metricsFormatter.add("key", 1);
        metricsFormatter.add("key", 1L);
        metricsFormatter.add("key", true);
        String result = metricsFormatter.build();
        assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", result);
        assertThrows(IllegalStateException.class, () -> metricsFormatter.add("key", "value"));
        assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", metricsFormatter.build());
    }
}
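
Putting the two utilities together, the [TASK:...] boundary log lines added by this PR follow the pattern sketched below. This is an illustration only; LOG is assumed to be an slf4j Logger, and the task name and elapsed time are invented:

    // Hypothetical sketch of the task-boundary log pattern; values are assumed.
    Stopwatch taskWatch = Stopwatch.createStarted();
    // ... task body runs ...
    LOG.info("[TASK:{}:END] Metrics: {}", "INDEXING",
            MetricsFormatter.newBuilder()
                    .add("duration", FormatingUtils.formatToSeconds(taskWatch))
                    .add("durationSeconds", taskWatch.elapsed(TimeUnit.SECONDS))
                    .build());
    // e.g. [TASK:INDEXING:END] Metrics: {"duration":"00:01:05","durationSeconds":65}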
@@ -38,8 +38,10 @@
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.index.FormatingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
@@ -68,6 +70,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

@@ -141,7 +144,7 @@ public NodeStateEntryTraverser create(TraversingRange traversingRange) {
}

private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate, Set<String> preferredPathElements,
        boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) throws IOException {
List<FlatFileStore> storeList = new ArrayList<>();

Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
@@ -207,7 +210,6 @@ private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState,
}

/**
*
* @return an Instance of FlatFileStore, whose getFlatFileStorePath() method can be used to get the absolute path to this store.
* @throws IOException
* @throws CommitFailedException
@@ -221,7 +223,7 @@ public FlatFileStore buildFlatFileStore() throws IOException, CommitFailedExcept
}
Predicate<String> predicate = s -> indexDefinitions.stream().anyMatch(indexDef -> indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
FlatFileStore flatFileStore = buildFlatFileStoreList(checkpointedState, null, predicate,
        preferredPathElements, IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
log.info("FlatFileStore built at {}. To use this flatFileStore in a reindex step, set System Property-{} with value {}",
flatFileStore.getFlatFileStorePath(), OAK_INDEXER_SORTED_FILE_PATH, flatFileStore.getFlatFileStorePath());
return flatFileStore;
@@ -245,14 +247,15 @@ public void reindex() throws CommitFailedException, IOException {
closer.register(indexer);

List<FlatFileStore> flatFileStores = buildFlatFileStoreList(checkpointedState, indexer,
        indexer::shouldInclude, null, IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());

progressReporter.reset();

progressReporter.reindexingTraversalStart("/");

preIndexOperations(indexer.getIndexers());

log.info("[TASK:INDEXING:START] Starting indexing");
Stopwatch indexerWatch = Stopwatch.createStarted();

if (flatFileStores.size() > 1) {
@@ -268,14 +271,24 @@
progressReporter.reindexingTraversalEnd();
progressReporter.logReport();
log.info("Completed the indexing in {}", indexerWatch);
log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormatingUtils.formatToSeconds(indexerWatch))
.add("durationSeconds", indexerWatch.elapsed(TimeUnit.SECONDS))
.build());

log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormatingUtils.formatToSeconds(mergeNodeStoreWatch))
.add("durationSeconds", mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS))
.build());

indexerSupport.postIndexWork(copyOnWriteStore);
}

private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer indexer, IndexingProgressReporter progressReporter)
        throws IOException {
ExecutorService service = Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
List<Future> futureList = new ArrayList<>();

@@ -396,7 +409,7 @@ protected CompositeIndexer prepareIndexers(NodeStore copyOnWriteStore, NodeBuild

protected abstract List<NodeStateIndexerProvider> createProviders() throws IOException;

protected abstract void preIndexOperations(List<NodeStateIndexer> indexers);

//TODO OAK-7098 - Taken from IndexUpdate. Refactor to abstract out common logic like this
private void removeIndexState(NodeBuilder definition) {
@@ -21,6 +21,8 @@
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.FormatingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,8 +32,10 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCountBin;
@@ -70,6 +74,8 @@ public int getFilesMerged() {

private static final Logger LOG = LoggerFactory.getLogger(PipelinedMergeSortTask.class);

private static final String THREAD_NAME = "mongo-merge-sort-files";

private final File storeDir;
private final Comparator<NodeStateHolder> comparator;
private final Compression algorithm;
@@ -89,17 +95,28 @@ public PipelinedMergeSortTask(File storeDir,
@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("mongo-merge-sort-files");
Thread.currentThread().setName(THREAD_NAME);
try {
LOG.info("Starting merge sort thread");
LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
while (true) {
LOG.info("Waiting for next intermediate sorted file");
File sortedIntermediateFile = sortedFilesQueue.take();
if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), humanReadableByteCountBin(sizeOf(sortedFiles)));
long sortedFilesSizeBytes = sizeOf(sortedFiles);
LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), humanReadableByteCountBin(sortedFilesSizeBytes));
Stopwatch w = Stopwatch.createStarted();
File flatFileStore = sortStoreFile(sortedFiles);
LOG.info("Terminating sort task. Merged {} files to create the FFS: {} of size {}",
sortedFiles.size(), flatFileStore.getAbsolutePath(), humanReadableByteCountBin(flatFileStore.length()));
LOG.info("Final merge completed in {}. Created file: {}", FormatingUtils.formatToSeconds(w), flatFileStore.getAbsolutePath());
long ffsSizeBytes = flatFileStore.length();
String metrics = MetricsFormatter.newBuilder()
.add("duration", FormatingUtils.formatToSeconds(w))
.add("durationSeconds", w.elapsed(TimeUnit.SECONDS))
.add("filesMerged", sortedFiles.size())
.add("ffsSizeBytes", ffsSizeBytes)
.add("ffsSize", humanReadableByteCountBin(ffsSizeBytes))
.build();

LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
return new Result(flatFileStore, sortedFiles.size());
}
sortedFiles.add(sortedIntermediateFile);
@@ -111,15 +128,14 @@ sortedIntermediateFile, humanReadableByteCountBin(sortedIntermediateFile.length(
LOG.warn("Thread interrupted", t);
throw t;
} catch (Throwable t) {
LOG.warn("Thread terminating with exception.", t);
LOG.warn("Thread terminating with exception", t);
throw t;
} finally {
Thread.currentThread().setName(originalName);
}
}

private File sortStoreFile(List<File> sortedFilesBatch) throws IOException {
Stopwatch w = Stopwatch.createStarted();
File sortedFile = new File(storeDir, getSortedStoreFileName(algorithm));
try (BufferedWriter writer = createWriter(sortedFile, algorithm)) {
Function<String, NodeStateHolder> stringToType = (line) -> line == null ? null : new NodeStateHolder(line);
@@ -134,7 +150,6 @@ private File sortStoreFile(List<File> sortedFilesBatch) throws IOException {
stringToType
);
}
LOG.info("Merging of sorted files completed in {}", w);
return sortedFile;
}
}