Skip to content
Permalink
Browse files
OAK-9690 | Adding bringIndexUptoDate support for elastic index post offline reindexing (#492)
  • Loading branch information
nit0906 committed Feb 23, 2022
1 parent 48edee5 commit bfa326d467074656711174c60ef0ba390b77a9c1
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 62 deletions.
@@ -77,6 +77,10 @@ public static void switchLane(NodeBuilder idxBuilder, String laneName) {
idxBuilder.setProperty(newAsyncState);
}

/**
 * Reports whether the given index definition carries the {@code ASYNC_PREVIOUS} property.
 *
 * @param idxBuilder builder of the index definition node to inspect
 * @return {@code true} when the builder has the {@code ASYNC_PREVIOUS} property,
 *         {@code false} otherwise
 */
public static boolean isLaneSwitched(NodeBuilder idxBuilder) {
    final boolean previousLaneRecorded = idxBuilder.hasProperty(ASYNC_PREVIOUS);
    return previousLaneRecorded;
}

/**
 * Derives the temporary lane name for a lane by prepending {@code TEMP_LANE_PREFIX}.
 *
 * @param laneName name of the lane
 * @return {@code TEMP_LANE_PREFIX} followed by {@code laneName}
 */
public static String getTempLaneName(String laneName) {
    final String tempLaneName = TEMP_LANE_PREFIX + laneName;
    return tempLaneName;
}
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Version("0.1.0")
package org.apache.jackrabbit.oak.plugins.index.importer;

import org.osgi.annotation.versioning.Version;
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock;
import org.apache.jackrabbit.oak.plugins.index.importer.ClusterNodeStoreLock;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporter;
import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeStore;

import java.io.File;
import java.io.IOException;

/**
 * Common scaffolding for importing index data from a directory into the
 * {@link NodeStore} exposed by an {@link IndexHelper}.
 *
 * <p>Subclasses supply the {@link IndexEditorProvider} used during the import and
 * register their importer providers on the {@link IndexImporter}.
 */
public abstract class IndexImporterSupportBase {

    protected final NodeStore nodeStore;
    protected final IndexHelper indexHelper;

    /**
     * @param indexHelper helper from which the target {@link NodeStore} is obtained
     */
    public IndexImporterSupportBase(IndexHelper indexHelper) {
        this.indexHelper = indexHelper;
        this.nodeStore = indexHelper.getNodeStore();
    }

    /**
     * Creates an {@link IndexImporter} for the given directory, lets the subclass
     * register its importer providers on it and then runs the import.
     *
     * @param importDir directory handed to the {@link IndexImporter}
     * @throws IOException           declared by the importer and by editor provider creation
     * @throws CommitFailedException declared by the importer
     */
    public void importIndex(File importDir) throws IOException, CommitFailedException {
        // Editor provider first, then the lock: same evaluation order as passing both inline.
        IndexEditorProvider editorProvider = createIndexEditorProvider();
        AsyncIndexerLock indexerLock = createLock();

        IndexImporter indexImporter = new IndexImporter(nodeStore, importDir, editorProvider, indexerLock);
        addImportProviders(indexImporter);
        indexImporter.importIndex();
    }

    /**
     * Selects the lock implementation handed to the importer, based on whether the
     * node store is {@link Clusterable}.
     */
    private AsyncIndexerLock createLock() {
        if (!(nodeStore instanceof Clusterable)) {
            //For oak-run usage with non Clusterable NodeStore indicates that NodeStore is not
            //active. So we can use a noop lock implementation as there is no concurrent run
            return AsyncIndexerLock.NOOP_LOCK;
        }
        return new ClusterNodeStoreLock(nodeStore);
    }

    /**
     * Supplies the editor provider passed to the {@link IndexImporter}.
     *
     * @throws IOException if the provider cannot be created
     */
    protected abstract IndexEditorProvider createIndexEditorProvider() throws IOException;

    /**
     * Hook for registering index-type specific importer providers on the importer
     * before the import is started.
     *
     * @param importer importer that is about to run
     */
    protected abstract void addImportProviders(IndexImporter importer);

}
@@ -62,7 +62,7 @@ public class IndexerSupport {
*/
private static final String REINDEX_LANE = "offline-reindex-async";
private Map<String, String> checkpointInfo = Collections.emptyMap();
private final IndexHelper indexHelper;
protected final IndexHelper indexHelper;
private File localIndexDir;
private File indexDefinitions;
private String checkpoint;
@@ -119,7 +119,7 @@ private void updateIndexDefinitions(NodeBuilder rootBuilder) throws IOException,
}
}

private void dumpIndexDefinitions(NodeStore nodeStore) throws IOException, CommitFailedException {
protected void dumpIndexDefinitions(NodeStore nodeStore) throws IOException {
IndexDefinitionPrinter printer = new IndexDefinitionPrinter(nodeStore, indexHelper.getIndexPathService());
printer.setFilter("{\"properties\":[\"*\", \"-:childOrder\"],\"nodes\":[\"*\", \"-:index-definition\"]}");
PrinterDumper dumper = new PrinterDumper(getLocalIndexDir(), IndexDefinitionUpdater.INDEX_DEFINITIONS_JSON,
@@ -150,7 +150,7 @@ public void postIndexWork(NodeStore copyOnWriteStore) throws CommitFailedExcepti
dumpIndexDefinitions(copyOnWriteStore);
}

private void switchIndexLanesBack(NodeStore copyOnWriteStore) throws CommitFailedException, IOException {
protected void switchIndexLanesBack(NodeStore copyOnWriteStore) throws CommitFailedException {
NodeState root = copyOnWriteStore.getRoot();
NodeBuilder builder = root.builder();

@@ -31,7 +31,6 @@
import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
import org.apache.jackrabbit.oak.run.cli.CommonOptions;
import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
@@ -47,7 +46,6 @@
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.commit.ResetCommitAttributeHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,9 +135,10 @@ private void execute(NodeStoreFixture fixture, ElasticIndexOptions indexOpts, Cl
//dumpIndexStats(indexOpts, indexHelper);
//dumpIndexDefinitions(indexOpts, indexHelper);
reindexOperation(indexOpts, indexHelper);
// This will not work with --doc-traversal mode, since that only works with read only mode and apply index def needs read write mode
// read write requirement - logic handled in applyIndexDefOperation
applyIndexDefOperation(indexOpts, indexHelper);

// For elastic implementation - this applies the newly created elastic definition to the repo and brings the index up to date with the
// current state for async lane for this index.
importIndexOperation(indexOpts, indexHelper);
}

private IndexHelper createIndexHelper(NodeStoreFixture fixture,
@@ -170,28 +169,19 @@ private List<String> computeIndexPaths(ElasticIndexOptions indexOpts) throws IOE
return new ArrayList<>(indexPaths);
}

private void applyIndexDefOperation(ElasticIndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
// return if index opts don't contain --applyIndexDef option or --read-write is not present
if (!indexOpts.isApplyIndexDef() || !opts.getCommonOpts().isReadWrite()) {
log.info("Index def not applied to repo. Run index command with --applyIndexDef and --read-write without the --reindex " +
"opts to apply the index def");
return;
private void importIndexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
if (indexOpts.isImportIndex()) {
File importDir = indexOpts.getIndexImportDir();
importIndex(indexHelper, importDir);
}
applyIndexDef(indexOpts, indexHelper);
}

private void applyIndexDef(ElasticIndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
File definitions = indexOpts.getIndexDefinitionsFile();
if (definitions != null) {
Preconditions.checkArgument(definitions.exists(), "Index definitions file [%s] not found", getPath(definitions));
NodeStore store = indexHelper.getNodeStore();
NodeState root = store.getRoot();
NodeBuilder rootBuilder = root.builder();
new IndexDefinitionUpdater(definitions).apply(rootBuilder);
mergeWithConcurrentCheck(store, rootBuilder);
} else {
log.warn("No index definitions file provided");
}
private void importIndex(IndexHelper indexHelper, File importDir) throws IOException, CommitFailedException {
try (ElasticIndexImporterSupport elasticIndexImporterSupport = new ElasticIndexImporterSupport(indexHelper, indexOpts.getIndexPrefix(),
indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
elasticIndexImporterSupport.importIndex(importDir);
}
}

private void reindexOperation(ElasticIndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
@@ -216,33 +206,27 @@ private void reindex(ElasticIndexOptions indexOpts, IndexHelper indexHelper, Str
indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
indexer.reindex();
// Wait for default flush interval before exiting the try block
// to make sure the client is not closed before the last flush
// TODO : See if this can be handled in a better manner
Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT * 2);
} catch (InterruptedException e) {
log.debug("Exception while waiting for Elastic connection to close", e);
}
} else {
try (ElasticOutOfBandIndexer indexer = new ElasticOutOfBandIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {

indexer.reindex();
// Wait for default flush interval before exiting the try block
// to make sure the client is not closed before the last flush
Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT * 2);
} catch (InterruptedException e) {
log.debug("Exception while waiting for Elastic connection to close", e);
}
}
indexerSupport.writeMetaInfo(checkpoint);
log.info("Indexing completed for indexes {} in {} ({} ms)",
indexHelper.getIndexPaths(), w, w.elapsed(TimeUnit.MILLISECONDS));

// This will copy the metadata files (consisting of the checkpoint info and indexes that have been re-indexed)
// to the o/p directory. We need to do this because the working dir where they have been created would be cleaned up.
// In case of lucene, even the index files are created here and copied as part of this, but for elastic - it's just metadata.
File destDir = indexerSupport.copyIndexFilesToOutput();
log.info("Indexing completed for indexes {} in {} ({} ms) and index metadata files are copied to {}",
indexHelper.getIndexPaths(), w, w.elapsed(TimeUnit.MILLISECONDS), ElasticIndexCommand.getPath(destDir));
}

private IndexerSupport createIndexerSupport(IndexHelper indexHelper, String checkpoint) {
IndexerSupport indexerSupport = new IndexerSupport(indexHelper, checkpoint);
IndexerSupport indexerSupport = new ElasticIndexerSupport(indexHelper, checkpoint);

File definitions = indexOpts.getIndexDefinitionsFile();
if (definitions != null) {
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index;

import com.google.common.io.Closer;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexImporter;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporter;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.reference.ReferenceEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;

import java.io.Closeable;
import java.io.IOException;

/**
 * {@link IndexImporterSupportBase} variant for Elastic indexes.
 *
 * <p>Each Elastic connection built for the editor provider is registered with
 * {@link #closer}; {@link #close()} closes that closer.
 */
public class ElasticIndexImporterSupport extends IndexImporterSupportBase implements Closeable {

    protected final Closer closer = Closer.create();

    private final String indexPrefix;
    private final String scheme;
    private final String host;
    private final int port;
    private final String apiKeyId;
    private final String apiSecretId;

    /**
     * @param indexHelper helper passed on to the base class
     * @param indexPrefix index prefix given to the connection builder
     * @param scheme      scheme connection parameter
     * @param host        host connection parameter
     * @param port        port connection parameter
     * @param apiKeyId    API key id; API keys are applied only when this and
     *                    {@code apiSecretId} are both non-null
     * @param apiSecretId API key secret; see {@code apiKeyId}
     */
    public ElasticIndexImporterSupport(IndexHelper indexHelper, String indexPrefix, String scheme,
                                       String host, int port,
                                       String apiKeyId, String apiSecretId) {
        super(indexHelper);
        this.indexPrefix = indexPrefix;
        this.scheme = scheme;
        this.host = host;
        this.port = port;
        this.apiKeyId = apiKeyId;
        this.apiSecretId = apiSecretId;
    }

    /**
     * Combines the Elastic editor provider with the property and reference editor
     * providers, the latter two configured with the helper's mount info provider.
     */
    @Override
    protected IndexEditorProvider createIndexEditorProvider() {
        final MountInfoProvider mountInfoProvider = indexHelper.getMountInfoProvider();
        final IndexEditorProvider elasticProvider = createElasticEditorProvider();
        final IndexEditorProvider propertyProvider = new PropertyIndexEditorProvider().with(mountInfoProvider);
        final IndexEditorProvider referenceProvider = new ReferenceEditorProvider().with(mountInfoProvider);
        return new CompositeIndexEditorProvider(elasticProvider, propertyProvider, referenceProvider);
    }

    /**
     * Registers an {@link ElasticIndexImporter} on the given importer.
     */
    @Override
    protected void addImportProviders(IndexImporter importer) {
        importer.addImporterProvider(new ElasticIndexImporter());
    }

    /**
     * Builds an Elastic connection from the configured parameters, registers it with
     * {@link #closer} and wraps it in an {@link ElasticIndexEditorProvider}.
     */
    private IndexEditorProvider createElasticEditorProvider() {
        final ElasticConnection.Builder.BuildStep connectionBuilder = ElasticConnection.newBuilder()
                .withIndexPrefix(indexPrefix)
                .withConnectionParameters(scheme, host, port);

        // API keys are only applied when both the id and the secret were supplied.
        final boolean apiKeyConfigured = apiKeyId != null && apiSecretId != null;
        final ElasticConnection connection = apiKeyConfigured
                ? connectionBuilder.withApiKeys(apiKeyId, apiSecretId).build()
                : connectionBuilder.build();
        closer.register(connection);

        final ElasticIndexTracker tracker = new ElasticIndexTracker(connection,
                new ElasticMetricHandler(StatisticsProvider.NOOP));
        return new ElasticIndexEditorProvider(tracker, connection,
                new ExtractedTextCache(10 * FileUtils.ONE_MB, 100));
    }

    /**
     * Closes {@link #closer} and with it everything registered on it.
     *
     * @throws IOException if closing the closer fails
     */
    @Override
    public void close() throws IOException {
        closer.close();
    }
}
@@ -35,7 +35,6 @@ public class ElasticIndexOptions extends IndexOptions {
private final OptionSpec<String> apiKeySecret;
private final OptionSpec<Integer> port;
private final OptionSpec<String> indexPrefix;
private final OptionSpec<Void> applyIndexDef;


public ElasticIndexOptions(OptionParser parser) {
@@ -52,7 +51,6 @@ public ElasticIndexOptions(OptionParser parser) {
.withRequiredArg().ofType(String.class);
indexPrefix = parser.accepts("indexPrefix", "Elastic indexPrefix")
.withRequiredArg().ofType(String.class);
applyIndexDef = parser.accepts("applyIndexDef", "Apply Index Definitions to repo specified by --index-definitions-file");
}

public String getElasticScheme() {
@@ -79,7 +77,4 @@ public String getIndexPrefix() {
return indexPrefix.value(options);
}

/**
 * @return {@code true} when the parsed options contain the {@code applyIndexDef} option
 */
public boolean isApplyIndexDef() {
    final boolean applyRequested = options.has(applyIndexDef);
    return applyRequested;
}
}

0 comments on commit bfa326d

Please sign in to comment.