Skip to content

Commit

Permalink
Percolator: Registering (indexing) a new percolator query will still …
Browse files Browse the repository at this point in the history
…be stored in memory if actually indexing it fails, closes elastic#1965.
  • Loading branch information
kimchy committed May 19, 2012
1 parent 1d7562b commit b5aa1cf
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 14 deletions.
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
Expand Down Expand Up @@ -92,6 +93,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {

private final ThreadPool threadPool;

private final ShardIndexingService indexingService;

private final IndexSettingsService indexSettingsService;

private final Store store;
Expand Down Expand Up @@ -152,7 +155,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {

@Inject
public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService,
IndexSettingsService indexSettingsService, ShardIndexingService indexingService,
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
AnalysisService analysisService, SimilarityService similarityService,
Expand All @@ -170,6 +173,7 @@ public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Threa

this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.indexingService = indexingService;
this.store = store;
this.deletionPolicy = deletionPolicy;
this.translog = translog;
Expand Down Expand Up @@ -464,6 +468,8 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException {
Translog.Location translogLocation = translog.add(new Translog.Create(create));

versionMap.put(create.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));

indexingService.postCreateUnderLock(create);
}
}

Expand Down Expand Up @@ -576,6 +582,8 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
Translog.Location translogLocation = translog.add(new Translog.Index(index));

versionMap.put(index.uid().text(), new VersionValue(updatedVersion, false, threadPool.estimatedTimeInMillis(), translogLocation));

indexingService.postIndexUnderLock(index);
}
}

Expand Down Expand Up @@ -678,6 +686,8 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
versionMap.put(delete.uid().text(), new VersionValue(updatedVersion, true, threadPool.estimatedTimeInMillis(), translogLocation));
}

indexingService.postDeleteUnderLock(delete);
}
}

Expand Down
Expand Up @@ -22,30 +22,78 @@
import org.elasticsearch.index.engine.Engine;

/**
*
* An indexing listener for indexing, delete, events.
*/
public abstract class IndexingOperationListener {

/**
* Called before the indexing occurs.
*/
public Engine.Create preCreate(Engine.Create create) {
return create;
}

/**
* Called after the indexing occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postCreateUnderLock(Engine.Create create) {

}

/**
* Called after the indexing operation occurred.
*/
public void postCreate(Engine.Create create) {

}

/**
* Called before the indexing occurs.
*/
public Engine.Index preIndex(Engine.Index index) {
return index;
}

/**
* Called after the indexing occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postIndexUnderLock(Engine.Index index) {

}

/**
* Called after the indexing operation occurred.
*/
public void postIndex(Engine.Index index) {

}

/**
* Called before the delete occurs.
*/
public Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}

/**
* Called after the delete occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p/>
* Note, long operations should not occur under this callback.
*/
public void postDeleteUnderLock(Engine.Delete delete) {

}

/**
* Called after the delete operation occurred.
*/
public void postDelete(Engine.Delete delete) {

}
Expand Down
Expand Up @@ -103,13 +103,29 @@ public Engine.Create preCreate(Engine.Create create) {
return create;
}

public void postCreateUnderLock(Engine.Create create) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreateUnderLock(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}

public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
typeStats(create.type()).indexMetric.inc(took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postCreate(create);
try {
listener.postCreate(create);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
Expand All @@ -125,6 +141,18 @@ public Engine.Index preIndex(Engine.Index index) {
return index;
}

public void postIndexUnderLock(Engine.Index index) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndexUnderLock(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}

public void postIndex(Engine.Index index) {
long took = index.endTime() - index.startTime();
totalStats.indexMetric.inc(took);
Expand All @@ -134,7 +162,11 @@ public void postIndex(Engine.Index index) {
typeStats.indexCurrent.dec();
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postIndex(index);
try {
listener.postIndex(index);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
Expand All @@ -155,6 +187,18 @@ public Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}

public void postDeleteUnderLock(Engine.Delete delete) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postDeleteUnderLock(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}

public void postDelete(Engine.Delete delete) {
long took = delete.endTime() - delete.startTime();
totalStats.deleteMetric.inc(took);
Expand All @@ -164,7 +208,11 @@ public void postDelete(Engine.Delete delete) {
typeStats.deleteCurrent.dec();
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postDelete(delete);
try {
listener.postDelete(delete);
} catch (Exception e) {
logger.warn("post listener [{}] failed", e, listener);
}
}
}
}
Expand Down
Expand Up @@ -223,7 +223,7 @@ public Query parseQuery(String name, byte[] source, int sourceOffset, int source
String currentFieldName = null;
XContentParser.Token token = parser.nextToken(); // move the START_OBJECT
if (token != XContentParser.Token.START_OBJECT) {
throw new ElasticSearchException("Failed to add query [" + name + "], not starting with OBJECT");
throw new ElasticSearchException("failed to parse query [" + name + "], not starting with OBJECT");
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
Expand All @@ -240,8 +240,8 @@ public Query parseQuery(String name, byte[] source, int sourceOffset, int source
}
}
return query;
} catch (IOException e) {
throw new ElasticSearchException("Failed to add query [" + name + "]", e);
} catch (Exception e) {
throw new ElasticSearchException("failed to parse query [" + name + "]", e);
} finally {
if (parser != null) {
parser.close();
Expand Down
Expand Up @@ -257,26 +257,44 @@ class RealTimePercolatorOperationListener extends IndexingOperationListener {

@Override
public Engine.Create preCreate(Engine.Create create) {
// validate the query here, before we index
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
percolator.parseQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
}
return create;
}

@Override
public void postCreateUnderLock(Engine.Create create) {
// add the query under a doc lock
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source(), create.sourceOffset(), create.sourceLength());
}
}

@Override
public Engine.Index preIndex(Engine.Index index) {
// validate the query here, before we index
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
percolator.parseQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
}
return index;
}

@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public void postIndexUnderLock(Engine.Index index) {
// add the query under a doc lock
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source(), index.sourceOffset(), index.sourceLength());
}
}

@Override
public void postDeleteUnderLock(Engine.Delete delete) {
// remove the query under a lock
if (delete.type().equals(index().name())) {
percolator.removeQuery(delete.id());
}
return delete;
}
}
}
Expand Up @@ -21,13 +21,14 @@

import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.bloom.none.NoneBloomCache;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.robin.RobinEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.unit.index.engine.AbstractSimpleEngineTests;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
Expand All @@ -38,7 +39,7 @@
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {

protected Engine createEngine(Store store, Translog translog) {
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
return new RobinEngine(shardId, EMPTY_SETTINGS, new ThreadPool(), new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NoneBloomCache(shardId.index()));
}
}

0 comments on commit b5aa1cf

Please sign in to comment.