Skip to content
Permalink
Browse files
OAK-9721: Add state information while importing index (#527)
* OAK-9721: Add state information while importing index
  • Loading branch information
tihom88 committed Mar 28, 2022
1 parent eed89c6 commit 42f1754e5756209da47299b9785f176948c885b7
Showing 7 changed files with 534 additions and 67 deletions.

Large diffs are not rendered by default.

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Version("0.2.0")
@Version("0.3.0")
package org.apache.jackrabbit.oak.plugins.index.importer;

import org.osgi.annotation.versioning.Version;
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
import java.util.Properties;
import java.util.Set;

@@ -33,6 +34,7 @@
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
@@ -64,6 +66,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.event.Level;

import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.collect.ImmutableSet.of;
@@ -89,6 +92,19 @@ public class IndexImporterTest {

private NodeStore store = new MemoryNodeStore();
private IndexEditorProvider provider = new PropertyIndexEditorProvider();
private LogCustomizer customizer;

/**
 * Attaches a log customizer to the {@link IndexImporter} logger, configured with
 * {@code Level.INFO}, and starts it so that tests can inspect the captured messages.
 */
@Before
public void setup() {
    String importerLoggerName = IndexImporter.class.getName();
    customizer = LogCustomizer.forLogger(importerLoggerName)
            .filter(Level.INFO)
            .create();
    customizer.starting();
}

/**
 * Stops the log customizer once the test has run.
 */
@After
public void after() {
    this.customizer.finished();
}


@Test(expected = IllegalArgumentException.class)
public void importIndex_NoMeta() throws Exception{
@@ -143,10 +159,11 @@ public void importData_CallbackInvoked() throws Exception{
NodeBuilder builder = store.getRoot().builder();
builder.child("idx-a").setProperty("type", "property");
builder.child("idx-a").setProperty("foo", "bar");
builder.child("idx-a").setProperty("async", "async");

store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);

createIndexDirs("/idx-a");
String checkpoint = createIndexDirs("/idx-a");

IndexImporter importer = new IndexImporter(store, temporaryFolder.getRoot(), provider, NOOP_LOCK);

@@ -164,10 +181,11 @@ public String getType() {
}
};
importer.addImporterProvider(provider);
importer.switchLanes();
importer.importIndexData();

assertTrue(store.getRoot().getChildNode("idx-a").getBoolean("imported"));
try{
importer.importIndex();
} catch (Exception e){
assertTrue(store.getRoot().getChildNode("idx-a").getBoolean("imported"));
}
}

@Test
@@ -264,6 +282,7 @@ public String getType() {
//It would not pick up /e as that's not yet indexed as part of the last checkpoint
assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));
assertNull(store.retrieve(checkpoint));
assertNull(store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
}

@Test
@@ -538,4 +557,155 @@ private void dumpIndexDefinitions(String... indexPaths) throws IOException, Comm
printer.print(pw, Format.JSON, false);
Files.write(sw.toString(), file, UTF_8);
}

/**
 * Builds the shared fixture for the incremental-update import tests.
 * <ol>
 *   <li>Defines the async property index {@code fooIndex} on {@code foo}, adds {@code /a} and
 *       {@code /b} and runs the async indexer.</li>
 *   <li>Calls {@code createIndexDirs("/oak:index/fooIndex")} and keeps the returned checkpoint.</li>
 *   <li>Adds {@code /c} and {@code /d}, runs the async indexer again and asserts that the index
 *       lookup returns a, b, c and d.</li>
 *   <li>Adds {@code /e} without running the async indexer.</li>
 * </ol>
 *
 * @return the value returned by {@code createIndexDirs} (used by callers as the checkpoint)
 * @throws IOException           declared by the helpers used here
 * @throws CommitFailedException if merging into the store fails
 */
private String importDataIncrementalUpdateBeforeSetupMethod() throws IOException, CommitFailedException {
    // Phase 1: index definition plus the first two matching nodes
    NodeBuilder initialContent = store.getRoot().builder();
    createIndexDefinition(initialContent.child(INDEX_DEFINITIONS_NAME),
            "fooIndex", true, false, ImmutableSet.of("foo"), null)
            .setProperty(ASYNC_PROPERTY_NAME, "async");
    for (String nodeName : new String[] {"a", "b"}) {
        initialContent.child(nodeName).setProperty("foo", "abc");
    }
    store.merge(initialContent, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    new AsyncIndexUpdate("async", store, provider).run();

    // Phase 2: index dirs are created while only /a and /b are indexed
    String importCheckpoint = createIndexDirs("/oak:index/fooIndex");

    // Phase 3: two more nodes, indexed by a second async run
    NodeBuilder secondBatch = store.getRoot().builder();
    for (String nodeName : new String[] {"c", "d"}) {
        secondBatch.child(nodeName).setProperty("foo", "abc");
    }
    store.merge(secondBatch, EmptyHook.INSTANCE, CommitInfo.EMPTY);
    new AsyncIndexUpdate("async", store, provider).run();

    FilterImpl ntBaseFilter = createFilter(store.getRoot(), NT_BASE);
    PropertyIndexLookup indexLookup = new PropertyIndexLookup(store.getRoot());
    assertEquals(of("a", "b", "c", "d"), find(indexLookup, "foo", "abc", ntBaseFilter));

    // Phase 4: /e is merged but no async index run follows
    NodeBuilder unindexedBatch = store.getRoot().builder();
    unindexedBatch.child("e").setProperty("foo", "abc");
    store.merge(unindexedBatch, EmptyHook.INSTANCE, CommitInfo.EMPTY);

    return importCheckpoint;
}

/**
 * Creates an {@link IndexImporterProvider} of type {@code "property"} for the tests.
 * <p>
 * Its {@code importIndex} asserts that the index directory is named {@code fooIndex} and that
 * the definition's reindex count is 2, then replaces the definition's
 * {@code INDEX_CONTENT_NODE_NAME} child with the node found at
 * {@code /oak:index/fooIndex/:index} in the state retrieved for the given checkpoint.
 *
 * @param checkpoint checkpoint passed to {@code store.retrieve} to obtain the index data
 * @return the provider described above
 */
private IndexImporterProvider getImporterProvider(String checkpoint) {
    return new IndexImporterProvider() {
        @Override
        public String getType() {
            return "property";
        }

        @Override
        public void importIndex(NodeState root, NodeBuilder defn, File indexDir) {
            assertEquals("fooIndex", indexDir.getName());
            long reindexCount = defn.getProperty(REINDEX_COUNT).getValue(Type.LONG).longValue();
            assertEquals(2, reindexCount);

            // Drop the current index content before copying it over from the checkpoint
            defn.getChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME).remove();

            NodeState checkpointRoot = store.retrieve(checkpoint);
            NodeState checkpointIndexData =
                    NodeStateUtils.getNode(checkpointRoot, "/oak:index/fooIndex/:index");
            defn.setChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME, checkpointIndexData);
        }
    };
}

/**
 * Runs an import whose {@code switchLanes()} step always throws a
 * {@link CommitFailedException} and checks the repository afterwards:
 * the index is still on the {@code async} lane, the lookup still returns a, b, c and d,
 * the checkpoint can still be retrieved, the expected "failed after N retries" message was
 * logged, and no import state property was stored on the index definition.
 * <p>
 * The post-conditions are checked in {@code finally} so they are evaluated whether or not
 * {@code importIndex()} propagates the injected failure.
 */
@Test
public void importDataIncrementalupdateTestFailureAtSwitchlaneState() throws Exception {
    String checkpoint = importDataIncrementalUpdateBeforeSetupMethod();
    try {
        IndexImporter importer = new IndexImporter(store, temporaryFolder.getRoot(), provider, NOOP_LOCK) {
            @Override
            void switchLanes() throws CommitFailedException {
                throw new CommitFailedException("dummy", 1, "Explicitly throw CommitFailedException");
            }
        };
        importer.addImporterProvider(getImporterProvider(checkpoint));
        importer.importIndex();
    } finally {
        NodeState idx = store.getRoot().getChildNode("oak:index").getChildNode("fooIndex");
        assertEquals("async", idx.getString("async"));

        // /e is not returned: it was added after the last async index run
        FilterImpl f = createFilter(store.getRoot(), NT_BASE);
        PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
        assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));

        // checkpoint is not released because of failure
        assertNotNull(store.retrieve(checkpoint));

        // Test retry logic: the final failure message must have been logged
        String failureLog = MessageFormat.format("IndexImporterStepExecutor:{0} failed after {1} retries",
                IndexImporter.IndexImportState.SWITCH_LANE, IndexImporter.RETRIES);
        boolean failureLogPresent = false;
        for (String log : customizer.getLogs()) {
            if (log.equals(failureLog)) {
                failureLogPresent = true;
                break;
            }
        }
        assertTrue("Expected log message not found: " + failureLog, failureLogPresent);

        // As failure is on switchlanes no update happened i.e. no state change got committed
        assertEquals("State matching failed",
                null,
                store.getRoot().getChildNode("oak:index").getChildNode("fooIndex")
                        .getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
    }
}

/**
 * Runs an import whose {@code importIndexData()} step always throws an {@link IOException}
 * and checks the repository afterwards: the index definition still reports the {@code async}
 * lane, the lookup still returns a, b, c and d, the checkpoint can still be retrieved, and the
 * import state stored on the index definition is {@code SWITCH_LANE}.
 * <p>
 * The post-conditions are checked in {@code finally} so they are evaluated whether or not
 * {@code importIndex()} propagates the injected failure.
 */
@Test
public void importDataIncrementalUpdateTestFailureAtImportIndexDataState() throws Exception {
    String checkpoint = importDataIncrementalUpdateBeforeSetupMethod();
    try {
        IndexImporter importer = new IndexImporter(store, temporaryFolder.getRoot(), provider, NOOP_LOCK) {
            @Override
            void importIndexData() throws CommitFailedException, IOException {
                throw new IOException("Explicitly throw IOException");
            }
        };
        importer.addImporterProvider(getImporterProvider(checkpoint));
        importer.importIndex();
    } finally {
        NodeState idx = store.getRoot().getChildNode("oak:index").getChildNode("fooIndex");
        assertEquals("async", idx.getString("async"));

        // /e is not returned: it was added after the last async index run
        FilterImpl f = createFilter(store.getRoot(), NT_BASE);
        PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
        assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));

        assertNotNull(store.retrieve(checkpoint));

        // Check presence first so a missing property fails as an assertion, not as an NPE
        assertNotNull("Import state property is missing",
                idx.getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
        assertEquals("State matching failed",
                IndexImporter.IndexImportState.SWITCH_LANE.toString(),
                idx.getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY).getValue(Type.STRING));
    }
}

/**
 * Runs a complete import with no injected failure and checks the outcome: the index is on the
 * {@code async} lane, the lookup returns a, b, c and d, the checkpoint can no longer be
 * retrieved, and no import state property is left on the index definition.
 * <p>
 * The checks run sequentially after {@code importIndex()} so that an unexpected exception from
 * the import is reported as-is instead of being replaced by an assertion failure.
 */
@Test
public void importDataIncrementalUpdateNoFailure() throws Exception {
    String checkpoint = importDataIncrementalUpdateBeforeSetupMethod();

    IndexImporter importer = new IndexImporter(store, temporaryFolder.getRoot(), provider, NOOP_LOCK);
    importer.addImporterProvider(getImporterProvider(checkpoint));
    importer.importIndex();

    NodeState idx = store.getRoot().getChildNode("oak:index").getChildNode("fooIndex");
    assertEquals("async", idx.getString("async"));

    // /e is not returned: it was added after the last async index run
    FilterImpl f = createFilter(store.getRoot(), NT_BASE);
    PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
    assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));

    assertNull(store.retrieve(checkpoint));
    assertEquals("State matching failed",
            null,
            store.getRoot().getChildNode("oak:index").getChildNode("fooIndex")
                    .getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
}
}
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.lucene;

import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.oak.InitialContentHelper;
import org.apache.jackrabbit.oak.api.ContentRepository;
import org.apache.jackrabbit.oak.api.Tree;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.FunctionIndexCommonTest;
import org.apache.jackrabbit.oak.plugins.index.IndexImporterReindexTest;
import org.apache.jackrabbit.oak.plugins.index.LuceneIndexOptions;
import org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NODE_TYPE;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;

/**
 * Lucene variant of {@link IndexImporterReindexTest}: supplies a Lucene-backed content
 * repository, Lucene index definitions and the logger name of {@link LuceneIndexEditor}.
 */
public class LuceneIndexImporterReindexTest extends IndexImporterReindexTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));

    // Handed to the repository builder; shut down after each test
    private ExecutorService executorService = Executors.newFixedThreadPool(2);

    /**
     * Creates a Lucene index definition below the root tree.
     *
     * @param name      name of the index definition node
     * @param propNames property names to include in the index
     * @return the tree of the created index definition
     */
    protected Tree createIndex(String name, Set<String> propNames) {
        return createIndex(root.getTree("/"), name, propNames);
    }

    /**
     * Creates a Lucene index definition named {@code name} under the
     * {@code INDEX_DEFINITIONS_NAME} child of {@code index}, flagged for reindex,
     * with full text disabled and the given properties included.
     *
     * @param index     parent tree that receives the index definitions node
     * @param name      name of the index definition node
     * @param propNames property names to include in the index
     * @return the tree of the created index definition
     */
    protected Tree createIndex(Tree index, String name, Set<String> propNames) {
        Tree definition = index.addChild(INDEX_DEFINITIONS_NAME).addChild(name);
        definition.setProperty(JcrConstants.JCR_PRIMARYTYPE, INDEX_DEFINITIONS_NODE_TYPE, Type.NAME);
        definition.setProperty(TYPE_PROPERTY_NAME, LuceneIndexConstants.TYPE_LUCENE);
        definition.setProperty(REINDEX_PROPERTY_NAME, true);
        definition.setProperty(FulltextIndexConstants.FULL_TEXT_ENABLED, false);
        definition.setProperty(
                PropertyStates.createProperty(FulltextIndexConstants.INCLUDE_PROPERTY_NAMES, propNames, Type.STRINGS));
        definition.setProperty(LuceneIndexConstants.SAVE_DIR_LISTING, true);
        return index.getChild(INDEX_DEFINITIONS_NAME).getChild(name);
    }

    /**
     * Builds the Lucene test repository on a {@link MemoryNodeStore} seeded with the initial
     * content and records the repository and index options in the inherited fields.
     */
    @Override
    protected ContentRepository createRepository() {
        LuceneTestRepositoryBuilder repoBuilder =
                new LuceneTestRepositoryBuilder(executorService, temporaryFolder);
        repoBuilder.setNodeStore(new MemoryNodeStore(InitialContentHelper.INITIAL_CONTENT));
        repositoryOptionsUtil = repoBuilder.build();
        indexOptions = new LuceneIndexOptions();
        return repositoryOptionsUtil.getOak().createContentRepository();
    }

    /** Returns the class name of {@link LuceneIndexEditor} as the logger name. */
    @Override
    protected String getLoggerName() {
        return LuceneIndexEditor.class.getName();
    }

    /** Shuts down the executor created for this test instance. */
    @After
    public void shutdownExecutor() {
        executorService.shutdown();
    }

}
@@ -23,19 +23,23 @@
import org.apache.jackrabbit.oak.plugins.index.search.ReindexOperations;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

public class ElasticIndexImporter implements IndexImporterProvider {

// Logger named after this class so its messages are attributed to ElasticIndexImporter
private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexImporter.class);

/** Creates the importer; the constructor performs no initialisation. */
public ElasticIndexImporter() {
}

/**
 * Intentionally does nothing except log that no data import is needed for elastic.
 * None of the arguments are used.
 */
@Override
public void importIndex(NodeState root, NodeBuilder definitionBuilder, File indexDir)
        throws IOException, CommitFailedException {
    // NOOP for elastic
    LOG.info("No need to import data in case of elastic since this is a remote index. Exiting with NOOP");
}

@Override
@@ -19,6 +19,7 @@

package org.apache.jackrabbit.oak.plugins.index.search;

import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporter;
import org.apache.jackrabbit.oak.plugins.index.search.util.NodeStateCloner;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -63,12 +64,14 @@ public ReindexOperations(NodeState root, NodeBuilder definitionBuilder, String i
public IndexDefinition apply(boolean useStateFromBuilder) {
IndexFormatVersion version = IndexDefinition.determineVersionForFreshIndex(definitionBuilder);
definitionBuilder.setProperty(IndexDefinition.INDEX_VERSION, version.getVersion());
definitionBuilder.removeProperty(IndexImporter.INDEX_IMPORT_STATE_KEY);

//Avoid obtaining the latest NodeState from the builder, as that would force a purge of the current transient state.
//As the index definition does not get modified as part of an IndexUpdate run in most cases, we rely on the base state.
//For the case where the index definition is rewritten, we get the fresh state.
NodeState defnState = useStateFromBuilder ? definitionBuilder.getNodeState() : definitionBuilder.getBaseState();
if (storedIndexDefinitionEnabled) {
definitionBuilder.setChildNode(INDEX_DEFINITION_NODE, NodeStateCloner.cloneVisibleState(defnState));
definitionBuilder.setChildNode(INDEX_DEFINITION_NODE, NodeStateCloner.cloneVisibleState(defnState));
if (definitionBuilder.getChildNode(STATUS_NODE).exists()) {
definitionBuilder.getChildNode(STATUS_NODE).removeProperty(REINDEX_COMPLETION_TIMESTAMP);

0 comments on commit 42f1754

Please sign in to comment.