Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.hibernate.search.engine.common.timing.spi.TimingSource;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.search.loading.spi.SearchLoadingContextBuilder;
import org.hibernate.search.util.common.impl.SuppressingCloser;
import org.hibernate.search.util.common.reporting.EventContext;

import org.apache.lucene.search.similarities.Similarity;
Expand Down Expand Up @@ -196,38 +195,19 @@ LuceneIndexSchemaManager createSchemaManager(SchemaManagementIndexManagerContext
return new LuceneIndexSchemaManager( workFactory, context );
}

Shard createShard(LuceneIndexModel model, EventContext shardEventContext, DirectoryHolder directoryHolder,
IOStrategy ioStrategy, ConfigurationPropertySource propertySource) {
LuceneParallelWorkOrchestratorImpl managementOrchestrator;
LuceneSerialWorkOrchestratorImpl indexingOrchestrator;
IndexAccessorImpl indexAccessor = null;
IndexAccessorImpl createIndexAccessor(LuceneIndexModel model, EventContext shardEventContext,
DirectoryHolder directoryHolder, IOStrategy ioStrategy,
ConfigurationPropertySource propertySource) {
String indexName = model.hibernateSearchName();
IndexWriterConfigSource writerConfigSource = IndexWriterConfigSource.create(
similarity, model.getIndexingAnalyzer(), propertySource, shardEventContext
);

try {
indexAccessor = ioStrategy.createIndexAccessor(
indexName, shardEventContext, directoryHolder, writerConfigSource
);
managementOrchestrator = createIndexManagementOrchestrator( shardEventContext, indexAccessor );
indexingOrchestrator = createIndexingOrchestrator( shardEventContext, indexAccessor );

Shard shard = new Shard(
shardEventContext, indexAccessor,
managementOrchestrator, indexingOrchestrator
);
return shard;
}
catch (RuntimeException e) {
new SuppressingCloser( e )
// No need to stop the orchestrators, we didn't start them
.push( indexAccessor );
throw e;
}
return ioStrategy.createIndexAccessor(
indexName, shardEventContext, directoryHolder, writerConfigSource
);
}

private LuceneParallelWorkOrchestratorImpl createIndexManagementOrchestrator(EventContext eventContext,
LuceneParallelWorkOrchestratorImpl createIndexManagementOrchestrator(EventContext eventContext,
IndexAccessorImpl indexAccessor) {
return new LuceneParallelWorkOrchestratorImpl(
"Lucene index management orchestrator for " + eventContext.render(),
Expand All @@ -237,7 +217,7 @@ private LuceneParallelWorkOrchestratorImpl createIndexManagementOrchestrator(Eve
);
}

private LuceneSerialWorkOrchestratorImpl createIndexingOrchestrator(EventContext eventContext,
LuceneSerialWorkOrchestratorImpl createIndexingOrchestrator(EventContext eventContext,
IndexAccessorImpl indexAccessor) {
return new LuceneSerialWorkOrchestratorImpl(
"Lucene indexing orchestrator for " + eventContext.render(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hibernate.search.engine.backend.scope.spi.IndexScopeBuilder;
import org.hibernate.search.engine.backend.session.spi.BackendSessionContext;
import org.hibernate.search.engine.backend.session.spi.DetachedBackendSessionContext;
import org.hibernate.search.engine.common.resources.spi.SavedState;
import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexer;
Expand All @@ -47,6 +48,8 @@ public class LuceneIndexManagerImpl
implements IndexManagerImplementor, LuceneIndexManager,
LuceneScopeIndexManagerContext {

private static final SavedState.Key<SavedState> SHARD_HOLDER_KEY = SavedState.key( "shard_holder" );

private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

private final IndexManagerBackendContext backendContext;
Expand Down Expand Up @@ -80,6 +83,18 @@ public String toString() {
.toString();
}

@Override
public SavedState saveForRestart() {
return SavedState.builder()
.put( SHARD_HOLDER_KEY, shardHolder.saveForRestart() )
.build();
}

@Override
public void preStart(IndexManagerStartContext context, SavedState savedState) {
shardHolder.preStart( context, savedState.get( SHARD_HOLDER_KEY ).orElse( SavedState.empty() ) );
}

@Override
public void start(IndexManagerStartContext context) {
shardHolder.start( context );
Expand Down Expand Up @@ -135,7 +150,7 @@ public IndexScopeBuilder createScopeBuilder(BackendMappingContext mappingContext

@Override
public void addTo(IndexScopeBuilder builder) {
if ( ! ( builder instanceof LuceneIndexScopeBuilder ) ) {
if ( !( builder instanceof LuceneIndexScopeBuilder ) ) {
throw log.cannotMixLuceneScopeWithOtherType(
builder, this, backendContext.getEventContext()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,56 +8,116 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.lucene.cfg.LuceneIndexSettings;
import org.hibernate.search.backend.lucene.document.model.impl.LuceneIndexModel;
import org.hibernate.search.backend.lucene.logging.impl.Log;
import org.hibernate.search.backend.lucene.lowlevel.directory.impl.DirectoryCreationContextImpl;
import org.hibernate.search.backend.lucene.lowlevel.directory.spi.DirectoryCreationContext;
import org.hibernate.search.backend.lucene.lowlevel.directory.spi.DirectoryHolder;
import org.hibernate.search.backend.lucene.lowlevel.directory.spi.DirectoryProvider;
import org.hibernate.search.backend.lucene.lowlevel.index.impl.IOStrategy;
import org.hibernate.search.backend.lucene.lowlevel.index.impl.IndexAccessorImpl;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneParallelWorkOrchestrator;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneParallelWorkOrchestratorImpl;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneSerialWorkOrchestrator;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneSerialWorkOrchestratorImpl;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.common.resources.spi.SavedState;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.BeanReference;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.engine.reporting.spi.EventContexts;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.SuppressingCloser;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.common.reporting.EventContext;

import org.apache.lucene.index.DirectoryReader;

public final class Shard {

private static final ConfigurationProperty<BeanReference<? extends DirectoryProvider>> DIRECTORY_TYPE =
ConfigurationProperty.forKey( LuceneIndexSettings.DIRECTORY_TYPE )
.asBeanReference( DirectoryProvider.class )
.withDefault( BeanReference.of( DirectoryProvider.class, LuceneIndexSettings.Defaults.DIRECTORY_TYPE ) )
.build();

private static final SavedState.Key<DirectoryHolder> DIRECTORY_HOLDER_KEY = SavedState.key( "directory_holder" );

private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

private final EventContext eventContext;
private final IndexAccessorImpl indexAccessor;
private final LuceneParallelWorkOrchestratorImpl managementOrchestrator;
private final LuceneSerialWorkOrchestratorImpl indexingOrchestrator;

Shard(EventContext eventContext, IndexAccessorImpl indexAccessor,
LuceneParallelWorkOrchestratorImpl managementOrchestrator,
LuceneSerialWorkOrchestratorImpl indexingOrchestrator) {
this.eventContext = eventContext;
this.indexAccessor = indexAccessor;
this.managementOrchestrator = managementOrchestrator;
this.indexingOrchestrator = indexingOrchestrator;
private final Optional<String> shardId;
private final IndexManagerBackendContext backendContext;
private final LuceneIndexModel model;

private DirectoryHolder directoryHolder;
private IndexAccessorImpl indexAccessor;
private LuceneParallelWorkOrchestratorImpl managementOrchestrator;
private LuceneSerialWorkOrchestratorImpl indexingOrchestrator;

private boolean savedForRestart = false;

Shard(Optional<String> shardId, IndexManagerBackendContext backendContext, LuceneIndexModel model) {
this.shardId = shardId;
this.backendContext = backendContext;
this.model = model;
}

public SavedState saveForRestart() {
try {
return SavedState.builder()
.put( DIRECTORY_HOLDER_KEY, directoryHolder, DirectoryHolder::close )
.build();
}
finally {
savedForRestart = true;
}
}

void preStart(ConfigurationPropertySource propertySource, BeanResolver beanResolver, SavedState savedState) {
Optional<DirectoryHolder> savedDirectoryHolder = savedState.get( Shard.DIRECTORY_HOLDER_KEY );
try {
if ( savedDirectoryHolder.isPresent() ) {
directoryHolder = savedDirectoryHolder.get();
}
else {
try ( BeanHolder<? extends DirectoryProvider> directoryProviderHolder =
DIRECTORY_TYPE.getAndTransform( propertySource, beanResolver::resolve ) ) {
String indexName = model.hibernateSearchName();
EventContext indexAndShardEventContext = EventContexts.fromIndexNameAndShardId( indexName, shardId );
DirectoryCreationContext context = new DirectoryCreationContextImpl( indexAndShardEventContext,
indexName, shardId, beanResolver,
propertySource.withMask( "directory" ) );
directoryHolder = directoryProviderHolder.get().createDirectoryHolder( context );
}
directoryHolder.start();
}
}
catch (IOException | RuntimeException e) {
throw log.unableToStartShard( e.getMessage(), e );
}
}

void start(ConfigurationPropertySource propertySource) {
String indexName = model.hibernateSearchName();
EventContext indexAndShardEventContext = EventContexts.fromIndexNameAndShardId( indexName, shardId );
try {
indexAccessor.start();
IOStrategy ioStrategy = backendContext.createIOStrategy( propertySource );
indexAccessor = backendContext.createIndexAccessor( model, indexAndShardEventContext, directoryHolder,
ioStrategy, propertySource );
managementOrchestrator =
backendContext.createIndexManagementOrchestrator( indexAndShardEventContext, indexAccessor );
indexingOrchestrator =
backendContext.createIndexingOrchestrator( indexAndShardEventContext, indexAccessor );

managementOrchestrator.start( propertySource );
indexingOrchestrator.start( propertySource );
}
catch (IOException | RuntimeException e) {
new SuppressingCloser( e )
.push( indexAccessor )
.push( LuceneSerialWorkOrchestratorImpl::stop, indexingOrchestrator )
.push( LuceneParallelWorkOrchestratorImpl::stop, managementOrchestrator );
throw log.unableToInitializeIndexDirectory(
e.getMessage(),
eventContext,
e
);
catch (RuntimeException e) {
throw log.unableToStartShard( e.getMessage(), e );
}
}

Expand All @@ -66,11 +126,21 @@ CompletableFuture<?> preStop() {
}

void stop() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
try ( Closer<IOException> closer = new Closer<>() ) {
closer.push( LuceneSerialWorkOrchestratorImpl::stop, indexingOrchestrator );
closer.push( LuceneParallelWorkOrchestratorImpl::stop, managementOrchestrator );
// Close the index writer after the orchestrators, when we're sure all works have been performed
closer.push( IndexAccessorImpl::close, indexAccessor );
if ( !savedForRestart ) {
closer.push( DirectoryHolder::close, directoryHolder );
}
}
catch (RuntimeException | IOException e) {
throw log.unableToShutdownShard(
e.getMessage(),
shardId.map( EventContexts::fromShardId ).orElse( null ),
e
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.lucene.cfg.LuceneIndexSettings;
import org.hibernate.search.backend.lucene.document.model.impl.LuceneIndexModel;
import org.hibernate.search.backend.lucene.index.spi.ShardingStrategy;
import org.hibernate.search.backend.lucene.lowlevel.reader.impl.DirectoryReaderCollector;
Expand All @@ -25,14 +28,18 @@
import org.hibernate.search.backend.lucene.schema.management.impl.SchemaManagementIndexManagerContext;
import org.hibernate.search.backend.lucene.work.execution.impl.WorkExecutionIndexManagerContext;
import org.hibernate.search.engine.backend.index.spi.IndexManagerStartContext;
import org.hibernate.search.engine.common.resources.spi.SavedState;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.reporting.spi.EventContexts;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.SuppressingCloser;

class ShardHolder implements ReadIndexManagerContext, WorkExecutionIndexManagerContext,
SchemaManagementIndexManagerContext {

private static final SavedState.Key<Map<String, SavedState>> SHARDS_KEY = SavedState.key( "shards" );

private final IndexManagerBackendContext backendContext;
private final LuceneIndexModel model;

Expand All @@ -50,28 +57,74 @@ public String toString() {
return getClass().getSimpleName() + "[indexName=" + model.hibernateSearchName() + "]";
}

void start(IndexManagerStartContext startContext) {
ConfigurationPropertySource propertySource = startContext.configurationPropertySource();
public SavedState saveForRestart() {
HashMap<String, SavedState> states = new HashMap<>();
for ( Map.Entry<String, Shard> shard : shards.entrySet() ) {
states.put( shard.getKey(), shard.getValue().saveForRestart() );
}
return SavedState.builder().put( SHARDS_KEY, states ).build();
}

private ConfigurationPropertySource toShardPropertySource(ConfigurationPropertySource indexPropertySource, String shardIdOrNull) {
return shardIdOrNull != null
? indexPropertySource.withMask( LuceneIndexSettings.SHARDS ).withMask( shardIdOrNull )
.withFallback( indexPropertySource )
: indexPropertySource;
}

void preStart(IndexManagerStartContext startContext, SavedState savedState) {
ConfigurationPropertySource indexPropertySource = startContext.configurationPropertySource();
try {
ShardingStrategyInitializationContextImpl initializationContext =
new ShardingStrategyInitializationContextImpl( backendContext, model, startContext, propertySource );
new ShardingStrategyInitializationContextImpl( backendContext, model, startContext, indexPropertySource );
Map<String, SavedState> states = savedState.get( SHARDS_KEY ).orElse( Collections.emptyMap() );

this.shardingStrategyHolder = initializationContext.create( shards );

if ( startContext.failureCollector().hasFailure() ) {
// At least one shard creation failed; abort and don't even try to start shards.
return;
for ( Map.Entry<String, Shard> entry : shards.entrySet() ) {
String shardId = entry.getKey();
Shard shard = entry.getValue();
ConfigurationPropertySource shardPropertySource = toShardPropertySource( indexPropertySource, shardId );
try {
shard.preStart( shardPropertySource, startContext.beanResolver(),
states.getOrDefault( entry.getKey(), SavedState.empty() ) );
}
catch (RuntimeException e) {
startContext.failureCollector()
.withContext( shardId == null ? null : EventContexts.fromShardId( shardId ) )
.add( e );
}
}
}
catch (RuntimeException e) {
new SuppressingCloser( e )
.pushAll( Shard::stop, shards.values() );
shards.clear();
throw e;
}
}

for ( Shard shard : shards.values() ) {
shard.start( propertySource );
managementOrchestrators.add( shard.managementOrchestrator() );
void start(IndexManagerStartContext startContext) {
ConfigurationPropertySource indexPropertySource = startContext.configurationPropertySource();
try {
for ( Map.Entry<String, Shard> entry : shards.entrySet() ) {
String shardId = entry.getKey();
Shard shard = entry.getValue();
ConfigurationPropertySource shardPropertySource = toShardPropertySource( indexPropertySource, shardId );
try {
shard.start( shardPropertySource );
managementOrchestrators.add( shard.managementOrchestrator() );
}
catch (RuntimeException e) {
startContext.failureCollector()
.withContext( shardId == null ? null : EventContexts.fromShardId( shardId ) )
.add( e );
}
}
}
catch (RuntimeException e) {
new SuppressingCloser( e )
.pushAll( Shard::stop, shards.values() );
shards.clear();
managementOrchestrators.clear();
throw e;
}
Expand Down
Loading