Skip to content

Commit

Permalink
OAK - 9536 | Add support in oak run for incremental indexing (apache#340
Browse files Browse the repository at this point in the history
)
  • Loading branch information
nit0906 authored and Joscorbe committed Sep 3, 2021
1 parent 7264dfe commit b55d37d
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 17 deletions.
Expand Up @@ -57,7 +57,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

public class IndexHelper implements Closeable{
public class IndexHelper implements Closeable {
private final Logger log = LoggerFactory.getLogger(getClass());
protected final NodeStore store;
protected final File outputDir;
Expand All @@ -67,7 +67,7 @@ public class IndexHelper implements Closeable{
protected final List<String> indexPaths;
private final Whiteboard whiteboard;
private Executor executor;
private final Closer closer = Closer.create();
protected final Closer closer = Closer.create();
private final BlobStore blobStore;
private final StatisticsProvider statisticsProvider;
private IndexInfoServiceImpl indexInfoService;
Expand Down
Expand Up @@ -50,17 +50,22 @@ public class IndexOptions implements OptionsBean {
private final OptionSpec<Void> definitions;
private final OptionSpec<Void> dumpIndex;
private final OptionSpec<Void> reindex;
private final OptionSpec<Void> asyncIndex;
private final OptionSpec<Void> importIndex;
private final OptionSpec<Void> docTraversal;
private final OptionSpec<Void> enableCowCor;
private final OptionSpec<Integer> consistencyCheck;
private final OptionSpec<Long> asyncDelay;
protected OptionSet options;
protected final Set<OptionSpec> actionOpts;
private final OptionSpec<String> indexPaths;
private final OptionSpec<String> checkpoint;
private final OptionSpec<String> asyncIndexLanes;
private final Set<String> operationNames;
private final OptionSpec<File> existingDataDumpDirOpt;



public IndexOptions(OptionParser parser){
workDirOpt = parser.accepts("index-temp-dir", "Directory used for storing temporary files")
.withRequiredArg().ofType(File.class).defaultsTo(new File("temp"));
Expand Down Expand Up @@ -88,12 +93,21 @@ public IndexOptions(OptionParser parser){
"only Lucene indexes are supported. Possible values 1 - Basic check, 2 - Full check (slower)")
.withOptionalArg().ofType(Integer.class).defaultsTo(1);

asyncDelay = parser.accepts("async-delay", "Delay (in seconds) between the execution of async cycles for a given lane")
.withOptionalArg().ofType(Long.class).defaultsTo(5L);

dumpIndex = parser.accepts("index-dump", "Dumps index content");
reindex = parser.accepts("reindex", "Reindex the indexes specified by --index-paths or --index-definitions-file");
asyncIndex = parser.accepts("async-index", "Runs async index cycle");

asyncIndexLanes = parser.accepts("async-index-lanes", "Comma separated list of async index lanes for which the " +
"async index cycles would run")
.withRequiredArg().ofType(String.class).withValuesSeparatedBy(",");

importIndex = parser.accepts("index-import", "Imports index");
docTraversal = parser.accepts("doc-traversal-mode", "Use Document traversal mode for reindex in " +
"DocumentNodeStore setups. This may provide better performance in some cases (experimental)");
enableCowCor = parser.accepts("enable-cow-cor", "Enables COW/COR during async indexing using oak-run");

indexImportDir = parser.accepts("index-import-dir", "Directory containing index files. This " +
"is required when --index-import operation is selected")
Expand Down Expand Up @@ -183,10 +197,18 @@ public int consistencyCheckLevel(){
return consistencyCheck.value(options);
}

public long aysncDelay() {
return asyncDelay.value(options);
}

public boolean isReindex() {
return options.has(reindex);
}

public boolean isAsyncIndex() {
return options.has(asyncIndex);
}

public boolean isImportIndex() {
return options.has(importIndex);
}
Expand All @@ -195,6 +217,10 @@ public boolean isDocTraversalMode() {
return options.has(docTraversal);
}

public boolean isCowCorEnabled() {
return options.has(enableCowCor);
}

public String getCheckpoint(){
return checkpoint.value(options);
}
Expand All @@ -203,6 +229,10 @@ public List<String> getIndexPaths(){
return options.has(indexPaths) ? trim(indexPaths.values(options)) : Collections.emptyList();
}

public List<String> getAsyncLanes(){
return options.has(asyncIndexLanes) ? trim(asyncIndexLanes.values(options)) : Collections.emptyList();
}

private boolean anyActionSelected(){
for (OptionSpec spec : actionOpts){
if (options.has(spec)){
Expand Down
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.async;

import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public abstract class AsyncIndexerBase implements Closeable {

private static final Logger log = LoggerFactory.getLogger(AsyncIndexerBase.class);
private final IndexHelper indexHelper;
protected final Closer closer;
private final List<String> names;
private static final long INIT_DELAY = 0;
private final long delay;
private final ScheduledExecutorService pool;
private final CountDownLatch latch;

public AsyncIndexerBase(IndexHelper indexHelper, Closer closer, List<String> names, long delay) {
this.indexHelper = indexHelper;
this.closer = closer;
this.names = names;
this.delay = delay;
pool = Executors.newScheduledThreadPool(names.size());
latch = new CountDownLatch(1);
}

public void execute() throws InterruptedException, IOException {
addShutDownHook();
IndexEditorProvider editorProvider = getIndexEditorProvider();
// This can be null in case of any exception while initializing index copier in lucene.
if (editorProvider == null) {
log.error("EditorProvider is null, can't proceed further. Exiting");
closer.close();
}
// Register async tasks for all lanes to the ScheduledThreadPoolExecutor
for (String name : names) {
log.info("Setting up Async executor for lane - " + name);
AsyncIndexUpdate task = new AsyncIndexUpdate(name, indexHelper.getNodeStore(),
editorProvider, StatisticsProvider.NOOP, false);
closer.register(task);

pool.scheduleWithFixedDelay(task, INIT_DELAY, delay, TimeUnit.SECONDS);
}
// Make the main thread wait now, since we want this to run continuously
// Although ScheduledExecutorService would still keep executing even if we let the main thread exit
// but it will cleanup logging resources and other closeables and create problems.
latch.await();
}

@Override
public void close() throws IOException {
log.info("Closing down Async Indexer Service...");
latch.countDown();
pool.shutdown();
}

/**
Since this would be running continuously in a loop, we can't possibly call closures in a normal conventional manner
otherwise resources would be closed from the main thread and spawned off threads will still be running and will fail.
So we handle closures as part of shut down hooks in case of SIGINT, SIGTERM etc.
**/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
closer.close();
} catch (IOException e) {
log.error("Exception during cleanup ", e);
}
}
});
}

public abstract IndexEditorProvider getIndexEditorProvider();

}



6 changes: 6 additions & 0 deletions oak-run-elastic/pom.xml
Expand Up @@ -183,6 +183,12 @@
<artifactId>oak-search-elastic</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-blob-cloud-azure</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<!-- Nullability annotations -->
<dependency>
<groupId>org.jetbrains</groupId>
Expand Down
Expand Up @@ -27,6 +27,7 @@
import joptsimple.OptionParser;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.index.async.AsyncIndexerElastic;
import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
Expand All @@ -38,7 +39,13 @@
import org.apache.jackrabbit.oak.run.cli.Options;
import org.apache.jackrabbit.oak.run.commons.Command;
import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
import org.apache.jackrabbit.oak.spi.commit.*;
import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.commit.ResetCommitAttributeHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
Expand Down Expand Up @@ -90,12 +97,22 @@ public void execute(String... args) throws Exception {

boolean success = false;
try {
try (Closer closer = Closer.create()) {
if (indexOpts.isAsyncIndex()) {
Closer closer = Closer.create();
NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
AsyncIndexerElastic asyncIndexerService = new AsyncIndexerElastic(indexHelper, closer,
indexOpts.getAsyncLanes(), indexOpts.aysncDelay(), indexOpts);
closer.register(asyncIndexerService);
closer.register(fixture);
execute(fixture, indexOpts, closer);
asyncIndexerService.execute();
} else {
try (Closer closer = Closer.create()) {
NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
closer.register(fixture);
execute(fixture, indexOpts, closer);
}
}

success = true;
} catch (Throwable e) {
log.error("Error occurred while performing index tasks", e);
Expand Down Expand Up @@ -153,7 +170,7 @@ private List<String> computeIndexPaths(ElasticIndexOptions indexOpts) throws IOE
return new ArrayList<>(indexPaths);
}

private void applyIndexDefOperation(ElasticIndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
private void applyIndexDefOperation(ElasticIndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
// return if index opts don't contain --applyIndexDef option or --read-write is not present
if (!indexOpts.isApplyIndexDef() || !opts.getCommonOpts().isReadWrite()) {
log.info("Index def not applied to repo. Run index command with --applyIndexDef and --read-write without the --reindex " +
Expand Down
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.async;

import com.google.common.io.Closer;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.index.ElasticIndexOptions;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import java.util.List;

public class AsyncIndexerElastic extends AsyncIndexerBase {

private final ElasticIndexOptions indexOpts;
public AsyncIndexerElastic(IndexHelper indexHelper, Closer closer, List<String> names,
long delay, ElasticIndexOptions indexOpts) {
super(indexHelper, closer, names, delay);
this.indexOpts = indexOpts;
}

@Override
public IndexEditorProvider getIndexEditorProvider() {
final ElasticConnection.Builder.BuildStep buildStep = ElasticConnection.newBuilder()
.withIndexPrefix(indexOpts.getIndexPrefix())
.withConnectionParameters(
indexOpts.getElasticScheme(),
indexOpts.getElasticHost(),
indexOpts.getElasticPort()
);
final ElasticConnection coordinate;
if (indexOpts.getApiKeyId() != null && indexOpts.getApiKeySecret() != null) {
coordinate = buildStep.withApiKeys(indexOpts.getApiKeyId(), indexOpts.getApiKeySecret()).build();
} else {
coordinate = buildStep.build();
}
closer.register(coordinate);

return new ElasticIndexEditorProvider(coordinate,
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100));
}
}
3 changes: 3 additions & 0 deletions oak-run-elastic/src/main/resources/logback-indexing.xml
Expand Up @@ -40,6 +40,9 @@
<logger name="org.apache.jackrabbit.oak.plugins.index.importer" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate" level="DEBUG">
<appender-ref ref="index" />
</logger>
<logger name="org.apache.jackrabbit.oak.plugins.index.IndexUpdate" level="INFO">
<appender-ref ref="STDOUT" />
</logger>
Expand Down
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.jackrabbit.oak.index;

import com.google.common.io.Closer;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexInfoServiceImpl;
import org.apache.jackrabbit.oak.plugins.index.datastore.DataStoreTextWriter;
Expand All @@ -29,17 +28,15 @@
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ExtendedIndexHelper extends IndexHelper implements Closeable {
public class ExtendedIndexHelper extends IndexHelper {

private LuceneIndexHelper luceneIndexHelper;
private ExtractedTextCache extractedTextCache;
private final Closer closer = Closer.create();

public ExtendedIndexHelper(NodeStore store, BlobStore blobStore, Whiteboard whiteboard,
File outputDir, File workDir, List<String> indexPaths) {
Expand Down

0 comments on commit b55d37d

Please sign in to comment.