Skip to content
Permalink
Browse files
BATCHEE-67 patch from Scott Kurz to propagate job name + upgrading TC…
…Ks. Also fixing a few tests that were not deterministic and upgrading the OpenEJB version for the TCKs.
  • Loading branch information
Romain Manni-Bucau committed Oct 24, 2015
1 parent 6d7a20d commit 31a4d1214753f509a3ef244b94f6bdf202cf8eca
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 488 deletions.

Large diffs are not rendered by default.

@@ -33,7 +33,7 @@ public class JobContextImpl implements JobContext {
private Object transientUserData = null;
private ModelNavigator<JSLJob> navigator = null;

private String id; // Name
private String jobName; // 'id' attribute in JSL
private Properties properties = new Properties();

private long executionId;
@@ -42,7 +42,7 @@ public class JobContextImpl implements JobContext {

public JobContextImpl(final ModelNavigator<JSLJob> navigator, final JSLProperties jslProperties) {
this.navigator = navigator;
this.id = navigator.getRootModelElement().getId();
this.jobName = navigator.getRootModelElement().getId();
this.batchStatus = BatchStatus.STARTING;
this.properties = convertJSProperties(jslProperties);
}
@@ -71,7 +71,11 @@ public void setExitStatus(String exitStatus) {
}

public String getJobName() {
return id;
return jobName;
}

public void setJobName(final String jobName) {
this.jobName = jobName;
}

public BatchStatus getBatchStatus() {
@@ -123,7 +127,7 @@ public void setRestartOn(String restartOn) {
@Override
public String toString() {
return ("batchStatus = " + batchStatus) + " , exitStatus = " + exitStatus + " , id = "
+ id + " , executionId = " + executionId + " , instanceId = " + instanceId + " , restartOn = " + restartOn;
+ jobName + " , executionId = " + executionId + " , instanceId = " + instanceId + " , restartOn = " + restartOn;
}
}

@@ -298,7 +298,7 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc
new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
} else {
parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
}
@@ -121,9 +121,9 @@ private void buildSubJobBatchWorkUnits() {
int count = batchKernel.getJobInstanceCount(job.getId());
FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
if (count == 0) {
parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config));
parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
} else if (count == 1) {
parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config));
parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
} else {
throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
}
@@ -71,6 +71,7 @@ public void prepareForExecution(final JobContextImpl jobContext) {
public void inheritJobContext(final JobContextImpl jc) {
jobContext.setExecutionId(jc.getExecutionId());
jobContext.setInstanceId(jc.getInstanceId());
jobContext.setJobName(jc.getJobName());
}

public void setRestartOn(final String restartOn) {
@@ -54,7 +54,7 @@ InternalJobExecution restartJob(long executionID, Properties overrideJobParamete

List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilderConfig config, JobContextImpl jc, StepContextImpl sc) throws JobRestartException, JobStartException;

List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException,
List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config, JobContextImpl jc, StepContextImpl sc) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException;

void startGeneratedJob(BatchWorkUnit batchWork);
@@ -64,10 +64,10 @@ List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderC
boolean isExecutionRunning(long executionId);

BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(
FlowInSplitBuilderConfig config);
FlowInSplitBuilderConfig config, JobContextImpl jc);

BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(
FlowInSplitBuilderConfig config);
FlowInSplitBuilderConfig config, JobContextImpl jc);


}
@@ -186,7 +186,7 @@ public List<BatchPartitionWorkUnit> buildNewParallelPartitions(final PartitionsB
}

@Override
public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config)
public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final PartitionsBuilderConfig config, final JobContextImpl jc, final StepContextImpl sc)
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

final List<JSLJob> jobModels = config.getJobModels();
@@ -205,13 +205,15 @@ public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(final Parti
final RuntimeJobExecution jobExecution;
try {
jobExecution = JobExecutionHelper.restartPartition(servicesManager, execId, parallelJob, partitionProps);
jobExecution.inheritJobContext(jc);
jobExecution.setPartitionInstance(instance);
} catch (final NoSuchJobExecutionException e) {
throw new IllegalStateException("Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId ="
+ execId, e);
}

final BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(jobExecution, config, servicesManager);
batchWork.inheritStepContext(sc);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());

batchWorkUnits.add(batchWork);
@@ -231,11 +233,12 @@ public void restartGeneratedJob(final BatchWorkUnit batchWork) throws JobRestart
}

@Override
public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config) {
public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config, final JobContextImpl jc) {
final JSLJob parallelJob = config.getJobModel();

final RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(servicesManager, parallelJob);
final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(execution, config, servicesManager);
execution.inheritJobContext(jc);

registerCurrentInstanceAndExecution(execution, batchWork.getController());
return batchWork;
@@ -264,7 +267,7 @@ private long getMostRecentExecutionId(final JSLJob jobModel) {
}

@Override
public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config)
public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSplitBuilderConfig config, final JobContextImpl jc)
throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

final JSLJob jobModel = config.getJobModel();
@@ -277,7 +280,7 @@ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(final FlowInSp
}

final BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(jobExecution, config, servicesManager);

jobExecution.inheritJobContext(jc);
registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
return batchWork;
}
@@ -243,6 +243,9 @@ public Properties getParameters(final long executionId) throws NoSuchJobExecutio
public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
final List<InternalJobExecution> list = new LinkedList<InternalJobExecution>();
final Structures.JobInstanceData jobInstanceData = data.jobInstanceData.get(jobInstanceId);
if (jobInstanceData == null || jobInstanceData.executions == null) {
return list;
}
synchronized (jobInstanceData.executions) {
for (final Structures.ExecutionInstanceData executionInstanceData : jobInstanceData.executions) {
list.add(executionInstanceData.execution);

This file was deleted.

21 pom.xml
@@ -191,6 +191,18 @@
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>
<version>1.5</version>
<dependencies>
<dependency> <!-- workaround for jdk 1.8 -->
<groupId>org.apache.maven.shared</groupId>
<artifactId>maven-filtering</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -471,15 +483,6 @@
</plugins>
</build>

<pluginRepositories>

<!-- WTF? surely not! -->
<pluginRepository>
<id>sonatype-public-repository</id>
<url>https://oss.sonatype.org/content/groups/public</url>
</pluginRepository>
</pluginRepositories>

<reporting>
<plugins>
<plugin>
@@ -18,18 +18,22 @@

import org.apache.batchee.cli.lifecycle.Lifecycle;
import org.apache.batchee.util.Batches;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.StandardOutputStreamLog;
import org.junit.runners.MethodSorters;

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

import static java.lang.Thread.sleep;
import static org.apache.batchee.cli.BatchEECLI.main;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class MainTest {
@Rule
public final StandardOutputStreamLog stdout = new StandardOutputStreamLog();
@@ -74,6 +78,11 @@ public void running() {
final JobOperator jobOperator = BatchRuntime.getJobOperator();
final long id = jobOperator.start("long-sample", null);

try {
sleep(100); // ensure it is started
} catch (final InterruptedException e) {
Thread.interrupted();
}
main(new String[]{"running"});
assertThat(stdout.getLog(), containsString("long-sample -> ["));

@@ -154,18 +163,17 @@ public void executions() {
final JobOperator jobOperator = BatchRuntime.getJobOperator();
final long id = jobOperator.start("sample", null);

main(new String[]{"executions", "-id", Long.toString(id)});

// output looks like:
// Executions of sample for instance 5
// execution id | batch status | exit status | start time | end time
// 5 | COMPLETED | COMPLETED | sam. janv. 04 17:20:24 CET 2014 | sam. janv. 04 17:20:24 CET 2014


assertThat(stdout.getLog(), containsString("Executions of sample for instance 5"));
assertThat(stdout.getLog(), containsString("COMPLETED"));

Batches.waitForEnd(jobOperator, id);
main(new String[]{"executions", "-id", Long.toString(id)});

assertThat(stdout.getLog(), containsString("Executions of sample for instance " + id));
assertThat(stdout.getLog(), containsString("COMPLETED"));
}

@Test

0 comments on commit 31a4d12

Please sign in to comment.