Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-10423 - Improve logging of metrics in the indexing job #1093

Merged
merged 12 commits into from
Sep 8, 2023
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Stopwatch;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

public class FormattingUtils {
public static String formatToSeconds(Stopwatch stopwatch) {
LocalTime seconds = LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS));
return DateTimeFormatter.ISO_TIME.format(seconds);
}

public static String formatToMillis(Stopwatch stopwatch) {
LocalTime nanoSeconds = LocalTime.ofNanoOfDay(stopwatch.elapsed(TimeUnit.MILLISECONDS)*1000000);
return DateTimeFormatter.ISO_TIME.format(nanoSeconds);
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;

public class MetricsFormatter {
private final JsopBuilder jsopBuilder = new JsopBuilder();
private boolean isWritable = true;
public static MetricsFormatter newBuilder() {
return new MetricsFormatter();
}

private MetricsFormatter() {
jsopBuilder.object();
}

public MetricsFormatter add(String key, String value) {
Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
jsopBuilder.key(key).value(value);
return this;
}

public MetricsFormatter add(String key, int value) {
Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
jsopBuilder.key(key).value(value);
return this;
}

public MetricsFormatter add(String key, long value) {
Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
jsopBuilder.key(key).value(value);
return this;
}

public MetricsFormatter add(String key, boolean value) {
Preconditions.checkState(isWritable, "Formatter already built, in read-only mode");
jsopBuilder.key(key).value(value);
return this;
}

public String build() {
if (isWritable){
jsopBuilder.endObject();
isWritable = false;
}
return jsopBuilder.toString();
}
}
Expand Up @@ -26,17 +26,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.collect.ArrayListMultimap;
import org.apache.jackrabbit.guava.common.collect.ListMultimap;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.LockToken;
import org.apache.jackrabbit.oak.plugins.index.upgrade.IndexDisabler;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
Expand All @@ -60,8 +64,8 @@ public class IndexImporter {
*/
static final String ASYNC_LANE_SYNC = "sync";
/*
* System property name for flag for preserve checkpoint. If this is set to true, then checkpoint cleanup will be skipped.
* Default is set to false.
* System property name for flag for preserve checkpoint. If this is set to true, then checkpoint cleanup will be skipped.
* Default is set to false.
*/
public static final String OAK_INDEX_IMPORTER_PRESERVE_CHECKPOINT = "oak.index.importer.preserveCheckpoint";

Expand Down Expand Up @@ -230,7 +234,7 @@ void importIndexData() throws CommitFailedException, IOException {

private void bringIndexUpToDate() throws CommitFailedException {
for (String laneName : asyncLaneToIndexMapping.keySet()) {
if (ASYNC_LANE_SYNC.equals(laneName)){
if (ASYNC_LANE_SYNC.equals(laneName)) {
continue; //TODO Handle sync indexes
}
bringAsyncIndexUpToDate(laneName, asyncLaneToIndexMapping.get(laneName));
Expand All @@ -247,7 +251,7 @@ private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos
//TODO Support case where checkpoint got lost or complete reindexing is done

NodeState after = nodeStore.retrieve(checkpoint);
checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]",checkpoint, laneName);
checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]", checkpoint, laneName);
LOG.info("Proceeding to update imported indexes {} to checkpoint [{}] for lane [{}]",
indexInfos, checkpoint, laneName);

Expand Down Expand Up @@ -399,12 +403,11 @@ private static void copy(String propName, NodeState existing, NodeBuilder indexB
*
* @param indexPath path of index. Mostly used in reporting exception
* @param indexState nodeState for index at given path
*
* @return async lane name or null which would be the case for sync indexes
*/
static String getAsyncLaneName(String indexPath, NodeState indexState) {
PropertyState asyncPrevious = indexState.getProperty(AsyncLaneSwitcher.ASYNC_PREVIOUS);
if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)){
if (asyncPrevious != null && !AsyncLaneSwitcher.isNone(asyncPrevious)) {
return IndexUtils.getAsyncLaneName(indexState, indexPath, asyncPrevious);
}
return IndexUtils.getAsyncLaneName(indexState, indexPath);
Expand All @@ -426,7 +429,7 @@ private void releaseCheckpoint() throws CommitFailedException {

private void incrementReIndexCount(NodeBuilder definition) {
long count = 0;
if(definition.hasProperty(REINDEX_COUNT)){
if (definition.hasProperty(REINDEX_COUNT)) {
count = definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
}
definition.setProperty(REINDEX_COUNT, count + 1);
Expand Down Expand Up @@ -463,10 +466,18 @@ interface IndexImporterStepExecutor {

void runWithRetry(int maxRetries, IndexImportState indexImportState, IndexImporterStepExecutor step) throws CommitFailedException, IOException {
int count = 1;
Stopwatch start = Stopwatch.createStarted();
while (count <= maxRetries) {
LOG.info("IndexImporterStepExecutor:{} ,count:{}", indexImportState, count);
LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportState, count);
LOG.info("[TASK:{}:START]", indexImportState.name());
try {
step.execute();
LOG.info("[TASK:{}:END] Metrics: {}", indexImportState.name(),
MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(start))
.add("durationSeconds", start.elapsed(TimeUnit.SECONDS))
.build()
);
break;
} catch (CommitFailedException | IOException e) {
LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e);
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;

import org.junit.Test;

import static org.junit.Assert.*;

public class MetricsFormatterTest {

@Test
public void testAdd() {
MetricsFormatter metricsFormatter = MetricsFormatter.newBuilder();
String result = metricsFormatter.add("key", "value")
.add("key", 1)
.add("key", 1L)
.add("key", true)
.build();
assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", result);
assertThrows(IllegalStateException.class, () -> metricsFormatter.add("key", "value"));
assertEquals("{\"key\":\"value\",\"key\":1,\"key\":1,\"key\":true}", metricsFormatter.build());
}
}
Expand Up @@ -38,8 +38,10 @@
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
Expand Down Expand Up @@ -68,6 +70,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -138,7 +141,7 @@ public NodeStateEntryTraverser create(TraversingRange traversingRange) {
}

private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState, CompositeIndexer indexer, Predicate<String> pathPredicate, Set<String> preferredPathElements,
boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) throws IOException {
boolean splitFlatFile, Set<IndexDefinition> indexDefinitions) throws IOException {
List<FlatFileStore> storeList = new ArrayList<>();

Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -205,7 +208,6 @@ private List<FlatFileStore> buildFlatFileStoreList(NodeState checkpointedState,
}

/**
*
* @return an Instance of FlatFileStore, whose getFlatFileStorePath() method can be used to get the absolute path to this store.
* @throws IOException
* @throws CommitFailedException
Expand All @@ -220,7 +222,7 @@ public FlatFileStore buildFlatFileStore() throws IOException, CommitFailedExcept

Predicate<String> predicate = s -> indexDefinitions.stream().anyMatch(indexDef -> indexDef.getPathFilter().filter(s) != PathFilter.Result.EXCLUDE);
FlatFileStore flatFileStore = buildFlatFileStoreList(checkpointedState, null, predicate,
preferredPathElements, IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
preferredPathElements, IndexerConfiguration.parallelIndexEnabled(), indexDefinitions).get(0);
log.info("FlatFileStore built at {}. To use this flatFileStore in a reindex step, set System Property-{} with value {}",
flatFileStore.getFlatFileStorePath(), OAK_INDEXER_SORTED_FILE_PATH, flatFileStore.getFlatFileStorePath());
return flatFileStore;
Expand All @@ -244,14 +246,15 @@ public void reindex() throws CommitFailedException, IOException {
closer.register(indexer);

List<FlatFileStore> flatFileStores = buildFlatFileStoreList(checkpointedState, indexer,
indexer::shouldInclude, null, IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());
indexer::shouldInclude, null, IndexerConfiguration.parallelIndexEnabled(), getIndexDefinitions());

progressReporter.reset();

progressReporter.reindexingTraversalStart("/");

preIndexOpertaions(indexer.getIndexers());
preIndexOperations(indexer.getIndexers());

log.info("[TASK:INDEXING:START] Starting indexing");
Stopwatch indexerWatch = Stopwatch.createStarted();

if (flatFileStores.size() > 1) {
Expand All @@ -267,14 +270,24 @@ public void reindex() throws CommitFailedException, IOException {
progressReporter.reindexingTraversalEnd();
progressReporter.logReport();
log.info("Completed the indexing in {}", indexerWatch);
log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(indexerWatch))
.add("durationSeconds", indexerWatch.elapsed(TimeUnit.SECONDS))
.build());

log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(mergeNodeStoreWatch))
.add("durationSeconds", mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS))
.build());

indexerSupport.postIndexWork(copyOnWriteStore);
}

private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer indexer, IndexingProgressReporter progressReporter)
throws IOException {
throws IOException {
ExecutorService service = Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
List<Future> futureList = new ArrayList<>();

Expand Down Expand Up @@ -394,7 +407,7 @@ protected CompositeIndexer prepareIndexers(NodeStore copyOnWriteStore, NodeBuild

protected abstract List<NodeStateIndexerProvider> createProviders() throws IOException;

protected abstract void preIndexOpertaions(List<NodeStateIndexer> indexers);
protected abstract void preIndexOperations(List<NodeStateIndexer> indexers);

//TODO OAK-7098 - Taken from IndexUpdate. Refactor to abstract out common logic like this
private void removeIndexState(NodeBuilder definition) {
Expand Down