Skip to content

Commit

Permalink
HSEARCH-4218 When mass indexing, wait for ID producing threads to finish
Browse files Browse the repository at this point in the history
Previously we were only waiting on indexing threads.

Obviously those would never finish before the identifier producing
threads were done producing identifiers, but they could finish between
the end of the identifier production and the time where the ID producing
threads were actually done with some finalizing tasks.

This could result in some failure notifications to be pushed to the
failure handler *after* startAndWait() returned, which made
MassIndexingIncludedEntityMapHierarchyIT fail from time to time.

Reasons for this change:

1. This won't impact performance much: when indexing threads finish, ID
   producing threads are mostly done anyway (worst case they still need
   to report an exception, but that's all).
2. We will now wait for all threads to finish reporting errors before
   we consider indexing successful.
3. We will now propagate exceptions thrown by ID producing threads to
   the caller of `massIndexer.startAndWait()`, instead of considering
   indexing successful.
  • Loading branch information
yrodiere committed May 7, 2021
1 parent 14f36e2 commit 35cde2f
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 88 deletions.
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand All @@ -24,6 +25,13 @@
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubMassLoadingStrategy;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.util.rule.JavaBeanMappingSetupHelper;
import org.hibernate.search.mapper.javabean.loading.LoadingTypeGroup;
import org.hibernate.search.mapper.javabean.loading.MassEntityLoader;
import org.hibernate.search.mapper.javabean.loading.MassEntitySink;
import org.hibernate.search.mapper.javabean.loading.MassIdentifierLoader;
import org.hibernate.search.mapper.javabean.loading.MassIdentifierSink;
import org.hibernate.search.mapper.javabean.loading.MassLoadingOptions;
import org.hibernate.search.mapper.javabean.loading.MassLoadingStrategy;
import org.hibernate.search.mapper.javabean.mapping.SearchMapping;
import org.hibernate.search.mapper.javabean.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
Expand Down Expand Up @@ -62,6 +70,26 @@ public abstract class AbstractMassIndexingErrorIT {

private final StubLoadingContext loadingContext = new StubLoadingContext();

@Test
public void identifierLoading() {
String errorMessage = "ID loading error";

SearchMapping mapping = setupWithThrowingIdentifierLoading( errorMessage );

expectNoFailureHandling();

doMassIndexingWithError(
mapping.scope( Object.class ).massIndexer(),
ThreadExpectation.CREATED_AND_TERMINATED,
throwable -> assertThat( throwable ).isInstanceOf( SimulatedError.class )
.hasMessage( errorMessage ),
expectIndexScaleWork( StubIndexScaleWork.Type.PURGE, ExecutionExpectation.SUCCEED ),
expectIndexScaleWork( StubIndexScaleWork.Type.MERGE_SEGMENTS, ExecutionExpectation.SUCCEED )
);

assertNoFailureHandling();
}

@Test
public void indexing() {
SearchMapping mapping = setup();
Expand Down Expand Up @@ -439,7 +467,52 @@ private Runnable expectIndexingWorks(ExecutionExpectation workTwoExecutionExpect
};
}

private SearchMapping setupWithThrowingIdentifierLoading(String errorMessage) {
return setup( new MassLoadingStrategy<Book, Integer>() {
@Override
public MassIdentifierLoader createIdentifierLoader(LoadingTypeGroup<Book> includedTypes,
MassIdentifierSink<Integer> sink, MassLoadingOptions options) {
return new MassIdentifierLoader() {
@Override
public void close() {
// Nothing to do
}

@Override
public long totalCount() {
return 100;
}

@Override
public void loadNext() {
throw new SimulatedError( errorMessage );
}
};
}

@Override
public MassEntityLoader<Integer> createEntityLoader(LoadingTypeGroup<Book> includedTypes,
MassEntitySink<Book> sink, MassLoadingOptions options) {
return new MassEntityLoader<Integer>() {
@Override
public void close() {
// Nothing to do
}

@Override
public void load(List<Integer> identifiers) {
throw new UnsupportedOperationException( "Should not be called" );
}
};
}
} );
}

private SearchMapping setup() {
return setup( new StubMassLoadingStrategy<>( Book.PERSISTENCE_KEY ) );
}

private SearchMapping setup(MassLoadingStrategy<Book, Integer> loadingStrategy) {
assertBeforeSetup();

backendMock.expectAnySchema( Book.NAME );
Expand All @@ -449,8 +522,7 @@ private SearchMapping setup() {
.withPropertyRadical( EngineSettings.Radicals.BACKGROUND_FAILURE_HANDLER, getBackgroundFailureHandlerReference() )
.withPropertyRadical( EngineSpiSettings.Radicals.THREAD_PROVIDER, threadSpy.getThreadProvider() )
.withConfiguration( b -> {
b.addEntityType( Book.class, c -> c
.massLoadingStrategy( new StubMassLoadingStrategy<>( Book.PERSISTENCE_KEY ) ) );
b.addEntityType( Book.class, c -> c.massLoadingStrategy( loadingStrategy ) );
} )
.setup( Book.class );

Expand Down
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand All @@ -24,6 +25,13 @@
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubMassLoadingStrategy;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.util.rule.JavaBeanMappingSetupHelper;
import org.hibernate.search.mapper.javabean.loading.LoadingTypeGroup;
import org.hibernate.search.mapper.javabean.loading.MassEntityLoader;
import org.hibernate.search.mapper.javabean.loading.MassEntitySink;
import org.hibernate.search.mapper.javabean.loading.MassIdentifierLoader;
import org.hibernate.search.mapper.javabean.loading.MassIdentifierSink;
import org.hibernate.search.mapper.javabean.loading.MassLoadingOptions;
import org.hibernate.search.mapper.javabean.loading.MassLoadingStrategy;
import org.hibernate.search.mapper.javabean.mapping.SearchMapping;
import org.hibernate.search.mapper.javabean.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
Expand Down Expand Up @@ -64,6 +72,32 @@ public abstract class AbstractMassIndexingFailureIT {

private final StubLoadingContext loadingContext = new StubLoadingContext();

@Test
public void identifierLoading() {
String exceptionMessage = "ID loading error";

SearchMapping mapping = setupWithThrowingIdentifierLoading( exceptionMessage );

expectMassIndexerIdentifierLoadingFailureHandling(
SimulatedFailure.class, exceptionMessage,
"Fetching identifiers of entities to index for entity '" + Book.NAME + "' during mass indexing"
);

doMassIndexingWithFailure(
mapping.scope( Object.class ).massIndexer(),
ThreadExpectation.CREATED_AND_TERMINATED,
throwable -> assertThat( throwable ).isInstanceOf( SimulatedFailure.class )
.hasMessage( exceptionMessage ),
expectIndexScaleWork( StubIndexScaleWork.Type.PURGE, ExecutionExpectation.SUCCEED ),
expectIndexScaleWork( StubIndexScaleWork.Type.MERGE_SEGMENTS, ExecutionExpectation.SUCCEED )
);

assertMassIndexerIdentifierLoadingFailureHandling(
SimulatedFailure.class, exceptionMessage,
"Fetching identifiers of entities to index for entity '" + Book.NAME + "' during mass indexing"
);
}

@Test
public void indexing() {
SearchMapping mapping = setup();
Expand Down Expand Up @@ -451,6 +485,14 @@ protected abstract void assertMassIndexerOperationFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString);

protected abstract void expectMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString);

protected abstract void assertMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString);

protected abstract void expectEntityIndexingAndMassIndexerOperationFailureHandling(
String entityName, String entityReferenceAsString,
String failingEntityIndexingExceptionMessage, String failingEntityIndexingOperationAsString,
Expand Down Expand Up @@ -636,7 +678,52 @@ private Runnable expectIndexingWorks(ExecutionExpectation workTwoExecutionExpect
};
}

private SearchMapping setupWithThrowingIdentifierLoading(String exceptionMessage) {
return setup( new MassLoadingStrategy<Book, Integer>() {
@Override
public MassIdentifierLoader createIdentifierLoader(LoadingTypeGroup<Book> includedTypes,
MassIdentifierSink<Integer> sink, MassLoadingOptions options) {
return new MassIdentifierLoader() {
@Override
public void close() {
// Nothing to do
}

@Override
public long totalCount() {
return 100;
}

@Override
public void loadNext() {
throw new SimulatedFailure( exceptionMessage );
}
};
}

@Override
public MassEntityLoader<Integer> createEntityLoader(LoadingTypeGroup<Book> includedTypes,
MassEntitySink<Book> sink, MassLoadingOptions options) {
return new MassEntityLoader<Integer>() {
@Override
public void close() {
// Nothing to do
}

@Override
public void load(List<Integer> identifiers) {
throw new UnsupportedOperationException( "Should not be called" );
}
};
}
} );
}

private SearchMapping setup() {
return setup( new StubMassLoadingStrategy<>( Book.PERSISTENCE_KEY ) );
}

private SearchMapping setup(MassLoadingStrategy<Book, Integer> loadingStrategy) {
assertBeforeSetup();

backendMock.expectAnySchema( Book.NAME );
Expand All @@ -646,8 +733,7 @@ private SearchMapping setup() {
.withPropertyRadical( EngineSettings.Radicals.BACKGROUND_FAILURE_HANDLER, getBackgroundFailureHandlerReference() )
.withPropertyRadical( EngineSpiSettings.Radicals.THREAD_PROVIDER, threadSpy.getThreadProvider() )
.withConfiguration( b -> {
b.addEntityType( Book.class, c -> c
.massLoadingStrategy( new StubMassLoadingStrategy<>( Book.PERSISTENCE_KEY ) ) );
b.addEntityType( Book.class, c -> c .massLoadingStrategy( loadingStrategy ) );
} )
.setup( Book.class );

Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -155,6 +156,45 @@ protected void assertMassIndexerOperationFailureHandling(
.isEqualTo( failingOperationAsString );
}

@Override
protected void expectMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString) {
// We'll check in the assert*() method, see below.
}

@Override
protected void assertMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString) {
verify( failureHandler, times( 3 ) ).handle( genericFailureContextCapture.capture() );
verifyNoMoreInteractions( failureHandler );

// Original failure
FailureContext context = genericFailureContextCapture.getAllValues().get( 0 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( failingOperationAsString );

// ... bubbling up to the workspace
context = genericFailureContextCapture.getAllValues().get( 1 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( "MassIndexer operation" );

// ... bubbling up to the mass indexer
context = genericFailureContextCapture.getAllValues().get( 2 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( "MassIndexer operation" );
}

@Override
protected void expectEntityIndexingAndMassIndexerOperationFailureHandling(String entityName,
String entityReferenceAsString,
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -155,6 +156,45 @@ protected void assertMassIndexerOperationFailureHandling(
.isEqualTo( failingOperationAsString );
}

@Override
protected void expectMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString) {
// We'll check in the assert*() method, see below.
}

@Override
protected void assertMassIndexerIdentifierLoadingFailureHandling(
Class<? extends Throwable> exceptionType, String exceptionMessage,
String failingOperationAsString) {
verify( failureHandler, times( 3 ) ).handle( genericFailureContextCapture.capture() );
verifyNoMoreInteractions( failureHandler );

// Original failure
MassIndexingFailureContext context = genericFailureContextCapture.getAllValues().get( 0 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( failingOperationAsString );

// ... bubbling up to the workspace
context = genericFailureContextCapture.getAllValues().get( 1 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( "MassIndexer operation" );

// ... bubbling up to the mass indexer
context = genericFailureContextCapture.getAllValues().get( 2 );
assertThat( context.throwable() )
.isInstanceOf( exceptionType )
.hasMessageContaining( exceptionMessage );
assertThat( context.failingOperation() ).asString()
.isEqualTo( "MassIndexer operation" );
}

@Override
protected void expectEntityIndexingAndMassIndexerOperationFailureHandling(String entityName,
String entityReferenceAsString,
Expand Down
Expand Up @@ -121,6 +121,35 @@ protected void assertMassIndexerOperationFailureHandling(
// If we get there, everything works fine.
}

@Override
protected void expectMassIndexerIdentifierLoadingFailureHandling(Class<? extends Throwable> exceptionType,
String exceptionMessage, String failingOperationAsString) {
// Original failure
logged.expectEvent(
Level.ERROR,
ExceptionMatcherBuilder.isException( exceptionType )
.withMessage( exceptionMessage )
.build(),
failingOperationAsString
)
.once();
// ... bubbling up to the workspace and mass indexer
logged.expectEvent(
Level.ERROR,
ExceptionMatcherBuilder.isException( exceptionType )
.withMessage( exceptionMessage )
.build(),
"MassIndexer operation"
)
.times( 2 );
}

@Override
protected void assertMassIndexerIdentifierLoadingFailureHandling(Class<? extends Throwable> exceptionType,
String exceptionMessage, String failingOperationAsString) {

}

@Override
protected void expectEntityIndexingAndMassIndexerOperationFailureHandling(String entityName,
String entityReferenceAsString,
Expand Down

0 comments on commit 35cde2f

Please sign in to comment.