diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexWorkVisitor.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexWorkVisitor.java index 5a9d8ca55a6..a5d3fbd915e 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexWorkVisitor.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexWorkVisitor.java @@ -7,7 +7,6 @@ package org.hibernate.search.elasticsearch.impl; import java.util.List; -import java.util.Set; import org.apache.lucene.document.Document; import org.apache.lucene.facet.FacetsConfig; @@ -113,10 +112,11 @@ public ElasticsearchWork visitPurgeAllWork(PurgeAllLuceneWork work, IndexingM .luceneWork( work ) .markIndexDirty( refreshAfterWrite ); - Set> typesToDelete = searchIntegrator.getIndexedTypesPolymorphic( new Class[] { work.getEntityClass() } ); - for ( Class typeToDelete : typesToDelete ) { - builder.type( URLEncodedString.fromString( typeToDelete.getName() ) ); - } + /* + * Deleting only the given type. + * Inheritance trees are handled at a higher level by creating multiple purge works. + */ + builder.type( URLEncodedString.fromString( work.getEntityClass().getName() ) ); return builder.build(); } diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java index a291df620ba..1aa4a2d17de 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java @@ -102,7 +102,10 @@ public T executeSyncUnsafe(ElasticsearchWork work) { * @param works The works to be executed. */ public void executeSyncSafe(Iterable> works) { - executeSafe( works, true ); + SequentialWorkExecutionContext context = new SequentialWorkExecutionContext( + client, gsonProvider, workFactory, this, errorHandler ); + executeSafe( context, works, true ); + context.flush(); } /** @@ -149,9 +152,8 @@ public void awaitAsyncProcessingCompletion() { * @param nonBulkedWorks The works to be bulked (as much as possible) and executed * @param refreshInBulkAPICall The parameter to pass to {@link #createRequestGroups(Iterable, boolean)}. */ - private void executeSafe(Iterable> nonBulkedWorks, boolean refreshInBulkAPICall) { - SequentialWorkExecutionContext context = new SequentialWorkExecutionContext( - client, gsonProvider, workFactory, this, errorHandler ); + private void executeSafe(SequentialWorkExecutionContext context, Iterable> nonBulkedWorks, + boolean refreshInBulkAPICall) { ErrorContextBuilder errorContextBuilder = new ErrorContextBuilder(); for ( ElasticsearchWork work : createRequestGroups( nonBulkedWorks, refreshInBulkAPICall ) ) { @@ -183,8 +185,6 @@ private void executeSafe(Iterable> nonBulkedWorks, boolean break; } } - - context.flush(); } private void executeUnsafe(ElasticsearchWork work, ElasticsearchWorkExecutionContext context) { @@ -344,7 +344,7 @@ private void processAsyncWork() { return; } Iterable> flattenedWorks = CollectionHelper.flatten( works ); - executeSafe( flattenedWorks, false ); + executeSafe( context, flattenedWorks, false ); } } } diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES2DeleteByQueryWork.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES2DeleteByQueryWork.java index cc566fbe327..0b68d697350 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES2DeleteByQueryWork.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES2DeleteByQueryWork.java @@ -17,6 +17,8 @@ import org.hibernate.search.elasticsearch.logging.impl.Log; import org.hibernate.search.elasticsearch.util.impl.ElasticsearchClientUtils; import org.hibernate.search.elasticsearch.work.impl.builder.DeleteByQueryWorkBuilder; +import org.hibernate.search.elasticsearch.work.impl.builder.RefreshWorkBuilder; +import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory; import org.hibernate.search.exception.AssertionFailure; import org.hibernate.search.exception.SearchException; import org.hibernate.search.util.logging.impl.LoggerFactory; @@ -30,8 +32,19 @@ */ public class ES2DeleteByQueryWork extends SimpleElasticsearchWork { + private final ElasticsearchWork refreshWork; + protected ES2DeleteByQueryWork(Builder builder) { super( builder ); + this.refreshWork = builder.buildRefreshWork(); + } + + @Override + protected void beforeExecute(ElasticsearchWorkExecutionContext executionContext, ElasticsearchRequest request) { + /* + * Refresh the index so as to minimize the risk of version conflict + */ + refreshWork.execute( executionContext ); } @Override @@ -46,10 +59,13 @@ public static class Builder private final JsonObject payload; private final Set typeNames = new HashSet<>(); - public Builder(URLEncodedString indexName, JsonObject payload) { + private final RefreshWorkBuilder refreshWorkBuilder; + + public Builder(URLEncodedString indexName, JsonObject payload, ElasticsearchWorkFactory workFactory) { super( indexName, SuccessAssessor.INSTANCE ); this.indexName = indexName; this.payload = payload; + this.refreshWorkBuilder = workFactory.refresh().index( indexName ); } @Override @@ -74,6 +90,10 @@ protected ElasticsearchRequest buildRequest() { return builder.build(); } + protected ElasticsearchWork buildRefreshWork() { + return refreshWorkBuilder.build(); + } + @Override public ES2DeleteByQueryWork build() { return new ES2DeleteByQueryWork( this ); diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES5DeleteByQueryWork.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES5DeleteByQueryWork.java index 3e516c3830c..01c45530d62 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES5DeleteByQueryWork.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/ES5DeleteByQueryWork.java @@ -14,6 +14,8 @@ import org.hibernate.search.elasticsearch.client.impl.Paths; import org.hibernate.search.elasticsearch.client.impl.URLEncodedString; import org.hibernate.search.elasticsearch.work.impl.builder.DeleteByQueryWorkBuilder; +import org.hibernate.search.elasticsearch.work.impl.builder.RefreshWorkBuilder; +import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory; import com.google.gson.JsonObject; @@ -24,8 +26,19 @@ */ public class ES5DeleteByQueryWork extends SimpleElasticsearchWork { + private final ElasticsearchWork refreshWork; + protected ES5DeleteByQueryWork(Builder builder) { super( builder ); + this.refreshWork = builder.buildRefreshWork(); + } + + @Override + protected void beforeExecute(ElasticsearchWorkExecutionContext executionContext, ElasticsearchRequest request) { + /* + * Refresh the index so as to minimize the risk of version conflict + */ + refreshWork.execute( executionContext ); } @Override @@ -40,10 +53,13 @@ public static class Builder private final JsonObject payload; private final Set typeNames = new HashSet<>(); - public Builder(URLEncodedString indexName, JsonObject payload) { + private final RefreshWorkBuilder refreshWorkBuilder; + + public Builder(URLEncodedString indexName, JsonObject payload, ElasticsearchWorkFactory workFactory) { super( indexName, DefaultElasticsearchRequestSuccessAssessor.INSTANCE ); this.indexName = indexName; this.payload = payload; + this.refreshWorkBuilder = workFactory.refresh().index( indexName ); } @Override @@ -56,7 +72,12 @@ public Builder type(URLEncodedString typeName) { protected ElasticsearchRequest buildRequest() { ElasticsearchRequest.Builder builder = ElasticsearchRequest.post() - .pathComponent( indexName ); + .pathComponent( indexName ) + /* + * Ignore conflicts: if we wrote to a document concurrently, + * we just want to keep it as is. + */ + .param( "conflicts", "proceed" ); if ( !typeNames.isEmpty() ) { builder.multiValuedPathComponent( typeNames ); @@ -68,6 +89,10 @@ protected ElasticsearchRequest buildRequest() { return builder.build(); } + protected ElasticsearchWork buildRefreshWork() { + return refreshWorkBuilder.build(); + } + @Override public ES5DeleteByQueryWork build() { return new ES5DeleteByQueryWork( this ); diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch2WorkFactory.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch2WorkFactory.java index 10062aebd2e..0052be913f3 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch2WorkFactory.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch2WorkFactory.java @@ -83,7 +83,7 @@ public DeleteWorkBuilder delete(URLEncodedString indexName, URLEncodedString typ @Override public DeleteByQueryWorkBuilder deleteByQuery(URLEncodedString indexName, JsonObject payload) { - return new ES2DeleteByQueryWork.Builder( indexName, payload ); + return new ES2DeleteByQueryWork.Builder( indexName, payload, this ); } @Override diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch5WorkFactory.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch5WorkFactory.java index cda7b751b06..04de8ba3985 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch5WorkFactory.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/work/impl/factory/Elasticsearch5WorkFactory.java @@ -38,7 +38,7 @@ public OptimizeWorkBuilder optimize() { @Override public DeleteByQueryWorkBuilder deleteByQuery(URLEncodedString indexName, JsonObject payload) { - return new ES5DeleteByQueryWork.Builder( indexName, payload ); + return new ES5DeleteByQueryWork.Builder( indexName, payload, this ); } } diff --git a/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/test/PurgeIT.java b/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/test/PurgeIT.java new file mode 100644 index 00000000000..b1f60ca601a --- /dev/null +++ b/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/test/PurgeIT.java @@ -0,0 +1,252 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.elasticsearch.test; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Inheritance; +import javax.persistence.InheritanceType; + +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.hibernate.Transaction; +import org.hibernate.search.FullTextSession; +import org.hibernate.search.Search; +import org.hibernate.search.annotations.Field; +import org.hibernate.search.annotations.Indexed; +import org.hibernate.search.backend.FlushLuceneWork; +import org.hibernate.search.elasticsearch.cfg.ElasticsearchEnvironment; +import org.hibernate.search.elasticsearch.impl.JsonBuilder; +import org.hibernate.search.elasticsearch.testutil.TestElasticsearchClient; +import org.hibernate.search.indexes.spi.IndexManager; +import org.hibernate.search.test.SearchTestBase; +import org.hibernate.search.test.util.impl.ExpectedLog4jLog; +import org.hibernate.search.testsupport.TestForIssue; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) // Do not use the CustomRunner, it messes with rules (by reusing the same test instance) +public class PurgeIT extends SearchTestBase { + + @Rule + public ExpectedLog4jLog logged = ExpectedLog4jLog.create(); + + @Rule + public TestElasticsearchClient elasticsearchClient = new TestElasticsearchClient(); + + private FullTextSession fullTextSession; + + @Override + @Before + public void setUp() throws Exception { + // Make sure that no automatic refresh will occur during the test + elasticsearchClient.template( "no_automatic_refresh" ) + .create( + "*", + JsonBuilder.object() + .add( "index", JsonBuilder.object() + .addProperty( "refresh_interval", "-1" ) + ) + .build() + ); + + super.setUp(); + + createPersistAndIndexTestData(); + } + + /* + * Test executing multiple purges without explicitly refreshing in-between. + */ + @Test + @TestForIssue(jiraKey = "HSEARCH-2761") + public void multiplePurges() throws Exception { + flush(); + List all = getAll(); + assertEquals( "Wrong total number of entries", 3, all.size() ); + + // Expect 0 failure in the backend threads + logged.expectEventMissing( new TypeSafeMatcher() { + + @Override + public void describeTo(Description description) { + description.appendText( "a LoggingEvent with ERROR level or higher" ); + } + + @Override + protected boolean matchesSafely(LoggingEvent item) { + return item.getLevel().isGreaterOrEqual( Level.ERROR ); + } + } ); + + Transaction tx = fullTextSession.beginTransaction(); + + // Order is significant to reproduce the issue, see HSEARCH-2761 + fullTextSession.purgeAll( Level2.class ); + fullTextSession.purgeAll( Level3.class ); + fullTextSession.purgeAll( Level1.class ); + + tx.commit(); + + flush(); + all = getAll(); + assertEquals( "Wrong total number of entries. Index should be empty after purge.", 0, all.size() ); + + tx = fullTextSession.beginTransaction(); + fullTextSession.createIndexer() + .batchSizeToLoadObjects( 25 ) + .threadsToLoadObjects( 1 ) + .optimizeOnFinish( true ) + .startAndWait(); + tx.commit(); + + flush(); + all = getAll(); + assertEquals( "Wrong total number of entries.", 3, all.size() ); + } + + /* + * Test executing a purge after a write without explicitly refreshing in-between. + */ + @Test + public void writeThenPurge() throws Exception { + flush(); + List all = getAll(); + assertEquals( "Wrong total number of entries", 3, all.size() ); + + // Expect 0 failure in the backend threads + logged.expectEventMissing( new TypeSafeMatcher() { + + @Override + public void describeTo(Description description) { + description.appendText( "a LoggingEvent with ERROR level or higher" ); + } + + @Override + protected boolean matchesSafely(LoggingEvent item) { + return item.getLevel().isGreaterOrEqual( Level.ERROR ); + } + } ); + + Transaction tx = fullTextSession.beginTransaction(); + + fullTextSession.index( fullTextSession.get( Level1.class, 1L ) ); + + tx.commit(); + + tx = fullTextSession.beginTransaction(); + + fullTextSession.purgeAll( Level1.class ); + + tx.commit(); + + flush(); + all = getAll(); + assertEquals( "Wrong total number of entries. Index should be empty after purge.", 0, all.size() ); + + tx = fullTextSession.beginTransaction(); + fullTextSession.createIndexer() + .batchSizeToLoadObjects( 25 ) + .threadsToLoadObjects( 1 ) + .optimizeOnFinish( true ) + .startAndWait(); + tx.commit(); + + flush(); + all = getAll(); + assertEquals( "Wrong total number of entries.", 3, all.size() ); + } + + /** + * Perform a flush, which implies a refresh + */ + private void flush() { + IndexManager indexManager = getExtendedSearchIntegrator().getIndexBinding( Level1.class ).getIndexManagers()[0]; + indexManager.performOperations( Collections.singletonList( FlushLuceneWork.INSTANCE ), null ); + } + + private void createPersistAndIndexTestData() { + Level1 level1 = new Level1(); + level1.id = 1L; + level1.text = "Level 1"; + + Level2 level2 = new Level2(); + level2.id = 2L; + level2.text = "Level 2" ; + + Level3 level3 = new Level3(); + level3.id = 3L; + level3.text = "Level 3"; + + fullTextSession = Search.getFullTextSession( openSession() ); + + Transaction tx = fullTextSession.beginTransaction(); + fullTextSession.persist( level1 ); + fullTextSession.persist( level2 ); + fullTextSession.persist( level3 ); + tx.commit(); + + fullTextSession.clear(); + } + + @SuppressWarnings("unchecked") + private List getAll() { + Query query = new MatchAllDocsQuery(); + return fullTextSession.createFullTextQuery( query, Level1.class ).list(); + } + + @Override + public void configure(Map settings) { + // This test should work fine even without refreshes after writes + settings.put( "hibernate.search.default." + ElasticsearchEnvironment.REFRESH_AFTER_WRITE, "false" ); + } + + @Override + public Class[] getAnnotatedClasses() { + return new Class[] { + Level1.class, + Level2.class, + Level3.class + }; + } + + @Entity + @Inheritance(strategy = InheritanceType.JOINED) + @Indexed + private static class Level1 { + @Id + protected Long id; + + @Field + protected String text; + } + + @Entity + @Indexed + private static class Level2 extends Level1 { + + } + + @Entity + @Indexed + private static class Level3 extends Level2 { + + } +} diff --git a/engine/src/main/java/org/hibernate/search/engine/impl/WorkPlan.java b/engine/src/main/java/org/hibernate/search/engine/impl/WorkPlan.java index b4f95cff170..87010d43c1d 100644 --- a/engine/src/main/java/org/hibernate/search/engine/impl/WorkPlan.java +++ b/engine/src/main/java/org/hibernate/search/engine/impl/WorkPlan.java @@ -8,7 +8,7 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -51,7 +51,11 @@ public class WorkPlan { private static final Log log = LoggerFactory.make(); - private final HashMap, PerClassWork> byClass = new HashMap, PerClassWork>(); + /* + * Using a LinkedHashMap to ensure the order will be stable from one run to another. + * This changes everything when debugging... + */ + private final Map, PerClassWork> byClass = new LinkedHashMap, PerClassWork>(); private final ExtendedSearchIntegrator extendedIntegrator; @@ -164,11 +168,15 @@ class PerClassWork { /** * We further organize work per entity identifier so that we can cancel or adapt work being done * on the same entities. + *

* This map uses as key what we originally received as {@link Work#getId()} if the type * is annotated with @ProvidedId, otherwise it uses the value pointed to by * {@link org.hibernate.search.annotations.DocumentId} or as last attempt {@code javax.persistence.Id}. + *

+ * We use a LinkedHashMap to ensure the order will be stable from one run to another. + * This changes everything when debugging... */ - private final Map entityById = new HashMap(); + private final Map entityById = new LinkedHashMap(); /** * When a PurgeAll operation is send on the type, we can remove all previously scheduled work