Skip to content

Commit

Permalink
HSEARCH-2689 Do not use JobContextData to pass the partition bounds t…
Browse files Browse the repository at this point in the history
…o partitions

The JobContextData is not preserved when switching from the main thread
to a partition in JBatch, so this makes no sense. It only used to work
in JBeret by chance.
  • Loading branch information
yrodiere authored and Sanne committed Oct 25, 2017
1 parent cd606f7 commit f77e500
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 55 deletions.
Expand Up @@ -17,7 +17,7 @@
import org.hibernate.criterion.Criterion;
import org.hibernate.search.exception.SearchException;
import org.hibernate.search.jsr352.logging.impl.Log;
import org.hibernate.search.jsr352.massindexing.impl.util.MassIndexerUtil;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
Expand Down Expand Up @@ -334,7 +334,7 @@ public Properties build() {
try {
jobParams.put(
MassIndexingJobParameters.CUSTOM_QUERY_CRITERIA,
MassIndexerUtil.serializeCriteria( customQueryCriteria )
SerializationUtil.serialize( customQueryCriteria )
);
}
catch (IOException e) {
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.hibernate.criterion.Criterion;
import org.hibernate.search.engine.integration.impl.ExtendedSearchIntegrator;
import org.hibernate.search.hcore.util.impl.ContextHelper;
import org.hibernate.search.jsr352.massindexing.impl.util.PartitionBound;

/**
* Container for data shared across the entire batch job.
Expand All @@ -39,11 +38,6 @@ public class JobContextData {
*/
private Map<String, Class<?>> entityTypeMap;

/**
* The list of partition boundaries, one element per partition
*/
private List<PartitionBound> partitionBounds;

private Set<Criterion> customQueryCriteria;

public JobContextData() {
Expand Down Expand Up @@ -90,14 +84,6 @@ public Set<Criterion> getCustomQueryCriteria() {
return customQueryCriteria;
}

public void setPartitionBounds(List<PartitionBound> partitionBounds) {
this.partitionBounds = partitionBounds;
}

public PartitionBound getPartitionBound(int partitionId) {
return partitionBounds.get( partitionId );
}

public void setCustomQueryCriteria(Set<Criterion> criteria) {
this.customQueryCriteria = criteria;
}
Expand All @@ -108,7 +94,6 @@ public String toString() {
.append( "JobContextData [" )
.append( "entityManagerFactory=" ).append( entityManagerFactory )
.append( ", entityTypeMap=" ).append( entityTypeMap )
.append( ", partitionBounds=" ).append( partitionBounds )
.append( ", customQueryCriteria=" ).append( customQueryCriteria )
.append( "]" )
.toString();
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.hibernate.search.jsr352.massindexing.impl.util.JobContextUtil;
import org.hibernate.search.jsr352.massindexing.impl.util.MassIndexingPartitionProperties;
import org.hibernate.search.jsr352.massindexing.impl.util.PartitionBound;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
Expand Down Expand Up @@ -110,9 +111,16 @@ public class EntityReader extends AbstractItemReader {
@BatchProperty(name = MassIndexingPartitionProperties.PARTITION_ID)
private String partitionIdStr;

@Inject
@BatchProperty(name = MassIndexingPartitionProperties.LOWER_BOUND)
private String serializedLowerBound;

@Inject
@BatchProperty(name = MassIndexingPartitionProperties.UPPER_BOUND)
private String serializedUpperBound;

private EntityManagerFactory emf;

private Class<?> entityType;
private Serializable checkpointId;
private Session session;
private StatelessSession ss;
Expand All @@ -124,26 +132,23 @@ public EntityReader() {

/**
* Constructor for unit test TODO should it be done in this way?
*
* @param cacheable
* @param entityName
* @param fetchSize
* @param hql
* @param maxResults
* @param partitionIdStr
*/
EntityReader(String cacheable,
String entityName,
String fetchSize,
String hql,
String maxResults,
String partitionIdStr) {
String partitionIdStr,
String serializedLowerBound,
String serializedUpperBound) {
this.cacheable = cacheable;
this.entityName = entityName;
this.fetchSize = fetchSize;
this.customQueryHql = hql;
this.customQueryLimit = maxResults;
this.partitionIdStr = partitionIdStr;
this.serializedLowerBound = serializedLowerBound;
this.serializedUpperBound = serializedUpperBound;
}

/**
Expand Down Expand Up @@ -207,8 +212,7 @@ public void open(Serializable checkpointId) throws Exception {

JobContextData jobData = getJobContextData();

entityType = jobData.getIndexedType( entityName );
PartitionBound bound = jobData.getPartitionBound( partitionId );
PartitionBound bound = getPartitionBound( jobData );
log.printBound( bound );

emf = jobData.getEntityManagerFactory();
Expand Down Expand Up @@ -255,6 +259,13 @@ private JobContextData getJobContextData() throws ClassNotFoundException, IOExce
entityTypes, serializedCustomQueryCriteria );
}

private PartitionBound getPartitionBound(JobContextData jobContextData) throws IOException, ClassNotFoundException {
Class<?> entityType = jobContextData.getIndexedType( entityName );
Object lowerBound = SerializationUtil.deserialize( serializedLowerBound );
Object upperBound = SerializationUtil.deserialize( serializedUpperBound );
return new PartitionBound( entityType, lowerBound, upperBound );
}

private ScrollableResults buildScrollUsingHQL(StatelessSession ss, String HQL) {
return ss.createQuery( HQL )
.setReadOnly( true )
Expand All @@ -266,6 +277,7 @@ private ScrollableResults buildScrollUsingHQL(StatelessSession ss, String HQL) {

private ScrollableResults buildScrollUsingCriteria(StatelessSession ss,
PartitionBound unit, Object checkpointId, JobContextData jobData) {
Class<?> entityType = unit.getEntityType();
String idName = sessionFactory.getClassMetadata( entityType )
.getIdentifierPropertyName();

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.hibernate.search.jsr352.massindexing.impl.JobContextData;
import org.hibernate.search.jsr352.massindexing.impl.util.MassIndexingPartitionProperties;
import org.hibernate.search.jsr352.massindexing.impl.util.PartitionBound;
import org.hibernate.search.jsr352.massindexing.impl.util.SerializationUtil;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
Expand Down Expand Up @@ -133,7 +134,6 @@ public PartitionPlan mapPartitions() throws Exception {
}
break;
}
jobData.setPartitionBounds( partitionBounds );

// Build partition plan
final int threads = Integer.valueOf( maxThreads );
Expand All @@ -142,9 +142,12 @@ public PartitionPlan mapPartitions() throws Exception {
log.partitionsPlan( partitions, threads );

for ( int i = 0; i < partitionBounds.size(); i++ ) {
PartitionBound bound = partitionBounds.get( i );
props[i] = new Properties();
props[i].setProperty( MassIndexingPartitionProperties.ENTITY_NAME, partitionBounds.get( i ).getEntityName() );
props[i].setProperty( MassIndexingPartitionProperties.ENTITY_NAME, bound.getEntityName() );
props[i].setProperty( MassIndexingPartitionProperties.PARTITION_ID, String.valueOf( i ) );
props[i].setProperty( MassIndexingPartitionProperties.LOWER_BOUND, SerializationUtil.serialize( bound.getLowerBound() ) );
props[i].setProperty( MassIndexingPartitionProperties.UPPER_BOUND, SerializationUtil.serialize( bound.getUpperBound() ) );
}

PartitionPlan partitionPlan = new PartitionPlanImpl();
Expand Down
Expand Up @@ -8,6 +8,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -104,7 +105,11 @@ private static JobContextData createData(EntityManagerFactory emf, String entity
}
}

Set<Criterion> criteria = MassIndexerUtil.deserializeCriteria( serializedCustomQueryCriteria );
@SuppressWarnings("unchecked")
Set<Criterion> criteria = (Set<Criterion>) SerializationUtil.deserialize( serializedCustomQueryCriteria );
if ( criteria == null ) {
criteria = Collections.emptySet();
}
log.criteriaSize( criteria.size() );

JobContextData jobContextData = new JobContextData();
Expand Down
Expand Up @@ -19,4 +19,8 @@ private MassIndexingPartitionProperties() {
public static final String ENTITY_NAME = "entityName";

public static final String PARTITION_ID = "partitionId";

public static final String LOWER_BOUND = "lowerBound";

public static final String UPPER_BOUND = "upperBound";
}
Expand Up @@ -12,46 +12,39 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.Collections;
import java.util.Set;

import org.hibernate.criterion.Criterion;
import org.hibernate.search.util.StringHelper;
import org.jboss.logging.Logger;

/**
* @author Mincong Huang
*/
public final class MassIndexerUtil {
public final class SerializationUtil {

public static final Logger LOGGER = Logger.getLogger( MassIndexerUtil.class );
public static final Logger LOGGER = Logger.getLogger( SerializationUtil.class );

private MassIndexerUtil() {
private SerializationUtil() {
// Private constructor, do not use it.
}

public static String serializeCriteria(Set<Criterion> criteria)
throws IOException {
public static String serialize(Object object) throws IOException {
try ( ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream( baos ) ) {
oos.writeObject( criteria );
oos.writeObject( object );
oos.flush();
byte bytes[] = baos.toByteArray();
return Base64.getEncoder().encodeToString( bytes );
}
}

public static Set<Criterion> deserializeCriteria(String serialized)
throws IOException, ClassNotFoundException {
public static Object deserialize(String serialized) throws IOException, ClassNotFoundException {
if ( StringHelper.isEmpty( serialized ) ) {
return Collections.emptySet();
return null;
}
byte bytes[] = Base64.getDecoder().decode( serialized );
try ( ByteArrayInputStream bais = new ByteArrayInputStream( bytes );
ObjectInputStream ois = new ObjectInputStream( bais ) ) {
@SuppressWarnings("unchecked")
Set<Criterion> criteria = (Set<Criterion>) ois.readObject();
return criteria;
return ois.readObject();
}
}
}
Expand Up @@ -9,7 +9,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.util.Arrays;
import java.util.HashSet;

import javax.batch.runtime.context.JobContext;
Expand All @@ -20,7 +19,6 @@

import org.hibernate.search.jsr352.logging.impl.Log;
import org.hibernate.search.jsr352.massindexing.impl.JobContextData;
import org.hibernate.search.jsr352.massindexing.impl.util.PartitionBound;
import org.hibernate.search.jsr352.massindexing.test.entity.Company;
import org.hibernate.search.util.logging.impl.LoggerFactory;

Expand Down Expand Up @@ -88,24 +86,21 @@ public void setUp() {
fetchSize,
hql,
maxResults,
partitionId );
partitionId,
null,
null
);

MockitoAnnotations.initMocks( this );
}

@Test
public void testReadItem_withoutBoundary() throws Exception {

Object upper = null;
Object lower = null;
PartitionBound partitionBound = new PartitionBound( Company.class, lower, upper );

// mock job context
JobContextData jobData = new JobContextData();
jobData.setEntityManagerFactory( emf );
jobData.setCustomQueryCriteria( new HashSet<>() );
jobData.setEntityTypes( Company.class );
jobData.setPartitionBounds( Arrays.asList( partitionBound ) );
Mockito.when( mockedJobContext.getTransientUserData() ).thenReturn( jobData );

// mock step context
Expand Down

0 comments on commit f77e500

Please sign in to comment.