Skip to content

Commit

Permalink
HSEARCH-3269 Wait for all indexing futures before flush
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever committed Oct 15, 2020
1 parent da010c7 commit a461aa8
Showing 1 changed file with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.runtime.context.JobContext;
Expand All @@ -32,6 +33,7 @@
import org.hibernate.search.mapper.pojo.model.spi.PojoRawTypeIdentifier;
import org.hibernate.search.mapper.pojo.work.spi.PojoIndexer;
import org.hibernate.search.util.common.SearchException;
import org.hibernate.search.util.common.impl.Futures;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -105,9 +107,7 @@ public void writeItems(List<Object> entities) throws Exception {

PojoIndexer indexer = mappingContext.sessionContext( session ).createIndexer();

for ( Object entity : entities ) {
writeItem( indexer, entity );
}
indexAndWaitForCompletion( entities, indexer );

/*
* Flush after each write operation
Expand All @@ -131,23 +131,40 @@ public void writeItems(List<Object> entities) throws Exception {
log.closingEntityWriter( partitionIdStr, entityName );
}

private void writeItem(PojoIndexer indexer, Object entity) {
private void indexAndWaitForCompletion(List<Object> entities, PojoIndexer indexer) {
if ( entities == null || entities.isEmpty() ) {
return;
}

CompletableFuture<?>[] indexingFutures = new CompletableFuture<?>[entities.size()];
for ( int i = 0; i < entities.size(); i++ ) {
indexingFutures[i] = writeItem( indexer, entities.get( i ) );
}

try {
Futures.unwrappedExceptionGet( CompletableFuture.allOf( indexingFutures ) );
}
catch (InterruptedException e) {
// mark current thread interrupted and raise the exception to propagate the error up
Thread.currentThread().interrupt();
throw new RuntimeException( "Writer thread was interrupted", e );
}
}

private CompletableFuture<?> writeItem(PojoIndexer indexer, Object entity) {
log.processEntity( entity );

switch ( writeMode ) {
case ADD:
indexer.add( typeIdentifier, null, null, entity,
// Commit and refresh are handled globally after all documents are indexed.
DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
break;
case UPDATE:
indexer.addOrUpdate( typeIdentifier, null, null, entity,
// Commit and refresh are handled globally after all documents are indexed.
DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
break;
if ( WriteMode.ADD.equals( writeMode ) ) {
return indexer.add( typeIdentifier, null, null, entity,
// Commit and refresh are handled globally after all documents are indexed.
DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
}

return indexer.addOrUpdate( typeIdentifier, null, null, entity,
// Commit and refresh are handled globally after all documents are indexed.
DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
}

private enum WriteMode {
Expand Down

0 comments on commit a461aa8

Please sign in to comment.