HSEARCH-2632 Improve deserialization inside the batch job
mincong-h authored and Sanne committed Oct 25, 2017
1 parent 7d99a9b commit 9a8662d
Showing 6 changed files with 77 additions and 46 deletions.
@@ -18,8 +18,11 @@
import org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters;
import org.hibernate.search.jsr352.massindexing.impl.JobContextData;
import org.hibernate.search.jsr352.massindexing.impl.util.PersistenceUtil;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.OPTIMIZE_ON_FINISH;

/**
* Enhancements after the chunk step {@code produceLuceneDoc} (lucene document production)
*
@@ -34,7 +37,7 @@ public class AfterChunkBatchlet extends AbstractBatchlet {

@Inject
@BatchProperty(name = MassIndexingJobParameters.OPTIMIZE_ON_FINISH)
private String optimizeOnFinish;
private String serializedOptimizeOnFinish;

@Inject
@BatchProperty(name = MassIndexingJobParameters.TENANT_ID)
@@ -44,7 +47,9 @@ public class AfterChunkBatchlet extends AbstractBatchlet {

@Override
public String process() throws Exception {
if ( Boolean.parseBoolean( this.optimizeOnFinish ) ) {
boolean optimizeOnFinish = SerializationUtil.parseBooleanParameter( OPTIMIZE_ON_FINISH, serializedOptimizeOnFinish );

if ( optimizeOnFinish ) {
log.startOptimization();

JobContextData jobData = (JobContextData) jobContext.getTransientUserData();
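`Boolean.parseBoolean` maps every string other than "true" to `false`, so the old line above silently ignored a mistyped `optimizeOnFinish` value. Routing the raw string through `SerializationUtil.parseBooleanParameter( OPTIMIZE_ON_FINISH, serializedOptimizeOnFinish )` pairs the value with its parameter name, which makes strict validation and a precise error message possible. The helper's implementation is not among the visible hunks; a minimal sketch of what it might look like (class shape, exception type, and message wording are assumptions, not the committed code):

// Hypothetical sketch; the real SerializationUtil body is not shown in this diff.
public final class SerializationUtil {

	private SerializationUtil() {
		// util class, do not instantiate
	}

	/**
	 * Strict boolean parsing: only "true" / "false" (case-insensitive) are
	 * accepted, so a mistyped value fails fast instead of silently becoming
	 * false as it would with Boolean.parseBoolean.
	 */
	public static boolean parseBooleanParameter(String key, String value) {
		if ( "true".equalsIgnoreCase( value ) ) {
			return true;
		}
		if ( "false".equalsIgnoreCase( value ) ) {
			return false;
		}
		throw new IllegalArgumentException(
				"Unable to parse job parameter '" + key + "': value '" + value + "' is not a boolean." );
	}
}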
@@ -20,8 +20,12 @@
import org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters;
import org.hibernate.search.jsr352.massindexing.impl.JobContextData;
import org.hibernate.search.jsr352.massindexing.impl.util.PersistenceUtil;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.OPTIMIZE_AFTER_PURGE;
import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.PURGE_ALL_ON_START;

/**
* Enhancements before the chunk step {@code produceLuceneDoc} (lucene document production)
*
@@ -36,11 +40,11 @@ public class BeforeChunkBatchlet extends AbstractBatchlet {

@Inject
@BatchProperty(name = MassIndexingJobParameters.PURGE_ALL_ON_START)
private String purgeAllOnStart;
private String serializedPurgeAllOnStart;

@Inject
@BatchProperty(name = MassIndexingJobParameters.OPTIMIZE_AFTER_PURGE)
private String optimizeAfterPurge;
private String serializedOptimizeAfterPurge;

@Inject
@BatchProperty(name = MassIndexingJobParameters.TENANT_ID)
@@ -51,14 +55,17 @@ public class BeforeChunkBatchlet extends AbstractBatchlet {

@Override
public String process() throws Exception {
if ( Boolean.parseBoolean( this.purgeAllOnStart ) ) {
boolean purgeAllOnStart = SerializationUtil.parseBooleanParameter( PURGE_ALL_ON_START, serializedPurgeAllOnStart );
boolean optimizeAfterPurge = SerializationUtil.parseBooleanParameter( OPTIMIZE_AFTER_PURGE, serializedOptimizeAfterPurge );

if ( purgeAllOnStart ) {
JobContextData jobData = (JobContextData) jobContext.getTransientUserData();
EntityManagerFactory emf = jobData.getEntityManagerFactory();
session = PersistenceUtil.openSession( emf, tenantId );
fts = Search.getFullTextSession( session );
jobData.getEntityTypes().forEach( clz -> fts.purgeAll( clz ) );

if ( Boolean.parseBoolean( this.optimizeAfterPurge ) ) {
if ( optimizeAfterPurge ) {
log.startOptimization();
ContextHelper.getSearchIntegrator( session ).optimize();
}
@@ -13,6 +13,9 @@
import javax.inject.Inject;

import org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.CHECKPOINT_INTERVAL;

/**
* This checkpoint algorithm is used to provide a checkpoint decision based on the item count N given by the user. So,
@@ -31,14 +34,15 @@ public class CheckpointAlgorithm extends AbstractCheckpointAlgorithm {

@Inject
@BatchProperty(name = MassIndexingJobParameters.CHECKPOINT_INTERVAL)
private String itemCount;
private String serializedCheckpointInterval;

@Override
public boolean isReadyToCheckpoint() throws Exception {
int checkpointInterval = SerializationUtil.parseIntegerParameter( CHECKPOINT_INTERVAL, serializedCheckpointInterval );
Metric[] metrics = stepContext.getMetrics();
for ( final Metric m : metrics ) {
if ( m.getType().equals( Metric.MetricType.READ_COUNT ) ) {
return m.getValue() % Integer.parseInt( itemCount ) == 0;
return m.getValue() % checkpointInterval == 0;
}
}
throw new Exception( "Metric READ_COUNT not found" );
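CheckpointAlgorithm applies the same pattern to numbers: the field rename from `itemCount` to `serializedCheckpointInterval` matches the parameter it carries, and the value is parsed through `SerializationUtil.parseIntegerParameter( CHECKPOINT_INTERVAL, serializedCheckpointInterval )` so that a malformed interval is reported with its parameter name rather than as a bare `NumberFormatException`. A plausible sketch of this companion method, under the same assumptions as the class sketched above:

/**
 * Hypothetical integer counterpart to the parseBooleanParameter sketch
 * above; the real method body is not part of this diff.
 */
public static int parseIntegerParameter(String key, String value) {
	try {
		return Integer.parseInt( value );
	}
	catch (NumberFormatException e) {
		throw new IllegalArgumentException(
				"Unable to parse job parameter '" + key + "': value '" + value + "' is not an integer.", e );
	}
}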
@@ -37,6 +37,11 @@
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.CACHEABLE;
import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.CUSTOM_QUERY_LIMIT;
import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.FETCH_SIZE;
import static org.hibernate.search.jsr352.massindexing.impl.util.MassIndexingPartitionProperties.PARTITION_ID;

/**
* Entity reader reads entities from database. During the open of the read stream, this reader builds a scrollable
* result. Then, it scrolls from one entity to another at each reading. An entity reader reaches its end when there’s no
@@ -76,7 +81,7 @@ public class EntityReader extends AbstractItemReader {

@Inject
@BatchProperty(name = MassIndexingJobParameters.ENTITY_TYPES)
private String entityTypes;
private String serializedEntityTypes;

@Inject
@BatchProperty(name = MassIndexingJobParameters.CUSTOM_QUERY_CRITERIA)
@@ -90,31 +95,31 @@

@Inject
@BatchProperty(name = MassIndexingJobParameters.CACHEABLE)
private String cacheable;
private String serializedCacheable;

@Inject
@BatchProperty(name = MassIndexingPartitionProperties.ENTITY_NAME)
private String entityName;

@Inject
@BatchProperty(name = MassIndexingJobParameters.FETCH_SIZE)
private String fetchSize;
private String serializedFetchSize;

@Inject
@BatchProperty(name = MassIndexingJobParameters.CUSTOM_QUERY_HQL)
private String customQueryHql;

@Inject
@BatchProperty(name = MassIndexingJobParameters.CUSTOM_QUERY_LIMIT)
private String customQueryLimit;
private String serializedCustomQueryLimit;

@Inject
@BatchProperty(name = MassIndexingJobParameters.TENANT_ID)
private String tenantId;

@Inject
@BatchProperty(name = MassIndexingPartitionProperties.PARTITION_ID)
private String partitionIdStr;
private String serializedPartitionId;

@Inject
@BatchProperty(name = MassIndexingPartitionProperties.LOWER_BOUND)
@@ -138,20 +143,20 @@ public EntityReader() {
/**
* Constructor for unit test TODO should it be done in this way?
*/
EntityReader(String cacheable,
EntityReader(String serializedCacheable,
String entityName,
String fetchSize,
String serializedFetchSize,
String hql,
String maxResults,
String serializedCustomQueryLimit,
String partitionIdStr,
String serializedLowerBound,
String serializedUpperBound) {
this.cacheable = cacheable;
this.serializedCacheable = serializedCacheable;
this.entityName = entityName;
this.fetchSize = fetchSize;
this.serializedFetchSize = serializedFetchSize;
this.customQueryHql = hql;
this.customQueryLimit = maxResults;
this.partitionIdStr = partitionIdStr;
this.serializedCustomQueryLimit = serializedCustomQueryLimit;
this.serializedPartitionId = partitionIdStr;
this.serializedLowerBound = serializedLowerBound;
this.serializedUpperBound = serializedUpperBound;
}
@@ -177,7 +182,7 @@ public Serializable checkpointInfo() throws Exception {
*/
@Override
public void close() throws Exception {
log.closingReader( partitionIdStr, entityName );
log.closingReader( serializedPartitionId, entityName );
try {
scroll.close();
}
@@ -211,9 +216,9 @@ public void close() throws Exception {
*/
@Override
public void open(Serializable checkpointId) throws Exception {
log.openingReader( partitionIdStr, entityName );
log.openingReader( serializedPartitionId, entityName );

final int partitionId = Integer.parseInt( partitionIdStr );
final int partitionId = SerializationUtil.parseIntegerParameter( PARTITION_ID, serializedPartitionId );

JobContextData jobData = getJobContextData();

@@ -261,7 +266,7 @@ public void open(Serializable checkpointId) throws Exception {
private JobContextData getJobContextData() throws ClassNotFoundException, IOException {
return JobContextUtil.getOrCreateData( jobContext,
emfRegistry, entityManagerFactoryScope, entityManagerFactoryReference,
entityTypes, serializedCustomQueryCriteria );
serializedEntityTypes, serializedCustomQueryCriteria );
}

private PartitionBound getPartitionBound(JobContextData jobContextData) throws IOException, ClassNotFoundException {
@@ -272,16 +277,22 @@ private PartitionBound getPartitionBound(JobContextData jobContextData) throws I
}

private ScrollableResults buildScrollUsingHQL(StatelessSession ss, String HQL) {
boolean cacheable = SerializationUtil.parseBooleanParameter( CACHEABLE, serializedCacheable );
int fetchSize = SerializationUtil.parseIntegerParameter( FETCH_SIZE, serializedFetchSize );
int maxResults = SerializationUtil.parseIntegerParameter( CUSTOM_QUERY_LIMIT, serializedCustomQueryLimit );
return ss.createQuery( HQL )
.setReadOnly( true )
.setCacheable( Boolean.parseBoolean( cacheable ) )
.setFetchSize( Integer.parseInt( fetchSize ) )
.setMaxResults( Integer.parseInt( customQueryLimit ) )
.setCacheable( cacheable )
.setFetchSize( fetchSize )
.setMaxResults( maxResults )
.scroll( ScrollMode.FORWARD_ONLY );
}

private ScrollableResults buildScrollUsingCriteria(StatelessSession ss,
PartitionBound unit, Object checkpointId, JobContextData jobData) {
boolean cacheable = SerializationUtil.parseBooleanParameter( CACHEABLE, serializedCacheable );
int fetchSize = SerializationUtil.parseIntegerParameter( FETCH_SIZE, serializedFetchSize );
int maxResults = SerializationUtil.parseIntegerParameter( CUSTOM_QUERY_LIMIT, serializedCustomQueryLimit );
Class<?> entityType = unit.getEntityType();
String idName = sessionFactory.getClassMetadata( entityType )
.getIdentifierPropertyName();
@@ -313,9 +324,9 @@ else if ( unit.isLastPartition() ) {

return criteria.addOrder( Order.asc( idName ) )
.setReadOnly( true )
.setCacheable( Boolean.parseBoolean( cacheable ) )
.setFetchSize( Integer.parseInt( fetchSize ) )
.setMaxResults( Integer.parseInt( customQueryLimit ) )
.setCacheable( cacheable )
.setFetchSize( fetchSize )
.setMaxResults( maxResults )
.scroll( ScrollMode.FORWARD_ONLY );
}

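A JSR-352 runtime can only inject batch properties as strings, which is why every field in EntityReader stays a `String` and now carries a `serialized` prefix; conversion happens at the point of use, inside `open()` and the two `buildScroll*` methods. A condensed, hypothetical illustration of that inject-then-parse flow (the helper is the sketch above, and `FetchSizeExample` is not a class from this commit):

import javax.batch.api.BatchProperty;
import javax.inject.Inject;

import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.StatelessSession;
import org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;

// Hypothetical illustration, not part of the commit: the injected value is a
// raw String and is only converted where it is consumed.
public class FetchSizeExample {

	@Inject
	@BatchProperty(name = MassIndexingJobParameters.FETCH_SIZE)
	private String serializedFetchSize;

	ScrollableResults scroll(StatelessSession ss, String hql) {
		// Fails with the parameter name if the value is not an integer.
		int fetchSize = SerializationUtil.parseIntegerParameter(
				MassIndexingJobParameters.FETCH_SIZE, serializedFetchSize );
		return ss.createQuery( hql )
				.setReadOnly( true )
				.setFetchSize( fetchSize )
				.scroll( ScrollMode.FORWARD_ONLY );
	}
}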
@@ -35,6 +35,10 @@
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.FETCH_SIZE;
import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.MAX_THREADS;
import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.ROWS_PER_PARTITION;

/**
* This partition mapper provides a dynamic partition plan for chunk processing.
* <p>
@@ -56,19 +60,19 @@ private enum Type {

@Inject
@BatchProperty(name = MassIndexingJobParameters.FETCH_SIZE)
private String fetchSize;
private String serializedFetchSize;

@Inject
@BatchProperty(name = MassIndexingJobParameters.CUSTOM_QUERY_HQL)
private String customQueryHql;

@Inject
@BatchProperty(name = MassIndexingJobParameters.MAX_THREADS)
private String maxThreads;
private String serializedMaxThreads;

@Inject
@BatchProperty(name = MassIndexingJobParameters.ROWS_PER_PARTITION)
private String rowsPerPartition;
private String serializedRowsPerPartition;

@Inject
@BatchProperty(name = MassIndexingJobParameters.TENANT_ID)
@@ -83,21 +87,18 @@ public PartitionMapper() {
* Constructor for unit test. TODO should it be done in this way?
*
* @param emf
* @param fetchSize
* @param customQueryHql
* @param maxThreads
* @param rowsPerPartition
*/
PartitionMapper(EntityManagerFactory emf,
String fetchSize,
String serializedFetchSize,
String customQueryHql,
String rowsPerPartition,
String maxThreads) {
String serializedRowsPerPartition,
String serializedMaxThreads) {
this.emf = emf;
this.fetchSize = fetchSize;
this.serializedFetchSize = serializedFetchSize;
this.customQueryHql = customQueryHql;
this.maxThreads = maxThreads;
this.rowsPerPartition = rowsPerPartition;
this.serializedMaxThreads = serializedMaxThreads;
this.serializedRowsPerPartition = serializedRowsPerPartition;
}

@Override
@@ -138,7 +139,7 @@ public PartitionPlan mapPartitions() throws Exception {
}

// Build partition plan
final int threads = Integer.valueOf( maxThreads );
final int threads = SerializationUtil.parseIntegerParameter( MAX_THREADS, serializedMaxThreads );
final int partitions = partitionBounds.size();
final Properties[] props = new Properties[partitions];
log.partitionsPlan( partitions, threads );
@@ -196,7 +197,7 @@ else if ( criterions != null && criterions.size() > 0 ) {

private List<PartitionBound> buildPartitionUnitsFrom(ScrollableResults scroll, Class<?> clazz) {
List<PartitionBound> partitionUnits = new ArrayList<>();
final int rowsPerPartition = Integer.parseInt( this.rowsPerPartition );
int rowsPerPartition = SerializationUtil.parseIntegerParameter( ROWS_PER_PARTITION, serializedRowsPerPartition );
Object lowerID = null;
Object upperID = null;
while ( scroll.scroll( rowsPerPartition ) ) {
@@ -217,9 +218,10 @@ private ScrollableResults buildScrollableResults(StatelessSession ss,
if ( criterions != null ) {
criterions.forEach( c -> criteria.add( c ) );
}
int fetchSize = SerializationUtil.parseIntegerParameter( FETCH_SIZE, serializedFetchSize );
ScrollableResults scroll = criteria
.setProjection( Projections.alias( Projections.id(), "aliasedId" ) )
.setFetchSize( Integer.parseInt( fetchSize ) )
.setFetchSize( fetchSize )
.setReadOnly( true )
.addOrder( Order.asc( "aliasedId" ) )
.scroll( ScrollMode.FORWARD_ONLY );
@@ -29,6 +29,8 @@
import org.hibernate.search.util.StringHelper;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import static org.hibernate.search.jsr352.massindexing.MassIndexingJobParameters.CUSTOM_QUERY_CRITERIA;

/**
* Utility allowing to set up and retrieve the job context data, shared by all the steps.
* <p>
@@ -106,7 +108,7 @@ private static JobContextData createData(EntityManagerFactory emf, String entity
}

@SuppressWarnings("unchecked")
Set<Criterion> criteria = (Set<Criterion>) SerializationUtil.deserialize( serializedCustomQueryCriteria );
Set<Criterion> criteria = SerializationUtil.parseParameter( Set.class, CUSTOM_QUERY_CRITERIA, serializedCustomQueryCriteria );
if ( criteria == null ) {
criteria = Collections.emptySet();
}
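The final hunk upgrades object deserialization in the same spirit: the old code called `SerializationUtil.deserialize` and applied an unchecked cast, while the new `parseParameter( Set.class, CUSTOM_QUERY_CRITERIA, serializedCustomQueryCriteria )` ties the conversion to the parameter name. Only the call site is visible here; a sketch consistent with it, assuming the method delegates to the pre-existing `deserialize(String)` helper and verifies the runtime type:

/**
 * Hypothetical sketch of the typed variant used in JobContextUtil; only its
 * call site appears in this diff. The type-check logic and message wording
 * are assumptions.
 */
public static <T> T parseParameter(Class<T> type, String key, String serializedValue)
		throws IOException, ClassNotFoundException {
	if ( serializedValue == null ) {
		return null;
	}
	Object value = deserialize( serializedValue ); // pre-existing helper called by the old code
	if ( !type.isInstance( value ) ) {
		throw new IllegalArgumentException( "Unable to parse job parameter '" + key
				+ "': expected an instance of " + type.getName()
				+ " but got " + value.getClass().getName() );
	}
	return type.cast( value );
}

Note that the caller keeps its `@SuppressWarnings("unchecked")`: `Set.class` can only produce a raw `Set`, never a `Set<Criterion>`, so the generic helper narrows but cannot eliminate the unchecked assignment.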
