-
Notifications
You must be signed in to change notification settings - Fork 242
/
RestartChunkIT.java
151 lines (130 loc) · 5.16 KB
/
RestartChunkIT.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.jsr352.massindexing;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobExecution;
import javax.batch.runtime.StepExecution;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.hibernate.search.jsr352.massindexing.test.entity.Company;
import org.hibernate.search.jsr352.massindexing.test.entity.Person;
import org.hibernate.search.jsr352.test.util.JobFactory;
import org.hibernate.search.jsr352.test.util.JobTestUtil;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Integration test verifying that a mass indexing job which fails in the middle of
 * its chunk step can be restarted and then runs to completion.
 * <p>
 * The failure is simulated with two Byteman rules: the first creates a count-down
 * named {@code beforeRestart} when {@code PartitionMapper#mapPartitions} exits, the
 * second counts down on every entry into {@code EntityReader#readItem} and throws an
 * {@link InterruptedException} once its condition is satisfied.
 *
 * @author Mincong Huang
 */
@RunWith(org.jboss.byteman.contrib.bmunit.BMUnitRunner.class)
@BMRules(rules = {
		@BMRule(
				name = "Create count-down before the step partitioning",
				targetClass = "org.hibernate.search.jsr352.massindexing.impl.steps.lucene.PartitionMapper",
				targetMethod = "mapPartitions",
				targetLocation = "AT EXIT",
				action = "createCountDown(\"beforeRestart\", " + RestartChunkIT.ITEMS_BEFORE_SIMULATED_FAILURE + ")"
		),
		@BMRule(
				name = "Count down for each item read, interrupt the job when counter is 0",
				targetClass = "org.hibernate.search.jsr352.massindexing.impl.steps.lucene.EntityReader",
				targetMethod = "readItem",
				targetLocation = "AT ENTRY",
				condition = "countDown(\"beforeRestart\")",
				action = "throw new java.lang.InterruptedException(\"Job is interrupted by Byteman.\")"
		)
})
public class RestartChunkIT {

	/**
	 * Initial value of the Byteman count-down, i.e. the number of read calls allowed
	 * before the simulated failure. Must remain a compile-time constant because it is
	 * used inside an annotation value.
	 */
	static final int ITEMS_BEFORE_SIMULATED_FAILURE = 100;

	private static final String PERSISTENCE_UNIT_NAME = "h2";
	private static final int JOB_TIMEOUT_MS = 10_000;

	/** Number of {@link Company} rows inserted before each test. */
	private static final long DB_COMP_ROWS = 100;

	/** Number of {@link Person} rows inserted before each test. */
	private static final long DB_PERS_ROWS = 50;

	/**
	 * Seed data, one row per { company name, first name, last name }.
	 * Rows are reused cyclically when populating the database, so each row accounts
	 * for {@code rows / SEED_DATA.length} entities of each type.
	 */
	private static final String[][] SEED_DATA = {
			{ "Google", "Sundar", "Pichai" },
			{ "Red Hat", "James", "M. Whitehurst" },
			{ "Microsoft", "Satya", "Nadella" },
			{ "Facebook", "Mark", "Zuckerberg" },
			{ "Amazon", "Jeff", "Bezos" }
	};

	private JobOperator jobOperator;
	private EntityManagerFactory emf;

	/**
	 * Obtains the job operator, creates the entity manager factory and persists
	 * {@link #DB_COMP_ROWS} companies and {@link #DB_PERS_ROWS} persons in a single
	 * transaction.
	 */
	@Before
	public void setup() {
		jobOperator = JobFactory.getJobOperator();
		emf = Persistence.createEntityManagerFactory( PERSISTENCE_UNIT_NAME );

		EntityManager em = emf.createEntityManager();
		try {
			em.getTransaction().begin();
			for ( int i = 0; i < DB_COMP_ROWS; i++ ) {
				em.persist( new Company( SEED_DATA[i % SEED_DATA.length][0] ) );
			}
			for ( int i = 0; i < DB_PERS_ROWS; i++ ) {
				String firstName = SEED_DATA[i % SEED_DATA.length][1];
				String lastName = SEED_DATA[i % SEED_DATA.length][2];
				// Identifier built from the row index followed by the first-name initial
				String id = String.format( Locale.ROOT, "%2d%c", i, firstName.charAt( 0 ) );
				em.persist( new Person( id, firstName, lastName ) );
			}
			em.getTransaction().commit();
		}
		finally {
			// Always release the entity manager, rolling back first if the
			// transaction did not reach a successful commit.
			try {
				if ( em.getTransaction().isActive() ) {
					em.getTransaction().rollback();
				}
			}
			finally {
				em.close();
			}
		}
	}

	/**
	 * Closes the entity manager factory, if one was created by {@link #setup()}.
	 */
	@After
	public void shutdownJPA() {
		// emf stays null when the factory creation itself failed; avoid adding
		// a NullPointerException on top of the original failure.
		if ( emf != null ) {
			emf.close();
		}
	}

	/**
	 * Starts the mass indexing job, lets Byteman make it fail, restarts it and
	 * checks that every step of the restarted execution completes and that the
	 * expected number of entities can be found through the index.
	 */
	@Test
	public void testJob() throws InterruptedException, IOException {
		// nothing is expected to be found before the job has run
		List<Company> companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "Google" );
		List<Person> people = JobTestUtil.findIndexedResults( emf, Person.class, "firstName", "Sundar" );
		assertEquals( 0, companies.size() );
		assertEquals( 0, people.size() );

		// start the job
		Properties parameters = MassIndexingJob.parameters()
				.forEntities( Company.class, Person.class )
				.entityManagerFactoryReference( PERSISTENCE_UNIT_NAME )
				// must be smaller than ITEMS_BEFORE_SIMULATED_FAILURE to trigger the restart
				.checkpointInterval( 10 )
				.build();
		long execId1 = jobOperator.start(
				MassIndexingJob.NAME,
				parameters
		);
		JobExecution jobExec1 = jobOperator.getJobExecution( execId1 );
		JobTestUtil.waitForTermination( jobOperator, jobExec1, JOB_TIMEOUT_MS );

		// the first execution is interrupted by Byteman: its chunk step must have failed
		for ( StepExecution stepExec : jobOperator.getStepExecutions( execId1 ) ) {
			if ( stepExec.getStepName().equals( "produceLuceneDoc" ) ) {
				assertEquals( BatchStatus.FAILED, stepExec.getBatchStatus() );
			}
		}

		// restart the job
		/*
		 * From the specs (v1.0, 10.8.1):
		 * Job parameter values are not remembered from one execution to the next.
		 * Hence the same parameters are passed again explicitly.
		 */
		long execId2 = jobOperator.restart( execId1, parameters );
		JobExecution jobExec2 = jobOperator.getJobExecution( execId2 );
		JobTestUtil.waitForTermination( jobOperator, jobExec2, JOB_TIMEOUT_MS );
		for ( StepExecution stepExec : jobOperator.getStepExecutions( execId2 ) ) {
			assertEquals( BatchStatus.COMPLETED, stepExec.getBatchStatus() );
		}

		// search again: each seed row accounts for an equal share of the persisted entities
		companies = JobTestUtil.findIndexedResults( emf, Company.class, "name", "google" );
		people = JobTestUtil.findIndexedResults( emf, Person.class, "firstName", "Sundar" );
		assertEquals( DB_COMP_ROWS / SEED_DATA.length, companies.size() );
		assertEquals( DB_PERS_ROWS / SEED_DATA.length, people.size() );
	}
}