Skip to content

Commit

Permalink
HSEARCH-4321 Mass indexing with multiple tenants at the same time
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta authored and yrodiere committed Jan 3, 2023
1 parent 40f24c0 commit ac2c71a
Show file tree
Hide file tree
Showing 23 changed files with 234 additions and 104 deletions.
Expand Up @@ -18,7 +18,6 @@
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.search.engine.backend.session.spi.DetachedBackendSessionContext;
import org.hibernate.search.mapper.orm.loading.impl.HibernateOrmEntityLoadingStrategy;
import org.hibernate.search.mapper.orm.loading.impl.HibernateOrmMassEntityLoader;
import org.hibernate.search.mapper.orm.loading.impl.HibernateOrmMassIdentifierLoader;
Expand All @@ -41,8 +40,6 @@ public final class HibernateOrmMassIndexingContext
implements PojoMassIndexingContext, HibernateOrmMassLoadingOptions {
private final HibernateOrmMassIndexingMappingContext mappingContext;
private final HibernateOrmSessionTypeContextProvider typeContextProvider;
private final DetachedBackendSessionContext sessionContext;

private final Map<Class<?>, ConditionalExpression> conditionalExpressions = new HashMap<>();
private CacheMode cacheMode = CacheMode.IGNORE;
private Integer idLoadingTransactionTimeout;
Expand All @@ -51,11 +48,9 @@ public final class HibernateOrmMassIndexingContext
private long objectsLimit = 0; //means no limit at all

public HibernateOrmMassIndexingContext(HibernateOrmMassIndexingMappingContext mappingContext,
HibernateOrmSessionTypeContextProvider typeContextContainer,
DetachedBackendSessionContext sessionContext) {
HibernateOrmSessionTypeContextProvider typeContextContainer) {
this.mappingContext = mappingContext;
this.typeContextProvider = typeContextContainer;
this.sessionContext = sessionContext;
}

@Override
Expand Down Expand Up @@ -179,7 +174,7 @@ public PojoMassIdentifierLoader createIdentifierLoader(PojoMassIndexingIdentifie
typeContexts, conditionalExpression );
SharedSessionContractImplementor session = (SharedSessionContractImplementor) sessionFactory
.withStatelessOptions()
.tenantIdentifier( sessionContext.tenantIdentifier() )
.tenantIdentifier( context.tenantIdentifier() )
.openStatelessSession();
try {
PojoMassIdentifierSink<I> sink = context.createSink();
Expand All @@ -203,7 +198,7 @@ public PojoMassEntityLoader<I> createEntityLoader(PojoMassIndexingEntityLoadingC
typeContexts, conditionalExpression );
SessionImplementor session = (SessionImplementor) sessionFactory
.withOptions()
.tenantIdentifier( sessionContext.tenantIdentifier() )
.tenantIdentifier( context.tenantIdentifier() )
.openSession();
try {
session.setHibernateFlushMode( FlushMode.MANUAL );
Expand Down
Expand Up @@ -6,6 +6,7 @@
*/
package org.hibernate.search.mapper.orm.scope;

import java.util.Collection;
import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -158,6 +159,19 @@ public interface SearchScope<E> {
*/
MassIndexer massIndexer(String tenantId);

/**
* Create a {@link MassIndexer} for the indexes mapped to types in this scope, or to any of their sub-types.
* <p>
* This method works for both single- and multi-tenant applications.
* If multi-tenancy is disabled, simply keep the list of tenants empty.
* <p>
* {@link MassIndexer} instances cannot be reused.
*
* @param tenantIds The tenants identifiers whose index content should be targeted. If empty, all tenants will be targeted.
* @return A {@link MassIndexer}.
*/
MassIndexer massIndexer(Collection<String> tenantIds);

/**
* @return A set containing one {@link SearchIndexedEntity} for each indexed entity in this scope.
*/
Expand Down
Expand Up @@ -6,7 +6,11 @@
*/
package org.hibernate.search.mapper.orm.scope.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.hibernate.search.engine.backend.scope.IndexScopeExtension;
import org.hibernate.search.engine.backend.session.spi.DetachedBackendSessionContext;
Expand All @@ -18,9 +22,9 @@
import org.hibernate.search.mapper.orm.common.EntityReference;
import org.hibernate.search.mapper.orm.entity.SearchIndexedEntity;
import org.hibernate.search.mapper.orm.loading.impl.HibernateOrmSelectionLoadingContext;
import org.hibernate.search.mapper.orm.massindexing.impl.HibernateOrmMassIndexingContext;
import org.hibernate.search.mapper.orm.massindexing.MassIndexer;
import org.hibernate.search.mapper.orm.massindexing.impl.HibernateOrmMassIndexer;
import org.hibernate.search.mapper.orm.massindexing.impl.HibernateOrmMassIndexingContext;
import org.hibernate.search.mapper.orm.schema.management.SearchSchemaManager;
import org.hibernate.search.mapper.orm.schema.management.impl.SearchSchemaManagerImpl;
import org.hibernate.search.mapper.orm.scope.SearchScope;
Expand Down Expand Up @@ -89,20 +93,30 @@ public SearchWorkspace workspace(DetachedBackendSessionContext detachedSessionCo

@Override
public MassIndexer massIndexer() {
return massIndexer( (String) null );
return massIndexer( Collections.<String>emptyList() );
}

@Override
public MassIndexer massIndexer(String tenantId) {
return massIndexer( DetachedBackendSessionContext.of( mappingContext, tenantId ) );
return massIndexer( Collections.singletonList( tenantId ) );
}

@Override
public MassIndexer massIndexer(Collection<String> tenantIds) {
return massIndexer( tenantIds.isEmpty() ?
Collections.singletonList( DetachedBackendSessionContext.of( mappingContext, null ) ) :
tenantIds.stream()
.map( id -> DetachedBackendSessionContext.of( mappingContext, id ) )
.collect( Collectors.toList() )
);
}

public MassIndexer massIndexer(DetachedBackendSessionContext detachedSessionContext) {
public MassIndexer massIndexer(List<DetachedBackendSessionContext> detachedSessionContexts) {
HibernateOrmMassIndexingContext massIndexingContext = new HibernateOrmMassIndexingContext( mappingContext,
mappingContext.typeContextProvider(), detachedSessionContext );
mappingContext.typeContextProvider() );

PojoMassIndexer massIndexerDelegate = delegate
.massIndexer( massIndexingContext, detachedSessionContext );
.massIndexer( massIndexingContext, detachedSessionContexts );

return new HibernateOrmMassIndexer( massIndexerDelegate, massIndexingContext );
}
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.hibernate.search.mapper.orm.session.impl;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import javax.persistence.EntityManager;
import javax.transaction.Synchronization;
Expand Down Expand Up @@ -154,7 +155,7 @@ public SearchWorkspace workspace(Collection<? extends Class<?>> types) {

@Override
public MassIndexer massIndexer(Collection<? extends Class<?>> types) {
return scope( types ).massIndexer( DetachedBackendSessionContext.of( this ) );
return scope( types ).massIndexer( Arrays.asList( DetachedBackendSessionContext.of( this ) ) );
}

@Override
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.hibernate.search.mapper.pojo.massindexing.impl;

import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -16,12 +17,11 @@
import org.hibernate.search.mapper.pojo.logging.impl.Log;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingContext;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexingMappingContext;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;

import org.hibernate.search.mapper.pojo.schema.management.spi.PojoScopeSchemaManager;
import org.hibernate.search.mapper.pojo.work.spi.PojoScopeWorkspace;
import org.hibernate.search.mapper.pojo.scope.spi.PojoScopeDelegate;
import org.hibernate.search.util.common.impl.Futures;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

Expand All @@ -41,8 +41,8 @@ public class PojoDefaultMassIndexer implements PojoMassIndexer {
private final PojoMassIndexingTypeContextProvider typeContextProvider;
private final Set<? extends PojoMassIndexingIndexedTypeContext<?>> targetedIndexedTypes;
private final PojoScopeSchemaManager scopeSchemaManager;
private final DetachedBackendSessionContext detachedSession;
private final PojoScopeWorkspace scopeWorkspace;
private final Collection<DetachedBackendSessionContext> detachedSessions;
private final PojoScopeDelegate<?, ?, ?> pojoScopeDelegate;

// default settings defined here:
private int typesToIndexInParallel = 1;
Expand All @@ -60,15 +60,15 @@ public PojoDefaultMassIndexer(PojoMassIndexingContext indexingContext,
PojoMassIndexingTypeContextProvider typeContextProvider,
Set<? extends PojoMassIndexingIndexedTypeContext<?>> targetedIndexedTypes,
PojoScopeSchemaManager scopeSchemaManager,
DetachedBackendSessionContext detachedSession,
PojoScopeWorkspace scopeWorkspace) {
Collection<DetachedBackendSessionContext> detachedSessions,
PojoScopeDelegate<?, ?, ?> pojoScopeDelegate) {
this.indexingContext = indexingContext;
this.mappingContext = mappingContext;
this.typeContextProvider = typeContextProvider;
this.targetedIndexedTypes = targetedIndexedTypes;
this.scopeSchemaManager = scopeSchemaManager;
this.detachedSession = detachedSession;
this.scopeWorkspace = scopeWorkspace;
this.detachedSessions = detachedSessions;
this.pojoScopeDelegate = pojoScopeDelegate;
}

@Override
Expand Down Expand Up @@ -169,7 +169,8 @@ private PojoMassIndexingBatchCoordinator createCoordinator() {
return new PojoMassIndexingBatchCoordinator(
mappingContext,
notifier,
typeGroupsToIndex, scopeSchemaManager, detachedSession, scopeWorkspace,
typeGroupsToIndex, scopeSchemaManager,
detachedSessions, pojoScopeDelegate,
typesToIndexInParallel, documentBuilderThreads,
mergeSegmentsOnFinish,
// false by default:
Expand Down

0 comments on commit ac2c71a

Please sign in to comment.