Skip to content

Commit

Permalink
Fix with tests for Bug 5675.
Browse files Browse the repository at this point in the history
  • Loading branch information
scottkurz committed Nov 5, 2014
1 parent 855b97b commit 8a82576
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public abstract class BaseStepControllerImpl implements IExecutionElementControl

protected TransactionManagerAdapter transactionManager = null;

private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
protected static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();

private static IJobStatusManagerService _jobStatusService = (IJobStatusManagerService) ServicesManagerImpl.getInstance().getJobStatusManagerService();

Expand Down Expand Up @@ -198,7 +198,7 @@ public ExecutionStatus execute() {
persistUserData();
transitionToFinalBatchStatus();
defaultExitStatusIfNecessary();
persistExitStatusAndEndTimestamp();
persistExitStatusEndTimestampAndStepExecution();
} catch (Throwable t) {
// Don't let an exception caught here prevent us from persisting the failed batch status.
markJobAndStepFailed();
Expand Down Expand Up @@ -386,7 +386,7 @@ protected void persistUserData() {
_jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
}

protected void persistExitStatusAndEndTimestamp() {
protected void persistExitStatusEndTimestampAndStepExecution() {
stepStatus.setExitStatus(stepContext.getExitStatus());
_jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);

Expand All @@ -395,6 +395,10 @@ protected void persistExitStatusAndEndTimestamp() {
Timestamp endTS = new Timestamp(time);
stepContext.setEndTime(endTS);

persistStepExecution();
}

protected void persistStepExecution() {
_persistenceManagementService.updateStepExecution(rootJobExecutionId, stepContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ private void executeAndWaitForCompletion() throws JobRestartException {
boolean exceptionThrownAnalyzingCollectorData = false;
boolean exceptionThrownAnalyzingStatus = false;


if (numTotalForThisExecution == 0) {
logger.finer("All partitions have already completed on a previous execution. Returning");
return;
}

while (true) {
logger.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
try {
Expand Down Expand Up @@ -568,4 +574,10 @@ protected void sendStatusFromPartitionToAnalyzerIfPresent() {
// be anything to do on this thread. It's only on the partitioned
// threads that there is something to send back.
}

@Override
protected void persistStepExecution() {
// Call special aggregating method
_persistenceManagementService.updateWithFinalPartitionAggregateStepExecution(rootJobExecutionId, stepContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ public enum TimestampType { CREATE, END, LAST_UPDATED, STARTED };
*/
public void updateStepExecution(long jobExecId, StepContextImpl stepContext);

/**
* Update a StepExecution for the "top-level" StepExecution of a partitioned step.
*
* This will aggregate the metrics from the "partition-level" StepExecution(s),
* (which by the way are not spec-defined and not accessible through standard, public APIs.)
*
* @param jobExecId the parent JobExecution id
* @param stepContext the step context for this step execution
*/
public void updateWithFinalPartitionAggregateStepExecution(long rootJobExecutionId, StepContextImpl stepContext);


// JOB_STATUS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1834,18 +1834,16 @@ private StepExecutionImpl createStepExecution(long rootJobExecId, String batchS
return stepExecution;
}



/* (non-Javadoc)
* @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateStepExecution(long, com.ibm.jbatch.container.context.impl.StepContextImpl)
*/
@Override
public void updateStepExecution(long rootJobExecId, StepContextImpl stepContext) {
long stepExecutionId = stepContext.getInternalStepExecutionId();
String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
String exitStatus = stepContext.getExitStatus();
String stepName = stepContext.getStepName();
if (logger.isLoggable(Level.FINE)) {
logger.fine("batchStatus: " + batchStatus + " | stepName: " + stepName + " | stepExecID: " + stepContext.getStepExecutionId());
}

Metric[] metrics = stepContext.getMetrics();

long readCount = 0;
long writeCount = 0;
long commitCount = 0;
Expand All @@ -1854,10 +1852,7 @@ public void updateStepExecution(long rootJobExecId, StepContextImpl stepContext)
long processSkipCount = 0;
long filterCount = 0;
long writeSkipCount = 0;
Timestamp startTime = stepContext.getStartTimeTS();
Timestamp endTime = stepContext.getEndTimeTS();

Metric[] metrics = stepContext.getMetrics();

for (int i = 0; i < metrics.length; i++) {
if (metrics[i].getType().equals(MetricImpl.MetricType.READ_COUNT)) {
readCount = metrics[i].getValue();
Expand All @@ -1877,35 +1872,143 @@ public void updateStepExecution(long rootJobExecId, StepContextImpl stepContext)
writeSkipCount = metrics[i].getValue();
}
}
Serializable persistentData = stepContext.getPersistentUserData();

updateStepExecution(stepExecutionId, rootJobExecId, batchStatus, exitStatus, stepName, readCount,
writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
writeSkipCount, startTime, endTime, persistentData);

updateStepExecutionWithMetrics(rootJobExecId, stepContext, readCount,
writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
writeSkipCount);
}

/**
* Obviously would be nice if the code writing this special format were in the same place as this
* code reading it.
*
* Assumes format like:
*
* JOBINSTANCEDATA
* (jobinstanceid name, ...)
*
* 1197,"partitionMetrics","NOTSET"
* 1198,":1197:step1:0","NOTSET"
* 1199,":1197:step1:1","NOTSET"
* 1200,":1197:step2:0","NOTSET"
*
* @param rootJobExecutionId JobExecution id of the top-level job
* @param stepName Step name of the top-level stepName
*/
private String getPartitionLevelJobInstanceWildCard(long rootJobExecutionId, String stepName) {

Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;
long jobInstanceId = 0L;

try {
conn = getConnection();
statement = conn.prepareStatement("select jobinstanceid from jbatch.EXECUTIONINSTANCEDATA where jobexecid = ?");

statement.setLong(1, rootJobExecutionId);
rs = statement.executeQuery();
if(rs.next()) {
jobInstanceId = rs.getLong(1);
} else {
return null;
}
} catch (SQLException e) {
throw new PersistenceException(e);
} finally {
cleanupConnection(conn, rs, statement);
}

StringBuilder sb = new StringBuilder(":");
sb.append(Long.toString(jobInstanceId));
sb.append(":");
sb.append(stepName);
sb.append(":%");

return sb.toString();
}

@Override
public void updateWithFinalPartitionAggregateStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {

Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;

long readCount =0;
long writeCount = 0;
long commitCount = 0;
long rollbackCount = 0;
long readSkipCount = 0;
long processSkipCount = 0;
long filterCount = 0;
long writeSkipCount = 0;

try {
conn = getConnection();
statement = conn.prepareStatement("select SUM(STEPEX.readcount) readcount, SUM(STEPEX.writecount) writecount, SUM(STEPEX.commitcount) commitcount, SUM(STEPEX.rollbackcount) rollbackcount," +
" SUM(STEPEX.readskipcount) readskipcount, SUM(STEPEX.processskipcount) processskipcount, SUM(STEPEX.filtercount) filtercount, SUM(STEPEX.writeSkipCount) writeSkipCount" +
" from stepexecutioninstancedata STEPEX inner join executioninstancedata JOBEX" +
" on STEPEX.jobexecid = JOBEX.jobexecid" +
" where JOBEX.jobinstanceid IN" +
" (select jobinstanceid from JOBINSTANCEDATA where name like ?)");

statement.setString(1, getPartitionLevelJobInstanceWildCard(rootJobExecutionId, stepContext.getStepName()));
rs = statement.executeQuery();
if(rs.next()) {
readCount = rs.getLong("readcount");
writeCount = rs.getLong("writecount");
commitCount = rs.getLong("commitcount");
rollbackCount = rs.getLong("rollbackcount");
readSkipCount = rs.getLong("readskipcount");
processSkipCount = rs.getLong("processskipcount");
filterCount = rs.getLong("filtercount");
writeSkipCount = rs.getLong("writeSkipCount");
}
} catch (SQLException e) {
throw new PersistenceException(e);
} finally {
cleanupConnection(conn, rs, statement);
}

updateStepExecutionWithMetrics(rootJobExecutionId, stepContext, readCount,
writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
writeSkipCount);
}

private void updateStepExecution(long stepExecutionId, long jobExecId, String batchStatus, String exitStatus, String stepName, long readCount,
private void updateStepExecutionWithMetrics(long rootJobExecId, StepContextImpl stepContext, long readCount,
long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
long writeSkipCount, Timestamp startTime, Timestamp endTime, Serializable persistentData) {
long writeSkipCount) {

logger.entering(CLASSNAME, "updateStepExecution", new Object[] {stepExecutionId, jobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount,
long stepExecutionId = stepContext.getInternalStepExecutionId();
String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
String exitStatus = stepContext.getExitStatus();
String stepName = stepContext.getStepName();
if (logger.isLoggable(Level.FINE)) {
logger.fine("batchStatus: " + batchStatus + " | stepName: " + stepName + " | stepExecID: " + stepContext.getStepExecutionId());
}

Timestamp startTime = stepContext.getStartTimeTS();
Timestamp endTime = stepContext.getEndTimeTS();

Serializable persistentData = stepContext.getPersistentUserData();

if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "About to update StepExecution with: ", new Object[] {stepExecutionId, rootJobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount,
writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime==null ? "<null>" : startTime,
endTime==null ? "<null>" : endTime, persistentData==null ? "<null>" : persistentData});
}

Connection conn = null;
PreparedStatement statement = null;
ResultSet rs = null;
StepExecutionImpl stepExecution = null;
String query = "UPDATE stepexecutioninstancedata SET jobexecid = ?, batchstatus = ?, exitstatus = ?, stepname = ?, readcount = ?,"
+ "writecount = ?, commitcount = ?, rollbackcount = ?, readskipcount = ?, processskipcount = ?, filtercount = ?, writeskipcount = ?,"
+ " starttime = ?, endtime = ?, persistentdata = ? WHERE stepexecid = ?";

try {
conn = getConnection();
statement = conn.prepareStatement(query);
statement.setLong(1, jobExecId);
statement.setLong(1, rootJobExecId);
statement.setString(2, batchStatus);
statement.setString(3, exitStatus);
statement.setString(4, stepName);
Expand All @@ -1929,8 +2032,6 @@ private void updateStepExecution(long stepExecutionId, long jobExecId, String b
} finally {
cleanupConnection(conn, null, statement);
}

logger.exiting(CLASSNAME, "updateStepExecution");
}

/* (non-Javadoc)
Expand Down
Loading

0 comments on commit 8a82576

Please sign in to comment.