Skip to content
Permalink
Browse files
BATCHEE-54 BATCHEE-69 applying Scott Kurz's patches, reexecution of partition after a restart and fixes regarding the spec in ChunkStepController
  • Loading branch information
rmannibucau committed Apr 26, 2016
1 parent f08eead commit 88041b9230930a6b0bfa85e89c96e3dd14636b81
Showing 12 changed files with 483 additions and 403 deletions.
@@ -97,7 +97,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck</artifactId>
<version>1.1-b02</version>
<version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -108,7 +108,6 @@
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</exclusion>
<!-- SNAPSHOTS -->
<exclusion>
<groupId>com.ibm.jbatch</groupId>
<artifactId>com.ibm.jbatch.spi</artifactId>
@@ -118,7 +117,7 @@
<dependency>
<groupId>com.ibm.jbatch.tck</groupId>
<artifactId>com.ibm.jbatch.tck.spi</artifactId>
<version>1.1-b02</version>
<version>1.1-b03</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -67,12 +67,16 @@ public abstract class BaseStepController implements ExecutionElementController {

protected StepContextImpl stepContext;
protected Step step;
protected String stepName;
protected StepStatus stepStatus;

protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;

protected long rootJobExecutionId;

// Restart of partitioned steps needs to be handled specially
protected boolean restartAfterCompletion = false;

protected final BatchKernelService kernelService;
protected final PersistenceManagerService persistenceManagerService;
private final JobStatusManagerService statusManagerService;
@@ -90,6 +94,7 @@ protected BaseStepController(final RuntimeJobExecution jobExecution, final Step
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;
this.stepName = step.getId();

this.txService = servicesManager.service(TransactionManagementService.class);
this.kernelService = servicesManager.service(BatchKernelService.class);
@@ -311,6 +316,8 @@ private boolean shouldStepBeExecutedOnRestart() {
// boolean, but it should default to 'false', which is the spec'd default.
if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {
return false;
} else {
restartAfterCompletion = true;
}
}

@@ -340,6 +347,9 @@ private boolean shouldStepBeExecutedOnRestart() {
return true;
}

/**
 * Tells whether the current run of this step is a restart.
 *
 * @return {@code true} when the start count recorded on the step status is greater than one
 */
protected boolean isRestartExecution() {
    final boolean startedBefore = stepStatus.getStartCount() > 1;
    return startedBefore;
}

protected void statusStarting() {
stepStatus.setBatchStatus(BatchStatus.STARTING);
@@ -71,6 +71,27 @@ public class PartitionedStepController extends BaseStepController {

private PartitionReducer partitionReducerProxy = null;

/**
 * Classification of the current run of this partitioned step. The value is
 * computed once per invocation and then used to decide whether partitions are
 * rebuilt from the previous execution or created from scratch.
 */
private enum ExecutionType {
/**
 * First execution of this step for the job instance (among all job executions).
 */
START,
/**
 * The step was executed before but did not complete successfully, and the partition plan's
 * override flag is false: continue from the previous partitions' checkpoints, etc.
 */
RESTART_NORMAL,
/**
 * The step was executed before but did not complete successfully, and the partition plan's
 * override flag is true: start over with an entirely new set of partitions, checkpoints, etc.
 */
RESTART_OVERRIDE,
/**
 * The step previously completed, but it is being re-executed with an entirely new set of
 * partitions, checkpoints, etc.
 */
RESTART_AFTER_COMPLETION
}

private ExecutionType executionType = null;

// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
int numPreviouslyCompleted = 0;

@@ -247,6 +268,37 @@ private PartitionPlan generatePartitionPlan() {
return plan;
}

/**
 * Determines which {@link ExecutionType} applies to this invocation and stores it in
 * {@code executionType}.
 * <p>
 * The plan's override flag is only consulted on a restart that is not a
 * re-execution after completion; it is ignored on the initial execution.
 */
private void calculateExecutionType() {
    // Initial execution: the override setting is deliberately not looked at.
    if (!isRestartExecution()) {
        executionType = ExecutionType.START;
        return;
    }

    // A previously completed step that is being re-run takes precedence over the override flag.
    if (restartAfterCompletion) {
        executionType = ExecutionType.RESTART_AFTER_COMPLETION;
        return;
    }

    executionType = plan.getPartitionsOverride()
            ? ExecutionType.RESTART_OVERRIDE
            : ExecutionType.RESTART_NORMAL;
}

/**
 * Checks the partition count of the current plan against the count stored on the step
 * status, then stores the current count on the step status.
 * <p>
 * The comparison only happens for {@link ExecutionType#RESTART_NORMAL}, and only when the
 * stored count is positive.
 *
 * @throws IllegalStateException if, on a normal restart, the stored partition count is
 *                               positive and differs from the current plan's count
 */
private void validateNumberOfPartitions() {
    final int requestedPartitions = plan.getPartitions();

    if (executionType == ExecutionType.RESTART_NORMAL) {
        final int recordedPartitions = stepStatus.getNumPartitions();
        final boolean planSizeChanged = recordedPartitions > 0 && recordedPartitions != requestedPartitions;
        if (planSizeChanged) {
            throw new IllegalStateException(
                    "On a normal restart, the plan on restart specified: " + requestedPartitions
                            + " # of partitions, but the previous "
                            + "executions' plan specified a different number: " + recordedPartitions
                            + " # of partitions. Failing job.");
        }
    }

    // Remember the plan size so that a later restart can reuse the same plan.
    stepStatus.setNumPartitions(requestedPartitions);
}

@Override
protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
@@ -255,6 +307,9 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J

//persist the partition plan so on restart we have the same plan to reuse
stepStatus.setNumPartitions(plan.getPartitions());
calculateExecutionType();

validateNumberOfPartitions();

/* When true is specified, the partition count from the current run
* is used and all results from past partitions are discarded. Any
@@ -263,7 +318,7 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J
* rollbackPartitionedStep method is invoked during restart before any
* partitions begin processing to provide a cleanup hook.
*/
if (plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_OVERRIDE) {
if (this.partitionReducerProxy != null) {
try {
this.partitionReducerProxy.rollbackPartitionedStep();
@@ -303,9 +358,14 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc
PartitionsBuilderConfig config =
new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_NORMAL) {
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
} else {
// This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION.
//
// So we're just going to create new "subjob" job instances in the DB in these cases,
// and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart
// (of the current execution which is itself a restart)
parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
}

@@ -324,6 +384,11 @@ private void executeAndWaitForCompletion() throws JobRestartException {
int numCurrentCompleted = 0;
int numCurrentSubmitted = 0;

// All partitions have already completed on a previous execution.
if (numTotalForThisExecution == 0) {
return;
}

//Start up to the max num we are allowed from the num threads attribute
for (int i = 0; i < this.threads && i < numTotalForThisExecution; i++, numCurrentSubmitted++) {
final BatchWorkUnit workUnit = parallelBatchWorkUnits.get(i);

This file was deleted.

@@ -51,6 +51,22 @@ public CheckpointManager(final ItemReader reader, final ItemWriter writer,
this.dataRepresentationService = dataRepresentationService;
}

/**
 * Calls {@code beginCheckpoint()} on the checkpoint algorithm.
 *
 * @throws BatchContainerRuntimeException wrapping any exception thrown by the algorithm
 */
public void beginCheckpoint() {
    try {
        checkpointAlgorithm.beginCheckpoint();
    } catch (final Exception cause) {
        throw new BatchContainerRuntimeException("Checkpoint algorithm beginCheckpoint() failed", cause);
    }
}

/**
 * Calls {@code endCheckpoint()} on the checkpoint algorithm.
 *
 * @throws BatchContainerRuntimeException wrapping any exception thrown by the algorithm
 */
public void endCheckpoint() {
    try {
        checkpointAlgorithm.endCheckpoint();
    } catch (final Exception cause) {
        throw new BatchContainerRuntimeException("Checkpoint algorithm endCheckpoint() failed", cause);
    }
}

public boolean applyCheckPointPolicy() {
try {
return checkpointAlgorithm.isReadyToCheckpoint();
@@ -48,19 +48,20 @@ public static int getTimeLimit(Chunk chunk) {
return timeLimit;
}

public static String getCheckpointPolicy(Chunk chunk) {
public static boolean isCustomCheckpointPolicy(Chunk chunk) {
String checkpointPolicy = chunk.getCheckpointPolicy();

if (checkpointPolicy != null && !checkpointPolicy.isEmpty()) {
if (!(checkpointPolicy.equals("item") || checkpointPolicy.equals("custom"))) {
if (checkpointPolicy.equals("item")) {
return false;
} else if (checkpointPolicy.equals("custom")) {
return true;
} else {
throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
}
} else {
checkpointPolicy = "item";
return false;
}

chunk.setCheckpointPolicy(checkpointPolicy);
return checkpointPolicy;
}

public static int getSkipLimit(Chunk chunk) {

0 comments on commit 88041b9

Please sign in to comment.