Skip to content

Commit

Permalink
1) actually re-execute partitions which are part of a partitioned ste…
Browse files Browse the repository at this point in the history
…p that runs on a restart execution after the step has been COMPLETED (allow-start-if-complete=true). 2) Fix restart after a restart of a partitioned step with override=true. 3) Fix metrics aggregation after a restart of a partitioned step with override=true. 4) validate plan size (# of partitions) after normal restart (without override or previous completion). 5) Add ON DELETE CASCADE to a couple targets of foreign keys. Fixes issue #36.
  • Loading branch information
scottkurz committed Nov 13, 2015
1 parent 9814c28 commit 57fa84b
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 83 deletions.
Expand Up @@ -68,11 +68,15 @@ public abstract class BaseStepControllerImpl implements IExecutionElementControl

protected StepContextImpl stepContext;
protected Step step;
protected String stepName;
protected StepStatus stepStatus;

protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;

protected long rootJobExecutionId;

// Restart of partitioned steps needs to be handled specially
protected boolean restartAfterCompletion = false;

protected static IBatchKernelService batchKernel = ServicesManagerImpl.getInstance().getBatchKernelService();

Expand All @@ -91,6 +95,7 @@ protected BaseStepControllerImpl(RuntimeJobExecution jobExecution, Step step, St
throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
}
this.step = step;
this.stepName = step.getId();
}

protected BaseStepControllerImpl(RuntimeJobExecution jobExecution, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
Expand Down Expand Up @@ -321,6 +326,7 @@ protected boolean shouldStepBeExecuted() {
}

private boolean shouldStepBeExecutedOnRestart() {

BatchStatus stepBatchStatus = stepStatus.getBatchStatus();
if (stepBatchStatus.equals(BatchStatus.COMPLETED)) {
// A bit of parsing involved since the model gives us a String not a
Expand All @@ -330,6 +336,7 @@ private boolean shouldStepBeExecutedOnRestart() {
return false;
} else {
logger.fine("Step: " + step.getId() + " already has batch status of COMPLETED, and allow-start-if-complete is set to 'true'");
restartAfterCompletion = true;
}
}

Expand Down Expand Up @@ -362,7 +369,10 @@ private boolean shouldStepBeExecutedOnRestart() {
return true;
}


/**
 * Reports whether this step is being run as part of a restart.
 *
 * @return {@code true} when the step status records a start count greater than one
 */
protected boolean isRestartExecution() {
    final boolean startedMoreThanOnce = stepStatus.getStartCount() > 1;
    return startedMoreThanOnce;
}

protected void statusStarting() {
stepStatus.setBatchStatus(BatchStatus.STARTING);
_jobStatusService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
Expand Down
Expand Up @@ -298,6 +298,9 @@ public List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilder
return batchWorkUnits;
}

/*
* There are some assumptions that all partition subjobs have associated DB entries
*/
@Override
public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

Expand All @@ -314,7 +317,7 @@ public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsB
Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];

try {
long execId = getMostRecentExecutionId(parallelJob);
long execId = getMostRecentSubJobExecutionId(parallelJob);

RuntimeJobExecution jobExecution = null;
try {
Expand Down Expand Up @@ -376,15 +379,13 @@ public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(FlowInSplitBuilderCo
return batchWork;
}

private long getMostRecentExecutionId(JSLJob jobModel) {
private long getMostRecentSubJobExecutionId(JSLJob jobModel) {

//There can only be one instance associated with a subjob's id since it is generated from an unique
//job instance id. So there should be no way to directly start a subjob with particular
List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
// Pick off the first. There are some subtle cases we're ignoring probably.
List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 1);

// Maybe we should blow up on '0' too?
if (instanceIds.size() > 1) {
String errorMsg = "Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.";
if (instanceIds.size() == 0) {
String errorMsg = "Did not find an entry for job name = " + jobModel.getId();
logger.severe(errorMsg);
throw new IllegalStateException(errorMsg);
}
Expand Down Expand Up @@ -413,7 +414,7 @@ public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(FlowInSplitBui
logger.entering(sourceClass, method, jobModel);
}

long execId = getMostRecentExecutionId(jobModel);
long execId = getMostRecentSubJobExecutionId(jobModel);

RuntimeFlowInSplitExecution jobExecution = null;
try {
Expand Down
Expand Up @@ -77,6 +77,9 @@ public class PartitionedStepControllerImpl extends BaseStepControllerImpl {
private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;

private PartitionReducerProxy partitionReducerProxy = null;

/** How the current run of this partitioned step relates to earlier runs of the same step. */
private enum ExecutionType {
    /** The step is not a restart execution. */
    START,
    /** Restart where the step had not previously completed and the plan does not request an override. */
    RESTART_NORMAL,
    /** Restart where the partition plan requests an override. */
    RESTART_OVERRIDE,
    /** Restart where the step is flagged as being re-run after having already completed. */
    RESTART_AFTER_COMPLETION
}

// Assigned by calculateExecutionType(); remains null until that method has run.
private ExecutionType executionType;

// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
int numPreviouslyCompleted = 0;
Expand Down Expand Up @@ -179,7 +182,7 @@ private PartitionPlan generatePartitionPlan() {
}

plan.setPartitionProperties(mapperPlan.getPartitionProperties());

if (logger.isLoggable(Level.FINE)) {
logger.fine("Partition plan defined by partition mapper: " + plan);
}
Expand Down Expand Up @@ -260,14 +263,48 @@ private PartitionPlan generatePartitionPlan() {
return plan;
}

/**
 * Classifies the current run of this step and records the result in {@code executionType}.
 *
 * Precedence: a non-restart run is always START; otherwise a restart after completion
 * wins over the plan's override flag, which in turn wins over a normal restart.
 */
private void calculateExecutionType() {
    if (!isRestartExecution()) {
        // The plan's override flag is deliberately ignored on the initial execution.
        executionType = ExecutionType.START;
        return;
    }

    if (restartAfterCompletion) {
        executionType = ExecutionType.RESTART_AFTER_COMPLETION;
        return;
    }

    executionType = plan.getPartitionsOverride()
            ? ExecutionType.RESTART_OVERRIDE
            : ExecutionType.RESTART_NORMAL;
}

/**
 * Checks the current plan's partition count against the count stored in the step status,
 * then stores the current count in the step status.
 *
 * The comparison is made only for a RESTART_NORMAL execution, and only when the stored
 * count is greater than zero.
 *
 * @throws IllegalStateException if, on a normal restart, the stored count is positive
 *         and differs from the current plan's count
 */
private void validateNumberOfPartitions() {

    final int currentPlanSize = plan.getPartitions();

    if (executionType == ExecutionType.RESTART_NORMAL) {
        final int previousPlanSize = stepStatus.getNumPartitions();
        final boolean planSizeChanged = previousPlanSize > 0 && previousPlanSize != currentPlanSize;
        if (planSizeChanged) {
            final String msg = "On a normal restart, the plan on restart specified: " + currentPlanSize
                    + " # of partitions, but the previous "
                    + "executions' plan specified a different number: " + previousPlanSize
                    + " # of partitions. Failing job.";
            logger.severe(msg);
            throw new IllegalStateException(msg);
        }
    }

    // Persist the partition count so that a later restart can reuse (and check against) the same plan size.
    stepStatus.setNumPartitions(currentPlanSize);
}


@Override
protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {

this.plan = this.generatePartitionPlan();

//persist the partition plan so on restart we have the same plan to reuse
stepStatus.setNumPartitions(plan.getPartitions());
calculateExecutionType();

validateNumberOfPartitions();

/* When true is specified, the partition count from the current run
* is used and all results from past partitions are discarded. Any
Expand All @@ -276,11 +313,11 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J
* rollbackPartitionedStep method is invoked during restart before any
* partitions begin processing to provide a cleanup hook.
*/
if (plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_OVERRIDE) {
if (this.partitionReducerProxy != null) {
this.partitionReducerProxy.rollbackPartitionedStep();
}
}
}
}

logger.fine("Number of partitions in step: " + partitions + " in step " + step.getId() + "; Subjob properties defined by partition mapper: " + partitionProperties);

Expand Down Expand Up @@ -319,9 +356,14 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc

PartitionsBuilderConfig config = new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, finishedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
if (executionType == ExecutionType.RESTART_NORMAL) {
parallelBatchWorkUnits = batchKernel.buildOnRestartParallelPartitions(config);
} else {
} else {
// This case includes RESTART_OVERRIDE and RESTART_AFTER_COMPLETION.
//
// So we're just going to create new "subjob" job instances in the DB in these cases,
// and we'll have to make sure we're dealing with the correct ones, say in a subsequent "normal" restart
// (of the current execution which is itself a restart)
parallelBatchWorkUnits = batchKernel.buildNewParallelPartitions(config);
}

Expand Down
Expand Up @@ -43,6 +43,7 @@
import java.util.logging.Logger;

import javax.batch.operations.NoSuchJobExecutionException;
import javax.batch.operations.NoSuchJobInstanceException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.JobInstance;
import javax.batch.runtime.Metric;
Expand Down Expand Up @@ -718,6 +719,9 @@ public List<Long> jobOperatorGetJobInstanceIds(String jobName, String appTag, in
}

@Override
/**
* Returns job instance ids sorted in descending order (highest instance id first).
*/
public List<Long> jobOperatorGetJobInstanceIds(String jobName, int start, int count) {

Connection conn = null;
Expand Down Expand Up @@ -1907,22 +1911,66 @@ public void updateStepExecution(StepContextImpl stepContext) {
* @param rootJobExecutionId JobExecution id of the top-level job
* @param stepName Step name of the top-level stepName
*/
private String getPartitionLevelJobInstanceWildCard(long rootJobExecutionId, String stepName) {

long jobInstanceId = getJobInstanceIdByExecutionId(rootJobExecutionId);
private String getPartitionLevelJobInstanceWildCard(long rootJobInstanceId, String stepName) {

StringBuilder sb = new StringBuilder(":");
sb.append(Long.toString(jobInstanceId));
sb.append(Long.toString(rootJobInstanceId));
sb.append(":");
sb.append(stepName);
sb.append(":%");

return sb.toString();
}

/*
* Because of how we implement partition restart after override as well as partitions after a restart
* after completion (we simply create a new "subjob" job instance id entry), we have to have a way
* to only look at the most recent one.
*
* There may be gaps in doing it this way, e.g. what if the previous execution blew up before persisting
* a complete set (one for each partition)? Will leave that for another time if necessary.
*/
/**
 * Looks up the highest job instance id stored under the name of the zeroth partition
 * sub-job of the given step, i.e. the name {@code :<rootJobInstanceId>:<stepName>:0}.
 *
 * @param rootJobInstanceId job instance id of the top-level job
 * @param stepName name of the top-level partitioned step
 * @return the largest {@code jobinstanceid} in {@code jobinstancedata} with that name
 * @throws NoSuchJobInstanceException if no row with that name exists
 * @throws PersistenceException if the query fails with a {@link SQLException}
 */
private long getMostRecentZerothPartitionSubJobInstanceId(long rootJobInstanceId, String stepName) {

    String zerothPartitionSubJobName = ":" + rootJobInstanceId + ":" + stepName + ":0";

    Connection conn = null;
    PreparedStatement statement = null;
    ResultSet rs = null;
    long instanceId = 0;

    try {
        conn = getConnection();
        statement = conn.prepareStatement("select max(jobinstanceid) as mostrecentid from jobinstancedata where name = ?");
        statement.setString(1, zerothPartitionSubJobName);
        rs = statement.executeQuery();

        // An aggregate query without GROUP BY always yields exactly one row; when no row
        // matches, max() is SQL NULL, which getLong() reports as 0. So rs.next() alone
        // cannot detect "not found" -- wasNull() has to be consulted as well.
        boolean found = rs.next();
        if (found) {
            instanceId = rs.getLong("mostrecentid");
            found = !rs.wasNull();
        }

        if (!found) {
            String msg = "Did not find sub job instance named = " + zerothPartitionSubJobName;
            logger.fine(msg);
            throw new NoSuchJobInstanceException(msg);
        }
    } catch (SQLException e) {
        throw new PersistenceException(e);
    } finally {
        cleanupConnection(conn, rs, statement);
    }

    return instanceId;
}

@Override
public void updateWithFinalPartitionAggregateStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {

String stepName = stepContext.getStepName();

Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;
Expand All @@ -1943,9 +1991,12 @@ public void updateWithFinalPartitionAggregateStepExecution(long rootJobExecution
" from stepexecutioninstancedata STEPEX inner join executioninstancedata JOBEX" +
" on STEPEX.jobexecid = JOBEX.jobexecid" +
" where JOBEX.jobinstanceid IN" +
" (select jobinstanceid from JOBINSTANCEDATA where name like ?)");
" (select jobinstanceid from JOBINSTANCEDATA where name like ? and jobinstanceid >= ?)");

long rootJobInstanceId = getJobInstanceIdByExecutionId(rootJobExecutionId);

statement.setString(1, getPartitionLevelJobInstanceWildCard(rootJobExecutionId, stepContext.getStepName()));
statement.setString(1, getPartitionLevelJobInstanceWildCard(rootJobInstanceId, stepName ));
statement.setLong(2, getMostRecentZerothPartitionSubJobInstanceId(rootJobInstanceId, stepName));
rs = statement.executeQuery();
if(rs.next()) {
readCount = rs.getLong("readcount");
Expand All @@ -1966,7 +2017,7 @@ public void updateWithFinalPartitionAggregateStepExecution(long rootJobExecution
updateStepExecutionWithMetrics(stepContext, readCount,
writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
writeSkipCount);
}
}

private void updateStepExecutionWithMetrics(StepContextImpl stepContext, long readCount,
long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
Expand Down Expand Up @@ -2266,4 +2317,6 @@ public void shutdown() throws BatchContainerServiceException {
}




}
Expand Up @@ -49,7 +49,7 @@ interface JDBCPersistenceManagerSQLConstants {
+ "parameters BLOB,"
+ "batchstatus VARCHAR(512),"
+ "exitstatus VARCHAR(512),"
+ "CONSTRAINT JOBINST_JOBEXEC_FK FOREIGN KEY (jobinstanceid) REFERENCES JOBINSTANCEDATA (jobinstanceid))";
+ "CONSTRAINT JOBINST_JOBEXEC_FK FOREIGN KEY (jobinstanceid) REFERENCES JOBINSTANCEDATA (jobinstanceid) ON DELETE CASCADE)";
final String CREATE_TAB_STEPEXECUTIONINSTANCEDATA = "CREATE TABLE STEPEXECUTIONINSTANCEDATA("
+ "stepexecid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT STEPEXECUTION_PK PRIMARY KEY,"
+ "jobexecid BIGINT,"
Expand All @@ -67,7 +67,7 @@ interface JDBCPersistenceManagerSQLConstants {
+ "startTime TIMESTAMP,"
+ "endTime TIMESTAMP,"
+ "persistentData BLOB,"
+ "CONSTRAINT JOBEXEC_STEPEXEC_FK FOREIGN KEY (jobexecid) REFERENCES EXECUTIONINSTANCEDATA (jobexecid))";
+ "CONSTRAINT JOBEXEC_STEPEXEC_FK FOREIGN KEY (jobexecid) REFERENCES EXECUTIONINSTANCEDATA (jobexecid) ON DELETE CASCADE)";

final String INSERT_JOBSTATUS = "insert into jobstatus values(?, ?)";

Expand Down

0 comments on commit 57fa84b

Please sign in to comment.