Skip to content

Commit

Permalink
OAK-10423 - Improve logging of metrics in the indexing job (#1093)
Browse files Browse the repository at this point in the history
* Improve logging of metrics in the indexing job.

* Add integrity checks to the metrics formatter

* Clean up log messages

* Add logging of metrics to reindex, merge in node store and total job time.

* Improve logging

* Add missing license header.
Fix typo in class name.

* Simplify logic.

* Print elapsed time in hours:minutes instead of decimal minutes.

* Do not call .name() on reference that is potentially null.
  • Loading branch information
nfsantos authored and mbaedke committed Sep 19, 2023
1 parent 1f97769 commit ddda6d0
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 89 deletions.
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Stopwatch;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/**
 * Helpers for rendering elapsed times in log messages.
 *
 * <p>Durations are rendered as {@code HH:mm:ss} (optionally followed by a fractional part).
 * The hours field is not capped at 23, so durations of a day or more are rendered as, for
 * example, {@code 27:15:03} instead of failing.
 */
public class FormattingUtils {

    /**
     * Formats the elapsed time of the given stopwatch as {@code HH:mm:ss}, truncated to whole seconds.
     *
     * @param stopwatch the stopwatch whose elapsed time is formatted
     * @return the elapsed time, e.g. {@code 01:02:03}
     */
    public static String formatToSeconds(Stopwatch stopwatch) {
        return formatToSeconds(stopwatch.elapsed(TimeUnit.SECONDS));
    }

    /**
     * Formats a duration given in seconds as {@code HH:mm:ss}.
     *
     * @param seconds the duration in seconds; negative values are rendered with a leading {@code -}
     * @return the formatted duration; the hours field grows beyond two digits when needed
     */
    public static String formatToSeconds(long seconds) {
        // Take the absolute value of each component rather than of the input, so that
        // Long.MIN_VALUE (whose absolute value does not fit in a long) is handled too.
        long hours = Math.abs(seconds / 3600);
        long minutes = Math.abs((seconds / 60) % 60);
        long secs = Math.abs(seconds % 60);
        String sign = seconds < 0 ? "-" : "";
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "%s%02d:%02d:%02d", sign, hours, minutes, secs);
    }

    /**
     * Formats the elapsed time of the given stopwatch as {@code HH:mm:ss[.fraction]} with
     * millisecond precision.
     *
     * @param stopwatch the stopwatch whose elapsed time is formatted
     * @return the elapsed time, e.g. {@code 00:00:01.5} for 1500 ms
     */
    public static String formatToMillis(Stopwatch stopwatch) {
        return formatToMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
    }

    /**
     * Formats a duration given in milliseconds as {@code HH:mm:ss[.fraction]}.
     *
     * <p>The fractional part is omitted when it is zero and otherwise printed without trailing
     * zeros (1500 ms gives {@code 00:00:01.5}, 1001 ms gives {@code 00:00:01.001}), which is the
     * same shape {@code DateTimeFormatter.ISO_TIME} produces for a time of day.
     *
     * @param millis the duration in milliseconds; negative values are rendered with a leading {@code -}
     * @return the formatted duration
     */
    public static String formatToMillis(long millis) {
        long fractionMillis = Math.abs(millis % 1000);
        String sign = millis < 0 ? "-" : "";
        // Format the magnitude of the whole-second part and add the sign separately, so that a
        // value such as -500 ms (zero whole seconds) still carries its sign.
        String wholeSeconds = formatToSeconds(Math.abs(millis / 1000));
        if (fractionMillis == 0) {
            return sign + wholeSeconds;
        }
        String fraction = String.format(Locale.ROOT, "%03d", fractionMillis);
        int end = fraction.length();
        // fractionMillis is non-zero here, so at least one non-zero digit remains.
        while (fraction.charAt(end - 1) == '0') {
            end--;
        }
        return sign + wholeSeconds + "." + fraction.substring(0, end);
    }
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;

/**
 * Fluent builder that renders key/value metrics as a single JSON object string.
 *
 * <p>Entries are appended with the {@code add} overloads. The first call to {@link #build()}
 * closes the JSON object and switches the formatter to read-only mode: later {@code add}
 * calls fail with an {@link IllegalStateException}, while {@link #build()} may be called
 * again and keeps returning the same string.
 */
public class MetricsFormatter {
    private final JsopBuilder json = new JsopBuilder();
    /** {@code true} until {@link #build()} has closed the JSON object. */
    private boolean open = true;

    private MetricsFormatter() {
        json.object();
    }

    /**
     * Creates a new, empty formatter.
     *
     * @return a formatter that accepts entries until {@link #build()} is called
     */
    public static MetricsFormatter newBuilder() {
        return new MetricsFormatter();
    }

    /**
     * Appends a string-valued entry.
     *
     * @return this formatter, for chaining
     * @throws IllegalStateException if {@link #build()} was already called
     */
    public MetricsFormatter add(String key, String value) {
        ensureOpen();
        json.key(key).value(value);
        return this;
    }

    /**
     * Appends an int-valued entry.
     *
     * @return this formatter, for chaining
     * @throws IllegalStateException if {@link #build()} was already called
     */
    public MetricsFormatter add(String key, int value) {
        ensureOpen();
        json.key(key).value(value);
        return this;
    }

    /**
     * Appends a long-valued entry.
     *
     * @return this formatter, for chaining
     * @throws IllegalStateException if {@link #build()} was already called
     */
    public MetricsFormatter add(String key, long value) {
        ensureOpen();
        json.key(key).value(value);
        return this;
    }

    /**
     * Appends a boolean-valued entry.
     *
     * @return this formatter, for chaining
     * @throws IllegalStateException if {@link #build()} was already called
     */
    public MetricsFormatter add(String key, boolean value) {
        ensureOpen();
        json.key(key).value(value);
        return this;
    }

    /**
     * Closes the JSON object on the first call and returns the rendered string.
     *
     * @return the string representation of the underlying builder
     */
    public String build() {
        if (!open) {
            return json.toString();
        }
        json.endObject();
        open = false;
        return json.toString();
    }

    /** Fails if the formatter has already been built. */
    private void ensureOpen() {
        Preconditions.checkState(open, "Formatter already built, in read-only mode");
    }
}
Expand Up @@ -26,17 +26,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
import org.apache.jackrabbit.guava.common.collect.ListMultimap;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
import org.apache.jackrabbit.oak.plugins.index.upgrade.IndexDisabler;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
Expand All @@ -60,8 +64,8 @@ public class IndexImporter {
*/
static final String ASYNC_LANE_SYNC = "sync";
/*
* System property name for flag for preserve checkpoint. If this is set to true, then checkpoint cleanup will be skipped.
* Default is set to false.
* System property name for flag for preserve checkpoint. If this is set to true, then checkpoint cleanup will be skipped.
* Default is set to false.
*/
public static final String OAK_INDEX_IMPORTER_PRESERVE_CHECKPOINT = "oak.index.importer.preserveCheckpoint";

Expand Down Expand Up @@ -230,7 +234,7 @@ void importIndexData() throws CommitFailedException, IOException {

private void bringIndexUpToDate() throws CommitFailedException {
for (String laneName : asyncLaneToIndexMapping.keySet()) {
if (ASYNC_LANE_SYNC.equals(laneName)){
if (ASYNC_LANE_SYNC.equals(laneName)) {
continue; //TODO Handle sync indexes
}
bringAsyncIndexUpToDate(laneName, asyncLaneToIndexMapping.get(laneName));
Expand All @@ -247,7 +251,7 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
//TODO Support case where checkpoint got lost or complete reindexing is done

NodeState after = nodeStore.retrieve(checkpoint);
checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]",checkpoint, laneName);
checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName);
LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
indexInfos, checkpoint, laneName);

Expand Down Expand Up @@ -399,12 +403,11 @@ private static void copy(String propName, NodeState existing, NodeBuilder indexB
*
* @param indexPath path of index. Mostly used in reporting exception
* @param indexState nodeState for index at given path
*
* @return async lane name or null which would be the case for sync indexes
*/
static String getAsyncLaneName(String indexPath, NodeState indexState) {
PropertyState asyncPrevious = indexState.getProperty(AsyncLaneSwitcher.ASYNC_PREVIOUS);
if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)){
if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)) {
return IndexUtils.getAsyncLaneName(indexState, indexPath, asyncPrevious);
}
return IndexUtils.getAsyncLaneName(indexState, indexPath);
Expand All @@ -426,7 +429,7 @@ private void releaseCheckpoint() throws CommitFailedException {

private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
if(definition.hasProperty(REINDEX_COUNT)){
if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
Expand Down Expand Up @@ -463,10 +466,18 @@ interface IndexImporterStepExecutor {

void runWithRetry(int maxRetries, IndexImportState indexImportState, IndexImporterStepExecutor step) throws CommitFailedException, IOException {
int count = 1;
Stopwatch start = Stopwatch.createStarted();
while (count <= maxRetries) {
LOG.info("IndexImporterStepExecutor:{} ,count:{}", indexImportState, count);
LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportState, count);
LOG.info("[TASK:{}:START]", indexImportState);
try {
step.execute();
LOG.info("[TASK:{}:END] Metrics: {}", indexImportState,
MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(start))
.add("durationSeconds", start.elapsed(TimeUnit.SECONDS))
.build()
);
break;
} catch (CommitFailedException | IOException e) {
LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e);
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.junit.Test;

import static org.junit.Assert.*;

public class MetricsFormatterTest {

    /**
     * Adds one entry of each supported value type, then checks the rendered JSON, that adding
     * after build() is rejected, and that a second build() returns the same string.
     */
    @Test
    public void testAdd() {
        final String expectedJson = "{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}";

        MetricsFormatter formatter = MetricsFormatter.newBuilder();
        formatter.add("key", "value");
        formatter.add("key", 1);
        formatter.add("key", 1L);
        formatter.add("key", true);

        String firstBuild = formatter.build();
        assertEquals(expectedJson, firstBuild);

        // Once built, the formatter is read-only.
        assertThrows(IllegalStateException.class, () -> formatter.add("key", "value"));

        // Building again must not alter the output.
        String secondBuild = formatter.build();
        assertEquals(expectedJson, secondBuild);
    }
}
Expand Up @@ -38,8 +38,10 @@
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -138,7 +141,7 @@ public NodeStateEntryTraverser create(TraversingRange traversingRange) {
}

private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate, Set<String> preferredPathElements,
boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) throws IOException {
boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) throws IOException {
List<FlatFileStore> storeList = new ArrayList<>();

Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -205,7 +208,6 @@ private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState,
}

/**
*
* @return an Instance of FlatFileStore, whose getFlatFileStorePath() method can be used to get the absolute path to this store.
* @throws IOException
* @throws CommitFailedException
Expand All @@ -220,7 +222,7 @@ public FlatFileStore buildFlatFileStore() throws IOException, CommitFailedExcept

Predicate<String> predicate = s -> indexDefinitions.stream().anyMatch(indexDef -> indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
FlatFileStore flatFileStore = buildFlatFileStoreList(checkpointedState, null, predicate,
preferredPathElements, IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
preferredPathElements, IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
log.info("FlatFileStore built at {}. To use this flatFileStore in a reindex step, set System Property-{} with value {}",
flatFileStore.getFlatFileStorePath(), OAK_INDEXER_SORTED_FILE_PATH, flatFileStore.getFlatFileStorePath());
return flatFileStore;
Expand All @@ -244,14 +246,15 @@ public void reindex() throws CommitFailedException, IOException {
closer.register(indexer);

List<FlatFileStore> flatFileStores = buildFlatFileStoreList(checkpointedState, indexer,
indexer::shouldInclude, null, IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
indexer::shouldInclude, null, IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());

progressReporter.reset();

progressReporter.reindexingTraversalStart("/");

preIndexOpertaions(indexer.getIndexers());
preIndexOperations(indexer.getIndexers());

log.info("[TASK:INDEXING:START] Starting indexing");
Stopwatch indexerWatch = Stopwatch.createStarted();

if (flatFileStores.size() > 1) {
Expand All @@ -267,14 +270,24 @@ public void reindex() throws CommitFailedException, IOException {
progressReporter.reindexingTraversalEnd();
progressReporter.logReport();
log.info("Completed the indexing in {}", indexerWatch);
log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(indexerWatch))
.add("durationSeconds", indexerWatch.elapsed(TimeUnit.SECONDS))
.build());

log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(mergeNodeStoreWatch))
.add("durationSeconds", mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS))
.build());

indexerSupport.postIndexWork(copyOnWriteStore);
}

private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer indexer, IndexingProgressReporter progressReporter)
throws IOException {
throws IOException {
ExecutorService service = Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
List<Future> futureList = new ArrayList<>();

Expand Down Expand Up @@ -394,7 +407,7 @@ protected CompositeIndexer prepareIndexers(NodeStore copyOnWriteStore, NodeBuild

protected abstract List<NodeStateIndexerProvider> createProviders() throws IOException;

protected abstract void preIndexOpertaions(List<NodeStateIndexer> indexers);
protected abstract void preIndexOperations(List<NodeStateIndexer> indexers);

//TODO OAK-7098 - Taken from IndexUpdate. Refactor to abstract out common logic like this
private void removeIndexState(NodeBuilder definition) {
Expand Down

0 comments on commit ddda6d0

Please sign in to comment.