Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HSEARCH-2761 Elasticsearch purges may fail when purging multiple classes of the same inheritance tree in the same transaction #1445

Merged
merged 8 commits into from Jun 8, 2017
Expand Up @@ -7,7 +7,6 @@
package org.hibernate.search.elasticsearch.impl;

import java.util.List;
import java.util.Set;

import org.apache.lucene.document.Document;
import org.apache.lucene.facet.FacetsConfig;
Expand Down Expand Up @@ -113,10 +112,11 @@ public ElasticsearchWork<?> visitPurgeAllWork(PurgeAllLuceneWork work, IndexingM
.luceneWork( work )
.markIndexDirty( refreshAfterWrite );

Set<Class<?>> typesToDelete = searchIntegrator.getIndexedTypesPolymorphic( new Class<?>[] { work.getEntityClass() } );
for ( Class<?> typeToDelete : typesToDelete ) {
builder.type( URLEncodedString.fromString( typeToDelete.getName() ) );
}
/*
* Deleting only the given type.
* Inheritance trees are handled at a higher level by creating multiple purge works.
*/
builder.type( URLEncodedString.fromString( work.getEntityClass().getName() ) );

return builder.build();
}
Expand Down
Expand Up @@ -102,7 +102,10 @@ public <T> T executeSyncUnsafe(ElasticsearchWork<T> work) {
* @param works The works to be executed.
*/
public void executeSyncSafe(Iterable<ElasticsearchWork<?>> works) {
executeSafe( works, true );
SequentialWorkExecutionContext context = new SequentialWorkExecutionContext(
client, gsonProvider, workFactory, this, errorHandler );
executeSafe( context, works, true );
context.flush();
}

/**
Expand Down Expand Up @@ -149,9 +152,8 @@ public void awaitAsyncProcessingCompletion() {
* @param nonBulkedWorks The works to be bulked (as much as possible) and executed
* @param refreshInBulkAPICall The parameter to pass to {@link #createRequestGroups(Iterable, boolean)}.
*/
private void executeSafe(Iterable<ElasticsearchWork<?>> nonBulkedWorks, boolean refreshInBulkAPICall) {
SequentialWorkExecutionContext context = new SequentialWorkExecutionContext(
client, gsonProvider, workFactory, this, errorHandler );
private void executeSafe(SequentialWorkExecutionContext context, Iterable<ElasticsearchWork<?>> nonBulkedWorks,
boolean refreshInBulkAPICall) {
ErrorContextBuilder errorContextBuilder = new ErrorContextBuilder();

for ( ElasticsearchWork<?> work : createRequestGroups( nonBulkedWorks, refreshInBulkAPICall ) ) {
Expand Down Expand Up @@ -183,8 +185,6 @@ private void executeSafe(Iterable<ElasticsearchWork<?>> nonBulkedWorks, boolean
break;
}
}

context.flush();
}

private void executeUnsafe(ElasticsearchWork<?> work, ElasticsearchWorkExecutionContext context) {
Expand Down Expand Up @@ -344,7 +344,7 @@ private void processAsyncWork() {
return;
}
Iterable<ElasticsearchWork<?>> flattenedWorks = CollectionHelper.flatten( works );
executeSafe( flattenedWorks, false );
executeSafe( context, flattenedWorks, false );
}
}
}
Expand Down
Expand Up @@ -17,6 +17,8 @@
import org.hibernate.search.elasticsearch.logging.impl.Log;
import org.hibernate.search.elasticsearch.util.impl.ElasticsearchClientUtils;
import org.hibernate.search.elasticsearch.work.impl.builder.DeleteByQueryWorkBuilder;
import org.hibernate.search.elasticsearch.work.impl.builder.RefreshWorkBuilder;
import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory;
import org.hibernate.search.exception.AssertionFailure;
import org.hibernate.search.exception.SearchException;
import org.hibernate.search.util.logging.impl.LoggerFactory;
Expand All @@ -30,8 +32,19 @@
*/
public class ES2DeleteByQueryWork extends SimpleElasticsearchWork<Void> {

private final ElasticsearchWork<?> refreshWork;

protected ES2DeleteByQueryWork(Builder builder) {
super( builder );
this.refreshWork = builder.buildRefreshWork();
}

@Override
protected void beforeExecute(ElasticsearchWorkExecutionContext executionContext, ElasticsearchRequest request) {
/*
* Refresh the index so as to minimize the risk of version conflict
*/
refreshWork.execute( executionContext );
}

@Override
Expand All @@ -46,10 +59,13 @@ public static class Builder
private final JsonObject payload;
private final Set<URLEncodedString> typeNames = new HashSet<>();

public Builder(URLEncodedString indexName, JsonObject payload) {
private final RefreshWorkBuilder refreshWorkBuilder;

public Builder(URLEncodedString indexName, JsonObject payload, ElasticsearchWorkFactory workFactory) {
super( indexName, SuccessAssessor.INSTANCE );
this.indexName = indexName;
this.payload = payload;
this.refreshWorkBuilder = workFactory.refresh().index( indexName );
}

@Override
Expand All @@ -74,6 +90,10 @@ protected ElasticsearchRequest buildRequest() {
return builder.build();
}

protected ElasticsearchWork<?> buildRefreshWork() {
return refreshWorkBuilder.build();
}

@Override
public ES2DeleteByQueryWork build() {
return new ES2DeleteByQueryWork( this );
Expand Down
Expand Up @@ -14,6 +14,8 @@
import org.hibernate.search.elasticsearch.client.impl.Paths;
import org.hibernate.search.elasticsearch.client.impl.URLEncodedString;
import org.hibernate.search.elasticsearch.work.impl.builder.DeleteByQueryWorkBuilder;
import org.hibernate.search.elasticsearch.work.impl.builder.RefreshWorkBuilder;
import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory;

import com.google.gson.JsonObject;

Expand All @@ -24,8 +26,19 @@
*/
public class ES5DeleteByQueryWork extends SimpleElasticsearchWork<Void> {

private final ElasticsearchWork<?> refreshWork;

protected ES5DeleteByQueryWork(Builder builder) {
super( builder );
this.refreshWork = builder.buildRefreshWork();
}

@Override
protected void beforeExecute(ElasticsearchWorkExecutionContext executionContext, ElasticsearchRequest request) {
/*
* Refresh the index so as to minimize the risk of version conflict
*/
refreshWork.execute( executionContext );
}

@Override
Expand All @@ -40,10 +53,13 @@ public static class Builder
private final JsonObject payload;
private final Set<URLEncodedString> typeNames = new HashSet<>();

public Builder(URLEncodedString indexName, JsonObject payload) {
private final RefreshWorkBuilder refreshWorkBuilder;

public Builder(URLEncodedString indexName, JsonObject payload, ElasticsearchWorkFactory workFactory) {
super( indexName, DefaultElasticsearchRequestSuccessAssessor.INSTANCE );
this.indexName = indexName;
this.payload = payload;
this.refreshWorkBuilder = workFactory.refresh().index( indexName );
}

@Override
Expand All @@ -56,7 +72,12 @@ public Builder type(URLEncodedString typeName) {
protected ElasticsearchRequest buildRequest() {
ElasticsearchRequest.Builder builder =
ElasticsearchRequest.post()
.pathComponent( indexName );
.pathComponent( indexName )
/*
* Ignore conflicts: if we wrote to a document concurrently,
* we just want to keep it as is.
*/
.param( "conflicts", "proceed" );

if ( !typeNames.isEmpty() ) {
builder.multiValuedPathComponent( typeNames );
Expand All @@ -68,6 +89,10 @@ protected ElasticsearchRequest buildRequest() {
return builder.build();
}

protected ElasticsearchWork<?> buildRefreshWork() {
return refreshWorkBuilder.build();
}

@Override
public ES5DeleteByQueryWork build() {
return new ES5DeleteByQueryWork( this );
Expand Down
Expand Up @@ -83,7 +83,7 @@ public DeleteWorkBuilder delete(URLEncodedString indexName, URLEncodedString typ

@Override
public DeleteByQueryWorkBuilder deleteByQuery(URLEncodedString indexName, JsonObject payload) {
return new ES2DeleteByQueryWork.Builder( indexName, payload );
return new ES2DeleteByQueryWork.Builder( indexName, payload, this );
}

@Override
Expand Down
Expand Up @@ -38,7 +38,7 @@ public OptimizeWorkBuilder optimize() {

@Override
public DeleteByQueryWorkBuilder deleteByQuery(URLEncodedString indexName, JsonObject payload) {
return new ES5DeleteByQueryWork.Builder( indexName, payload );
return new ES5DeleteByQueryWork.Builder( indexName, payload, this );
}

}