Skip to content
Permalink
Browse files
BATCHEE-88 improve handling in case of error during step commit
  • Loading branch information
struberg committed May 15, 2016
1 parent 4313c77 commit 7a74df4bc9295377ab35ea53eacb8204698cd425
Showing 5 changed files with 92 additions and 20 deletions.
@@ -19,6 +19,10 @@
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;

import java.util.List;

import org.apache.batchee.util.Batches;
import org.testng.Assert;
@@ -30,8 +34,27 @@ public class TxErrorTest {
/**
 * Runs the {@code txtest1} job once and verifies the outcome of the run:
 * the job ends {@link BatchStatus#FAILED}, the writer counter is at 3, and
 * the single step reports 2 reads, 2 writes and 1 rollback in its metrics.
 */
@Test
public void testRolledBackDuringWork() {
    final JobOperator jobOperator = BatchRuntime.getJobOperator();

    // Start the job exactly once and keep the execution id: it is needed
    // further down to look up the step executions and their metrics.
    final long executionId = jobOperator.start("txtest1", null);
    final BatchStatus batchStatus = Batches.waitFor(jobOperator, executionId);
    Assert.assertEquals(batchStatus, BatchStatus.FAILED);
    Assert.assertEquals(TxErrorWriter1.written.intValue(), 3);

    final List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);
    Assert.assertEquals(stepExecutions.size(), 1);

    final Metric[] metrics = stepExecutions.get(0).getMetrics();
    assertMetric(Metric.MetricType.READ_COUNT, 2, metrics);
    assertMetric(Metric.MetricType.WRITE_COUNT, 2, metrics);
    assertMetric(Metric.MetricType.ROLLBACK_COUNT, 1, metrics);
}

/**
 * Asserts that the first metric of the given type carries the expected value.
 * Fails the test if no metric of that type is contained in {@code metrics}.
 *
 * @param metricType the metric type to look for
 * @param expected   the value that metric must have
 * @param metrics    the metrics collected for a step execution
 */
private void assertMetric(Metric.MetricType metricType, long expected, Metric[] metrics) {
    Metric match = null;
    for (int i = 0; match == null && i < metrics.length; i++) {
        final Metric candidate = metrics[i];
        if (metricType.equals(candidate.getType())) {
            match = candidate;
        }
    }

    if (match == null) {
        Assert.fail("MetricType " + metricType + " not in collected metrics");
    } else {
        Assert.assertEquals(match.getValue(), expected);
    }
}
}
@@ -18,9 +18,6 @@
<chunk item-count="1">
<reader ref="org.apache.batchee.its.transaction.TxErrorReader"></reader>
<writer ref="org.apache.batchee.its.transaction.TxErrorWriter1"></writer>
<skippable-exception-classes>
<include class="java.lang.Exception"/> <!-- all exceptions... -->
</skippable-exception-classes>
</chunk>
</step>
</job>
@@ -359,6 +359,11 @@ protected void statusStarting() {
}

/**
 * Resolves the step's persistent user data and hands it straight to
 * {@code storeUserData} so it gets written to the step status.
 */
protected void persistUserData() {
    storeUserData(resolveUserData());
}

protected PersistentDataWrapper resolveUserData() {
final ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();
final ObjectOutputStream persistentDataOOS;

@@ -370,8 +375,16 @@ protected void persistUserData() {
throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
}

stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
return new PersistentDataWrapper(persistentBAOS.toByteArray());
}

/**
 * Sets the given user data on the step status and pushes the updated status
 * to the status manager service.
 *
 * @param userData the already serialized persistent user data
 * @throws BatchContainerServiceException if setting or updating the status fails
 */
protected void storeUserData(PersistentDataWrapper userData) {
    try {
        stepStatus.setPersistentUserData(userData);
        statusManagerService.updateStepStatus(
                stepStatus.getStepExecutionId(),
                stepStatus);
    } catch (final Exception failure) {
        throw new BatchContainerServiceException(
                "Cannot persist the persistent user data for the step.", failure);
    }
}

protected void persistExitStatusAndEndTimestamp() {
@@ -16,6 +16,9 @@
*/
package org.apache.batchee.container.impl.controller.chunk;

import java.util.HashMap;
import java.util.Map;

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.spi.DataRepresentationService;
@@ -75,28 +78,45 @@ public boolean applyCheckPointPolicy() {
}
}

public void checkpoint() {
/**
* Captures the current checkpoint info of the ItemReader and ItemWriter.
* <p>
* The checkpoint info of each proxy is converted through the
* DataRepresentationService, wrapped in a CheckpointData entry and collected
* in a map keyed by the matching CheckpointDataKey, so the entries can later
* be passed to {@link #storeCheckPoints(Map)}.
*
* @return map containing the READER and the WRITER checkpoint entry
* @throws BatchContainerServiceException if any step of building or writing
*         a checkpoint entry throws
*/
public Map<CheckpointDataKey, CheckpointData> prepareCheckpoints() {
final CheckpointDataKey readerChkptDK;
final CheckpointDataKey writerChkptDK;
// exactly two entries are ever added: one for the reader, one for the writer
Map<CheckpointDataKey, CheckpointData> checkpoints = new HashMap<CheckpointDataKey, CheckpointData>(2);
try {
// reader checkpoint: serialize the reader's checkpoint info and use it as restart token
byte[] checkpointBytes = dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo());
CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER);
readerChkptData.setRestartToken(checkpointBytes);
readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER);

// NOTE(review): the reader entry is written right here AND returned in the map;
// storeCheckPoints(Map) writes every map entry again - confirm this eager write is still wanted.
persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData);
checkpoints.put(readerChkptDK, readerChkptData);

// writer checkpoint: same procedure, the byte[] variable is reused
checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo());
CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER);
writerChkptData.setRestartToken(checkpointBytes);
writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER);

// NOTE(review): same eager write as for the reader entry above - confirm it is intended.
persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData);

checkpoints.put(writerChkptDK, writerChkptData);
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex);
}

return checkpoints;
}

/**
 * Writes every given checkpoint entry through the persistence manager service.
 *
 * @param checkpoints the checkpoint entries to write, keyed by their CheckpointDataKey
 * @throws BatchContainerServiceException if iterating or writing an entry fails
 */
public void storeCheckPoints(Map<CheckpointDataKey, CheckpointData> checkpoints) {
    try {
        for (final Map.Entry<CheckpointDataKey, CheckpointData> entry : checkpoints.entrySet()) {
            final CheckpointDataKey key = entry.getKey();
            final CheckpointData data = entry.getValue();
            persistenceManagerService.setCheckpointData(key, data);
        }
    } catch (final Exception failure) {
        throw new BatchContainerServiceException(
                "Cannot persist the checkpoint data for [" + stepId + "]", failure);
    }
}

public int checkpointTimeout() {
@@ -48,10 +48,14 @@
import javax.batch.api.chunk.listener.SkipProcessListener;
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.batch.operations.BatchRuntimeException;
import javax.batch.runtime.BatchStatus;
import javax.transaction.Status;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
@@ -585,11 +589,22 @@ private void invokeChunk() {
chunkProxy.afterChunk();
}

checkpointManager.checkpoint();

this.persistUserData();

transactionManager.commit();
Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints();
PersistentDataWrapper userData = resolveUserData();
try {
    transactionManager.commit();
    // user data and checkpoints are written only after the commit call returned
    storeUserData(userData);
    checkpointManager.storeCheckPoints(checkpoints);
} catch (Exception e) {
    // Record this failure on the step context only if no earlier exception
    // has been recorded, so an already stored cause is not overwritten.
    // (The check used to be '!= null', which did the exact opposite.)
    if (this.stepContext.getException() == null) {
        this.stepContext.setException(e);
    }
    // batch runtime exceptions are passed on untouched, everything else gets wrapped
    if (e instanceof BatchRuntimeException) {
        throw e;
    }
    throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e);
}

checkpointManager.endCheckpoint();

@@ -659,14 +674,15 @@ private boolean doClose() {
*/
private void rollback(final Throwable t) {
try {
// ignore, we blow up anyway
transactionManager.setRollbackOnly();
try {
doClose();
} catch (Exception e) {
// ignore, we blow up anyway
}

// ignore, we blow up anyway
transactionManager.setRollbackOnly();

if (t instanceof Exception) {
Exception e = (Exception) t;
for (ChunkListener chunkProxy : chunkListeners) {
@@ -681,7 +697,10 @@ private void rollback(final Throwable t) {
// ever come up in the spec, but seems marginally more useful.
stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue();
} finally {
transactionManager.rollback();
int txStatus = transactionManager.getStatus();
if (txStatus == Status.STATUS_ACTIVE || txStatus == Status.STATUS_MARKED_ROLLBACK) {
transactionManager.rollback();
}
throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
}
}

0 comments on commit 7a74df4

Please sign in to comment.