Skip to content

Commit

Permalink
HSEARCH-3314 Rework access to per-index resources (write orchestrator…
Browse files Browse the repository at this point in the history
…, readers) to prepare for sharding

Essentially we will need a design that allows work executors and queries
to dynamically retrieve a list of resources based on a set of strings, so
we can't just pass these resources directly to the work
executors/queries.
  • Loading branch information
yrodiere committed Aug 5, 2019
1 parent 8713204 commit 2d71831
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 45 deletions.
Expand Up @@ -31,14 +31,14 @@
import org.hibernate.search.backend.lucene.work.execution.impl.LuceneIndexWorkExecutor;
import org.hibernate.search.backend.lucene.work.execution.impl.LuceneIndexWorkPlan;
import org.hibernate.search.backend.lucene.work.execution.impl.WorkExecutionBackendContext;
import org.hibernate.search.backend.lucene.work.execution.impl.WorkExecutionIndexManagerContext;
import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkExecutor;
import org.hibernate.search.engine.backend.work.execution.spi.IndexDocumentWorkExecutor;
import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkPlan;
import org.hibernate.search.backend.lucene.document.impl.LuceneRootDocumentBuilder;
import org.hibernate.search.backend.lucene.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestrator;
import org.hibernate.search.backend.lucene.work.impl.LuceneWorkFactory;
import org.hibernate.search.engine.common.spi.ErrorHandler;
import org.hibernate.search.engine.mapper.mapping.context.spi.MappingContextImplementor;
Expand Down Expand Up @@ -88,14 +88,15 @@ public String toString() {

@Override
public IndexWorkPlan<LuceneRootDocumentBuilder> createWorkPlan(
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext,
WorkExecutionIndexManagerContext indexManagerContext,
SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new LuceneIndexWorkPlan(
workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext,
workFactory, multiTenancyStrategy,
indexManagerContext,
sessionContext,
commitStrategy, refreshStrategy
);
}
Expand All @@ -105,31 +106,36 @@ public LuceneWriteWorkOrchestratorImplementor createOrchestrator(String indexNam
IndexWriterDelegator indexWriterDelegator) {
return new LuceneBatchingWriteWorkOrchestrator(
"Lucene write work orchestrator for index " + indexName,
new LuceneWriteWorkProcessor( EventContexts.fromIndexName( indexName ), indexWriterDelegator, errorHandler ),
new LuceneWriteWorkProcessor(
EventContexts.fromIndexName( indexName ),
indexWriterDelegator,
errorHandler
),
errorHandler
);
}

@Override
public IndexDocumentWorkExecutor<LuceneRootDocumentBuilder> createDocumentWorkExecutor(
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext,
WorkExecutionIndexManagerContext indexManagerContext,
SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new LuceneIndexDocumentWorkExecutor(
workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext,
workFactory, multiTenancyStrategy,
indexManagerContext,
sessionContext,
commitStrategy
);
}

@Override
public IndexWorkExecutor createWorkExecutor(LuceneWriteWorkOrchestrator orchestrator, String indexName,
public IndexWorkExecutor createWorkExecutor(WorkExecutionIndexManagerContext indexManagerContext,
DetachedSessionContextImplementor sessionContext) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new LuceneIndexWorkExecutor( workFactory, orchestrator, sessionContext );
return new LuceneIndexWorkExecutor( workFactory, indexManagerContext, sessionContext );
}

@Override
Expand Down
Expand Up @@ -11,8 +11,11 @@

import org.hibernate.search.backend.lucene.index.LuceneIndexManager;
import org.hibernate.search.backend.lucene.lowlevel.index.impl.IndexAccessor;
import org.hibernate.search.backend.lucene.lowlevel.reader.spi.IndexReaderHolder;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestrator;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestratorImplementor;
import org.hibernate.search.backend.lucene.scope.model.impl.LuceneScopeIndexManagerContext;
import org.hibernate.search.backend.lucene.work.execution.impl.WorkExecutionIndexManagerContext;
import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.IndexManager;
import org.hibernate.search.engine.backend.index.spi.IndexManagerStartContext;
Expand All @@ -23,7 +26,6 @@
import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkPlan;
import org.hibernate.search.backend.lucene.document.impl.LuceneRootDocumentBuilder;
import org.hibernate.search.backend.lucene.document.model.impl.LuceneIndexModel;
import org.hibernate.search.backend.lucene.index.spi.ReaderProvider;
import org.hibernate.search.backend.lucene.logging.impl.Log;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.mapper.mapping.context.spi.MappingContextImplementor;
Expand All @@ -36,12 +38,9 @@

import org.apache.lucene.index.IndexReader;


public class LuceneIndexManagerImpl
implements IndexManagerImplementor<LuceneRootDocumentBuilder>, LuceneIndexManager,
LuceneScopeIndexManagerContext,
// TODO HSEARCH-3117 in the end the IndexManager won't implement ReaderProvider as it's far more complex than that
ReaderProvider {
LuceneScopeIndexManagerContext, WorkExecutionIndexManagerContext {

private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

Expand Down Expand Up @@ -94,11 +93,21 @@ public void close() {
}
}

@Override
public String getIndexName() {
return indexName;
}

@Override
public LuceneWriteWorkOrchestrator getWriteOrchestrator() {
return writeOrchestrator;
}

@Override
public IndexWorkPlan<LuceneRootDocumentBuilder> createWorkPlan(SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
return backendContext.createWorkPlan(
writeOrchestrator, indexName, sessionContext,
this, sessionContext,
commitStrategy, refreshStrategy
);
}
Expand All @@ -107,14 +116,14 @@ public IndexWorkPlan<LuceneRootDocumentBuilder> createWorkPlan(SessionContextImp
public IndexDocumentWorkExecutor<LuceneRootDocumentBuilder> createDocumentWorkExecutor(
SessionContextImplementor sessionContext, DocumentCommitStrategy commitStrategy) {
return backendContext.createDocumentWorkExecutor(
writeOrchestrator, indexName, sessionContext,
this, sessionContext,
commitStrategy
);
}

@Override
public IndexWorkExecutor createWorkExecutor(DetachedSessionContextImplementor sessionContext) {
return backendContext.createWorkExecutor( writeOrchestrator, indexName, sessionContext );
return backendContext.createWorkExecutor( this, sessionContext );
}

@Override
Expand Down Expand Up @@ -156,6 +165,7 @@ public void closeIndexReader(IndexReader reader) {
}
}


@Override
public IndexManager toAPI() {
return this;
Expand Down
@@ -0,0 +1,17 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.lucene.lowlevel.reader.impl;

import org.hibernate.search.backend.lucene.index.spi.ReaderProvider;

/**
* An interface with knowledge of the index manager internals,
* able to retrieve components related to index reading.
*/
public interface ReadIndexManagerContext extends ReaderProvider {

}
@@ -0,0 +1,42 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.lucene.lowlevel.reader.spi;

import java.io.Closeable;
import java.io.IOException;

import org.apache.lucene.index.IndexReader;

public interface IndexReaderHolder extends Closeable {

/**
* Release any resource currently held by the {@link IndexReaderHolder},
* including (but not limiting to) the reader itself.
* <p>
* After this method has been called, the result of calling any other method on the same instance is undefined.
*
* @throws IOException If an error occurs while releasing resources.
* @throws RuntimeException If an error occurs while releasing resources.
*/
@Override
void close() throws IOException;

/**
* @return The reader held by this {@link IndexReaderHolder}.
*/
IndexReader get();

/**
* @param indexReader The {@link IndexReader} to hold.
* @return An {@link IndexReaderHolder} that returns the given directory when {@link #get()} is called
* and simply closes the directory when {@link #close()} is called.
*/
static IndexReaderHolder of(IndexReader indexReader) {
return new SimpleIndexReaderHolder( indexReader );
}

}
@@ -0,0 +1,29 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.lucene.lowlevel.reader.spi;

import java.io.IOException;

import org.apache.lucene.index.IndexReader;

final class SimpleIndexReaderHolder implements IndexReaderHolder {
private final IndexReader indexReader;

SimpleIndexReaderHolder(IndexReader indexReader) {
this.indexReader = indexReader;
}

@Override
public IndexReader get() {
return indexReader;
}

@Override
public void close() throws IOException {
indexReader.close();
}
}
Expand Up @@ -14,6 +14,6 @@

public interface LuceneReadWorkOrchestrator {

<T> T submit(Set<String> indexNames, Set<? extends ReaderProvider> readerProviders, LuceneReadWork<T> work);
<T> T submit(Set<String> indexNames, Set<? extends ReaderProvider> indexManagerContexts, LuceneReadWork<T> work);

}
Expand Up @@ -15,6 +15,7 @@
import org.hibernate.search.engine.backend.orchestration.spi.AbstractWorkOrchestrator;
import org.hibernate.search.engine.reporting.spi.EventContexts;
import org.hibernate.search.util.common.impl.SuppressingCloser;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.common.reporting.EventContext;

import org.apache.lucene.index.IndexReader;
Expand Down
Expand Up @@ -6,8 +6,8 @@
*/
package org.hibernate.search.backend.lucene.scope.model.impl;

import org.hibernate.search.backend.lucene.index.spi.ReaderProvider;
import org.hibernate.search.backend.lucene.lowlevel.reader.impl.ReadIndexManagerContext;

public interface LuceneScopeIndexManagerContext extends ReaderProvider {
public interface LuceneScopeIndexManagerContext extends ReadIndexManagerContext {

}
Expand Up @@ -9,7 +9,7 @@
import java.util.Set;

import org.hibernate.search.backend.lucene.analysis.model.impl.LuceneAnalysisDefinitionRegistry;
import org.hibernate.search.backend.lucene.index.spi.ReaderProvider;
import org.hibernate.search.backend.lucene.lowlevel.reader.impl.ReadIndexManagerContext;
import org.hibernate.search.backend.lucene.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.lucene.scope.model.impl.LuceneScopeModel;
import org.hibernate.search.engine.backend.types.converter.runtime.ToDocumentFieldValueConvertContext;
Expand Down Expand Up @@ -60,7 +60,7 @@ public Set<String> getIndexNames() {
return scopeModel.getIndexNames();
}

public Set<? extends ReaderProvider> getReaderProviders() {
public Set<? extends ReadIndexManagerContext> getIndexManagerContexts() {
return scopeModel.getIndexManagerContexts();
}

Expand Down
Expand Up @@ -92,7 +92,7 @@ public LuceneSearchResult<H> fetch(Integer limit, Integer offset) {
offset, limit,
luceneCollectorProvider, searchResultExtractor
);
return queryOrchestrator.submit( searchContext.getIndexNames(), searchContext.getReaderProviders(), work )
return doSubmit( work )
/*
* WARNING: the following call must run in the user thread.
* If we introduce async processing, we will have to add a loadAsync method here,
Expand All @@ -112,8 +112,7 @@ public long fetchTotalHitCount() {
( luceneCollectorBuilder -> { } ),
searchResultExtractor
);
return queryOrchestrator.submit( searchContext.getIndexNames(), searchContext.getReaderProviders(), work )
.getHitCount();
return doSubmit( work ).getHitCount();
}

@Override
Expand Down Expand Up @@ -141,6 +140,14 @@ public Explanation explain(String indexName, String id) {
return doExplain( indexName, id );
}

private <T> T doSubmit(LuceneReadWork<T> work) {
return queryOrchestrator.submit(
searchContext.getIndexNames(),
searchContext.getIndexManagerContexts(),
work
);
}

private Explanation doExplain(String indexName, String id) {
Query explainedDocumentQuery = new BooleanQuery.Builder()
.add( new TermQuery( new Term( LuceneFields.indexFieldName(), indexName ) ), BooleanClause.Occur.MUST )
Expand All @@ -153,6 +160,6 @@ private Explanation doExplain(String indexName, String id) {
LuceneReadWork<Explanation> work = workFactory.explain(
luceneQuery, indexName, id, explainedDocumentQuery
);
return queryOrchestrator.submit( searchContext.getIndexNames(), searchContext.getReaderProviders(), work );
return doSubmit( work );
}
}
Expand Up @@ -30,13 +30,13 @@ public class LuceneIndexDocumentWorkExecutor implements IndexDocumentWorkExecuto
private final DocumentCommitStrategy commitStrategy;

public LuceneIndexDocumentWorkExecutor(LuceneWorkFactory factory, MultiTenancyStrategy multiTenancyStrategy,
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext,
WorkExecutionIndexManagerContext indexManagerContext,
SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy) {
this.factory = factory;
this.multiTenancyStrategy = multiTenancyStrategy;
this.orchestrator = orchestrator;
this.indexName = indexName;
this.orchestrator = indexManagerContext.getWriteOrchestrator();
this.indexName = indexManagerContext.getIndexName();
this.tenantId = sessionContext.getTenantIdentifier();
this.commitStrategy = commitStrategy;
}
Expand Down
Expand Up @@ -22,10 +22,10 @@ public class LuceneIndexWorkExecutor implements IndexWorkExecutor {
private final DetachedSessionContextImplementor sessionContext;

public LuceneIndexWorkExecutor(LuceneWorkFactory factory,
LuceneWriteWorkOrchestrator orchestrator,
WorkExecutionIndexManagerContext indexManagerContext,
DetachedSessionContextImplementor sessionContext) {
this.factory = factory;
this.orchestrator = orchestrator;
this.orchestrator = indexManagerContext.getWriteOrchestrator();
this.sessionContext = sessionContext;
}

Expand Down
Expand Up @@ -38,13 +38,13 @@ public class LuceneIndexWorkPlan implements IndexWorkPlan<LuceneRootDocumentBuil
private final List<LuceneWriteWork<?>> works = new ArrayList<>();

public LuceneIndexWorkPlan(LuceneWorkFactory factory, MultiTenancyStrategy multiTenancyStrategy,
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext,
WorkExecutionIndexManagerContext indexManagerContext,
SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
this.factory = factory;
this.multiTenancyStrategy = multiTenancyStrategy;
this.orchestrator = orchestrator;
this.indexName = indexName;
this.orchestrator = indexManagerContext.getWriteOrchestrator();
this.indexName = indexManagerContext.getIndexName();
this.tenantId = sessionContext.getTenantIdentifier();
this.commitStrategy = commitStrategy;
this.refreshStrategy = refreshStrategy;
Expand Down

0 comments on commit 2d71831

Please sign in to comment.