#24 separate the beforeChunk enhancements from the chunk step
mincong-h committed Jul 17, 2016
1 parent b7765a9 commit 02cb248
Showing 5 changed files with 100 additions and 30 deletions.
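This commit moves the purge-at-start and optimize-after-purge enhancements out of the chunk step's ItemReader and into a dedicated BeforeChunkBatchlet, wired as a new "beforeChunk" step that runs before the chunk step "produceLuceneDoc".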
org/hibernate/search/jsr352/internal/JobContextData.java
@@ -7,6 +7,7 @@
package org.hibernate.search.jsr352.internal;

import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@@ -26,10 +27,14 @@ public JobContextData(Set<Class<?>> entityClazzes) {
entityClazzes.forEach( clz -> entityClazzMap.put( clz.toString(), clz ) );
}

-public Set<String> getEntityNames() {
+public Set<String> getEntityNameSet() {
return entityClazzMap.keySet();
}

+public Set<Class<?>> getEntityClazzSet() {
+return new HashSet<Class<?>>( entityClazzMap.values() );
+}

public String[] getEntityNameArray() {
Set<String> keySet = entityClazzMap.keySet();
return keySet.toArray( new String[keySet.size()] );
org/hibernate/search/jsr352/internal/steps/lucene/ItemReader.java
@@ -7,8 +7,6 @@
package org.hibernate.search.jsr352.internal.steps.lucene;

import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashSet;

import javax.batch.api.BatchProperty;
import javax.batch.runtime.context.JobContext;
@@ -23,8 +21,6 @@
import org.hibernate.StatelessSession;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Restrictions;
-import org.hibernate.search.backend.PurgeAllLuceneWork;
-import org.hibernate.search.backend.spi.BatchBackend;
import org.hibernate.search.hcore.util.impl.ContextHelper;
import org.hibernate.search.jsr352.internal.JobContextData;
import org.jboss.logging.Logger;
@@ -49,14 +45,6 @@ public class ItemReader implements javax.batch.api.chunk.ItemReader {

private static final Logger logger = Logger.getLogger( ItemReader.class );

-@Inject
-@BatchProperty
-private boolean optimizeAfterPurge;

-@Inject
-@BatchProperty
-private boolean purgeAtStart;

@Inject
@BatchProperty
private int maxResults;
Expand Down Expand Up @@ -162,19 +150,6 @@ public void open(Serializable checkpoint) throws Exception {
String path = "java:comp/env/" + persistenceUnitName;
em = (EntityManager) InitialContext.doLookup( path );
session = em.unwrap( Session.class );
-final BatchBackend backend = ContextHelper
-.getSearchintegrator( session )
-.makeBatchBackend( null );

-// enhancement before indexing
-if ( this.purgeAtStart ) {
-logger.infof( "purging %s ...", entityName );
-backend.doWorkInSync( new PurgeAllLuceneWork( null, entityClazz ) );
-if ( this.optimizeAfterPurge ) {
-logger.infof( "optimizing %s ...", entityName );
-backend.optimize( new HashSet<Class<?>>( Arrays.asList( entityClazz ) ) );
-}
-}

ss = session.getSessionFactory().openStatelessSession();
String idName = ContextHelper
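The purge and optimize logic removed from ItemReader.open() above reappears, substantially unchanged, in the new BeforeChunkBatchlet below. Because the reader is opened once per partition while a batchlet step executes once per job, the purge now runs a single time instead of once per partition.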
new file: org/hibernate/search/jsr352/internal/steps/purge/BeforeChunkBatchlet.java
@@ -0,0 +1,82 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.jsr352.internal.steps.purge;

import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.naming.InitialContext;
import javax.persistence.EntityManager;

import org.hibernate.Session;
import org.hibernate.search.backend.PurgeAllLuceneWork;
import org.hibernate.search.backend.spi.BatchBackend;
import org.hibernate.search.hcore.util.impl.ContextHelper;
import org.hibernate.search.jsr352.internal.JobContextData;
import org.jboss.logging.Logger;

/**
* Enhancements applied before the chunk step "produceLuceneDoc" (Lucene
* document production): an optional index purge, optionally followed by an
* optimize.
*
* @author Mincong Huang
*/
@Named
public class BeforeChunkBatchlet implements Batchlet {

private static final Logger logger = Logger.getLogger( BeforeChunkBatchlet.class );
private final JobContext jobContext;

@Inject
@BatchProperty
private String persistenceUnitName;

@Inject
@BatchProperty
private boolean purgeAtStart;

@Inject
@BatchProperty
private boolean optimizeAfterPurge;

@Inject
public BeforeChunkBatchlet(JobContext jobContext) {
this.jobContext = jobContext;
}

@Override
public String process() throws Exception {

if ( this.purgeAtStart ) {

logger.info( "purging index for all entities ..." );
String path = "java:comp/env/" + persistenceUnitName;
EntityManager em = (EntityManager) InitialContext.doLookup( path );
Session session = em.unwrap( Session.class );
final BatchBackend backend = ContextHelper
.getSearchintegrator( session )
.makeBatchBackend( null );

JobContextData jobData = (JobContextData) jobContext.getTransientUserData();
jobData.getEntityClazzSet()
.forEach( clz -> backend.doWorkInSync( new PurgeAllLuceneWork( null, clz ) ) );

if ( this.optimizeAfterPurge ) {
logger.info( "optimizing all entities ..." );
backend.optimize( jobData.getEntityClazzSet() );
}
}
return null;
}

@Override
public void stop() throws Exception {
// TODO Auto-generated method stub
}
}
core/src/main/resources/META-INF/batch-jobs/mass-index.xml (10 additions, 2 deletions)
@@ -17,14 +17,22 @@
</listener>
</listeners>

<step id="beforeChunk" next="produceLuceneDoc">
<batchlet ref="beforeChunkBatchlet">
<properties>
<property name="optimizeAfterPurge" value="#{jobParameters['optimizeAfterPurge']}" />
<property name="persistenceUnitName" value="#{jobParameters['persistenceUnitName']}" />
<property name="purgeAtStart" value="#{jobParameters['purgeAtStart']}" />
</properties>
</batchlet>
</step>

<step id="produceLuceneDoc">
<chunk checkpoint-policy="custom">
<reader ref="itemReader">
<properties>
<property name="entityName" value="#{partitionPlan['entityName']}" />
<property name="maxResults" value="#{jobParameters['maxResults']}?:10000000;" />
<property name="optimizeAfterPurge" value="#{jobParameters['optimizeAfterPurge']}" />
<property name="purgeAtStart" value="#{jobParameters['purgeAtStart']}" />
<property name="persistenceUnitName" value="#{jobParameters['persistenceUnitName']}" />
<property name="scrollInterval" value="#{partitionPlan['scrollInterval']}" />
<property name="scrollOffset" value="#{partitionPlan['scrollOffset']}" />
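For context, here is a minimal launch sketch, assuming the standard JSR-352 JobOperator API; the job name "mass-index" is derived from the XML file name above, and the persistence unit name is hypothetical:

import java.util.Properties;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

// Job parameters consumed by the JSL above via #{jobParameters['...']}
Properties jobParams = new Properties();
jobParams.setProperty( "persistenceUnitName", "my-pu" ); // hypothetical unit name
jobParams.setProperty( "purgeAtStart", "true" );
jobParams.setProperty( "optimizeAfterPurge", "true" );
jobParams.setProperty( "maxResults", "200000" ); // falls back to 10000000 when absent

// "mass-index" resolves to META-INF/batch-jobs/mass-index.xml
JobOperator jobOperator = BatchRuntime.getJobOperator();
long executionId = jobOperator.start( "mass-index", jobParams );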
MassIndexerIT.java
@@ -48,9 +48,9 @@ public class MassIndexerIT {

private static final Logger logger = Logger.getLogger( MassIndexerIT.class );

-private final boolean JOB_OPTIMIZE_AFTER_PURGE = false;
+private final boolean JOB_OPTIMIZE_AFTER_PURGE = true;
private final boolean JOB_OPTIMIZE_AT_END = false;
-private final boolean JOB_PURGE_AT_START = false;
+private final boolean JOB_PURGE_AT_START = true;
private final int JOB_FETCH_SIZE = 100 * 1000;
private final int JOB_MAX_RESULTS = 200 * 1000;
private final int JOB_MAX_THREADS = 1;
