Skip to content

Commit

Permalink
#21 show mass index progress using PartitionAnalyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
mincong-h committed Jun 5, 2016
1 parent 7072f6f commit 911990f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
Expand Up @@ -5,6 +5,7 @@
import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
Expand All @@ -31,6 +32,9 @@ public class IdProducerBatchlet implements Batchlet {
@Inject @BatchProperty private int fetchSize;
@Inject @BatchProperty private int maxResults;

@Inject
private JobContext jobContext;

@Inject
private IndexingContext indexingContext;

Expand All @@ -56,6 +60,7 @@ public String process() throws Exception {
.setCacheable(false)
.uniqueResult();
System.out.printf("Total row = %d%n", rowCount);
jobContext.setTransientUserData(rowCount);

// load ids and store in scrollable results
ScrollableResults scrollableIds = session
Expand All @@ -74,10 +79,12 @@ public String process() throws Exception {
Serializable id = (Serializable) scrollableIds.get(0);
ids[i++] = id;
if (i == arrayCapacity) {
/*
for (Serializable _id : ids) {
System.out.printf("%5d ", _id);
}
System.out.printf("%n");
*/
indexingContext.add(ids);
// reset id array and index
ids = new Serializable[arrayCapacity];
Expand Down
Expand Up @@ -2,24 +2,50 @@

import java.io.Serializable;

import javax.batch.api.BatchProperty;
import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;

@Named
public class LucenePartitionAnalyzer implements PartitionAnalyzer {

@Inject
private JobContext jobContext;
private int workCount = 0;
private float percentage = 0;
@Inject @BatchProperty
private int maxResults;

/**
 * Analyzes data obtained from the different partition plans via the partition
 * data collectors. The current analysis sums the partitions' progress counts:
 *
 *     workCount = workCount1 + workCount2 + ... + workCountN
 *
 * and then prints the overall mass-index progress as a percentage. This is
 * very similar to the existing simple progress monitor. Note: the "total"
 * number of entities to process depends on 2 values: the number of rows in
 * the database table and the maximum number of results to process, defined by
 * the user before the job starts. The minimum of the two is used.
 *
 * @param fromCollector the checkpoint obtained from the partition collector's
 *        collectPartitionData; expected to be the number of work items
 *        processed by that partition since the last checkpoint (an Integer)
 */
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {

    // The total row count is published by IdProducerBatchlet through the job
    // context. Guard against a missing value so this analyzer does not throw
    // an NPE when the batchlet has not run yet (e.g. on a job restart).
    Object userData = jobContext.getTransientUserData();
    long rowCount = (userData instanceof Long) ? (long) userData : 0L;
    int total = Math.min((int) rowCount, maxResults);

    workCount += (int) fromCollector;
    // Avoid division by zero when the row count is unknown or the table is
    // empty; the previously computed percentage is kept in that case.
    if (total != 0) {
        percentage = 100 * (float) workCount / total;
    }
    // Single consolidated progress line: the earlier plain "%d works
    // processed" print was redundant with this one and logged every
    // checkpoint twice.
    System.out.printf("#analyzeCollectorData(): %d works processed (%.1f%%).%n",
            workCount, percentage);
}

@Override
Expand Down
Expand Up @@ -35,7 +35,13 @@
</properties>
</mapper>
<collector ref="lucenePartitionCollector"/>
<analyzer ref="lucenePartitionAnalyzer"/>
<analyzer ref="lucenePartitionAnalyzer">
<properties>
<!-- maxResults - the maximum number of rows returned by the criteria
query, i.e. an upper bound on the results processed. Same as LIMIT in SQL. -->
<property name="maxResults" value="#{jobParameters['maxResults']}?:2147483647;"/>
</properties>
</analyzer>
<reducer ref="lucenePartitionReducer"/>
</partition>
</step>
Expand Down

0 comments on commit 911990f

Please sign in to comment.