Skip to content
Permalink
Browse files
BATCHEE-66 store Checkpoint info in human readable form
  • Loading branch information
struberg committed Nov 5, 2015
1 parent 31a4d12 commit 16c309b38fee0f69d55d2ce549d839b1ed15dfa0
Show file tree
Hide file tree
Showing 7 changed files with 568 additions and 38 deletions.
@@ -18,17 +18,17 @@

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemReader;
import javax.batch.api.chunk.ItemWriter;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class CheckpointManager {
private final PersistenceManagerService persistenceManagerService;
private final DataRepresentationService dataRepresentationService;
private final ItemReader readerProxy;
private final ItemWriter writerProxy;
private final CheckpointAlgorithm checkpointAlgorithm;
@@ -39,14 +39,16 @@ public class CheckpointManager {
public CheckpointManager(final ItemReader reader, final ItemWriter writer,
final CheckpointAlgorithm chkptAlg,
final long jobInstanceID, final String stepId,
final PersistenceManagerService persistenceManagerService) {
final PersistenceManagerService persistenceManagerService,
final DataRepresentationService dataRepresentationService) {
this.readerProxy = reader;
this.writerProxy = writer;
this.checkpointAlgorithm = chkptAlg;
this.stepId = stepId;
this.jobInstanceID = jobInstanceID;

this.persistenceManagerService = persistenceManagerService;
this.dataRepresentationService = dataRepresentationService;
}

public boolean applyCheckPointPolicy() {
@@ -58,27 +60,19 @@ public boolean applyCheckPointPolicy() {
}

public void checkpoint() {
final ByteArrayOutputStream readerChkptBA = new ByteArrayOutputStream();
final ByteArrayOutputStream writerChkptBA = new ByteArrayOutputStream();
final ObjectOutputStream readerOOS;
final ObjectOutputStream writerOOS;
final CheckpointDataKey readerChkptDK;
final CheckpointDataKey writerChkptDK;
try {
readerOOS = new ObjectOutputStream(readerChkptBA);
readerOOS.writeObject(readerProxy.checkpointInfo());
readerOOS.close();
byte[] checkpointBytes = dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo());
CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER);
readerChkptData.setRestartToken(readerChkptBA.toByteArray());
readerChkptData.setRestartToken(checkpointBytes);
readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER);

persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData);

writerOOS = new ObjectOutputStream(writerChkptBA);
writerOOS.writeObject(writerProxy.checkpointInfo());
writerOOS.close();
checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo());
CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER);
writerChkptData.setRestartToken(writerChkptBA.toByteArray());
writerChkptData.setRestartToken(checkpointBytes);
writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER);

persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData);
@@ -32,6 +32,7 @@
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -49,7 +50,6 @@
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.batch.runtime.BatchStatus;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -63,6 +63,7 @@ public class ChunkStepController extends SingleThreadedStepController {

private final PersistenceManagerService persistenceManagerService;
private final BatchArtifactFactory artifactFactory;
private final DataRepresentationService dataRepresentationService;

private Chunk chunk = null;
private ItemReader readerProxy = null;
@@ -88,6 +89,7 @@ public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Ste
super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
this.dataRepresentationService = servicesManager.service(DataRepresentationService.class);
}

/**
@@ -536,7 +538,7 @@ private void invokeChunk() {
positionWriterAtCheckpoint();
checkpointManager = new CheckpointManager(readerProxy, writerProxy,
getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);
}
}

@@ -745,8 +747,8 @@ private void initializeChunkArtifacts() {
chkptAlg = checkpointProxy;
}

checkpointManager
= new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(),
persistenceManagerService, dataRepresentationService);

skipHandler = new SkipHandler(chunk);
skipHandler.addSkipProcessListener(skipProcessListeners);
@@ -769,12 +771,8 @@ private void openReaderAndWriter() {
// check for data in backing store
if (readerChkptData != null) {
final byte[] readertoken = readerChkptData.getRestartToken();
final ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
TCCLObjectInputStream readerOIS;
try {
readerOIS = new TCCLObjectInputStream(readerChkptBA);
readerProxy.open((Serializable) readerOIS.readObject());
readerOIS.close();
readerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(readertoken));
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
@@ -799,12 +797,8 @@ private void openReaderAndWriter() {
// check for data in backing store
if (writerChkptData != null) {
final byte[] writertoken = writerChkptData.getRestartToken();
final ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
TCCLObjectInputStream writerOIS;
try {
writerOIS = new TCCLObjectInputStream(writerChkptBA);
writerProxy.open((Serializable) writerOIS.readObject());
writerOIS.close();
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (final Exception ex) {
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
}
@@ -896,12 +890,8 @@ private void positionReaderAtCheckpoint() {
// check for data in backing store
if (readerData != null) {
byte[] readertoken = readerData.getRestartToken();
ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
TCCLObjectInputStream readerOIS;
try {
readerOIS = new TCCLObjectInputStream(readerChkptBA);
readerProxy.open((Serializable) readerOIS.readObject());
readerOIS.close();
readerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(readertoken));
} catch (Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
@@ -929,12 +919,9 @@ private void positionWriterAtCheckpoint() {
// check for data in backing store
if (writerData != null) {
byte[] writertoken = writerData.getRestartToken();
ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
TCCLObjectInputStream writerOIS;
try {
writerOIS = new TCCLObjectInputStream(writerChkptBA);
writerProxy.open((Serializable) writerOIS.readObject());
writerOIS.close();
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
@@ -20,6 +20,7 @@
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.services.callback.SimpleJobExecutionCallbackService;
import org.apache.batchee.container.services.data.DefaultDataRepresentationService;
import org.apache.batchee.container.services.executor.DefaultThreadPoolService;
import org.apache.batchee.container.services.factory.CDIBatchArtifactFactory;
import org.apache.batchee.container.services.factory.DefaultBatchArtifactFactory;
@@ -34,6 +35,7 @@
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.BatchService;
import org.apache.batchee.spi.BatchThreadPoolService;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.JobExecutionCallbackService;
import org.apache.batchee.spi.JobXMLLoaderService;
import org.apache.batchee.spi.PersistenceManagerService;
@@ -65,6 +67,7 @@ public class ServicesManager implements BatchContainerConstants {
SERVICE_IMPL_CLASS_NAMES.put(JobXMLLoaderService.class.getName(), DefaultJobXMLLoaderService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(SecurityService.class.getName(), DefaultSecurityService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(JobExecutionCallbackService.class.getName(), SimpleJobExecutionCallbackService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(DataRepresentationService.class.getName(), DefaultDataRepresentationService.class.getName());
try {
Thread.currentThread().getContextClassLoader().loadClass("javax.enterprise.inject.spi.BeanManager");
SERVICE_IMPL_CLASS_NAMES.put(BatchArtifactFactory.class.getName(), CDIBatchArtifactFactory.class.getName());

0 comments on commit 16c309b

Please sign in to comment.