Skip to content
Permalink
Browse files
Merge branch 'master' into BATCHEE-65
  • Loading branch information
rsandtner committed Nov 9, 2015
2 parents e954710 + 1a0259c commit 7592acbadfb64affac0488604660b499f2f67e11
Showing 19 changed files with 1,036 additions and 565 deletions.

Large diffs are not rendered by default.

@@ -33,7 +33,7 @@ public class JobContextImpl implements JobContext {
// Per-execution scratch object exposed through the JobContext user-data accessors.
private Object transientUserData = null;
// Navigator over the parsed JSL <job> model for this execution.
private ModelNavigator<JSLJob> navigator = null;

// NOTE(review): this page is a rendered diff without +/- markers — the next
// two lines are the removed ('id') and added ('jobName') versions of the
// SAME field; only one exists in each revision.
private String id; // Name
private String jobName; // 'id' attribute in JSL
// Job-level properties converted from the JSL <properties> element.
private Properties properties = new Properties();

private long executionId;
@@ -42,7 +42,7 @@ public class JobContextImpl implements JobContext {

// Builds a job context from the JSL model: the name is taken from the root
// <job> element's id attribute, status starts at STARTING, and the JSL
// properties are converted to java.util.Properties.
// NOTE(review): diff artifact — the two assignments below are the removed
// ('this.id') and added ('this.jobName') sides of the BATCHEE-65 rename;
// only one is present per revision.
public JobContextImpl(final ModelNavigator<JSLJob> navigator, final JSLProperties jslProperties) {
this.navigator = navigator;
this.id = navigator.getRootModelElement().getId();
this.jobName = navigator.getRootModelElement().getId();
this.batchStatus = BatchStatus.STARTING;
this.properties = convertJSProperties(jslProperties);
}
@@ -71,7 +71,11 @@ public void setExitStatus(String exitStatus) {
}

// Returns the job name (the 'id' attribute of the JSL <job> element).
// NOTE(review): diff artifact — 'return id;' is the removed line and
// 'return jobName;' the added one; the compiled method has a single return.
public String getJobName() {
return id;
return jobName;
}

/**
 * Overrides the job name reported by {@code getJobName()}. Used by
 * {@code inheritJobContext} so a child context carries its parent job's
 * name.
 *
 * @param jobName the job name to expose (the 'id' attribute in the JSL)
 */
public void setJobName(final String jobName) {
    this.jobName = jobName;
}

public BatchStatus getBatchStatus() {
@@ -123,7 +127,7 @@ public void setRestartOn(String restartOn) {
// Debug representation of the context's identifying state.
// NOTE(review): diff artifact — the two continuation lines below are the
// removed ('id') and added ('jobName') versions of the same concatenation.
@Override
public String toString() {
return ("batchStatus = " + batchStatus) + " , exitStatus = " + exitStatus + " , id = "
+ id + " , executionId = " + executionId + " , instanceId = " + instanceId + " , restartOn = " + restartOn;
+ jobName + " , executionId = " + executionId + " , instanceId = " + instanceId + " , restartOn = " + restartOn;
}
}

@@ -298,7 +298,7 @@ private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartExc
new PartitionsBuilderConfig(subJobs, partitionProperties, analyzerStatusQueue, completedWorkQueue, jobExecutionImpl.getExecutionId());
// Then build all the subjobs but do not start them yet
if (stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config);
parallelBatchWorkUnits = kernelService.buildOnRestartParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
} else {
parallelBatchWorkUnits = kernelService.buildNewParallelPartitions(config, jobExecutionImpl.getJobContext(), stepContext);
}
@@ -121,9 +121,9 @@ private void buildSubJobBatchWorkUnits() {
int count = batchKernel.getJobInstanceCount(job.getId());
FlowInSplitBuilderConfig config = new FlowInSplitBuilderConfig(job, completedWorkQueue, rootJobExecutionId);
if (count == 0) {
parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config));
parallelBatchWorkUnits.add(batchKernel.buildNewFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
} else if (count == 1) {
parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config));
parallelBatchWorkUnits.add(batchKernel.buildOnRestartFlowInSplitWorkUnit(config, jobExecution.getJobContext()));
} else {
throw new IllegalStateException("There is an inconsistency somewhere in the internal subjob creation");
}
@@ -18,17 +18,17 @@

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemReader;
import javax.batch.api.chunk.ItemWriter;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class CheckpointManager {
private final PersistenceManagerService persistenceManagerService;
private final DataRepresentationService dataRepresentationService;
private final ItemReader readerProxy;
private final ItemWriter writerProxy;
private final CheckpointAlgorithm checkpointAlgorithm;
@@ -39,14 +39,16 @@ public class CheckpointManager {
// Wires the collaborators needed to persist reader/writer checkpoint data
// for one step of one job instance.
// NOTE(review): diff artifact — the parameter list shows BOTH the removed
// tail (persistenceManagerService only) and the added tail
// (persistenceManagerService, dataRepresentationService); the real
// constructor has one closing parenthesis.
public CheckpointManager(final ItemReader reader, final ItemWriter writer,
final CheckpointAlgorithm chkptAlg,
final long jobInstanceID, final String stepId,
final PersistenceManagerService persistenceManagerService) {
final PersistenceManagerService persistenceManagerService,
final DataRepresentationService dataRepresentationService) {
this.readerProxy = reader;
this.writerProxy = writer;
this.checkpointAlgorithm = chkptAlg;
this.stepId = stepId;
this.jobInstanceID = jobInstanceID;

this.persistenceManagerService = persistenceManagerService;
// Added in BATCHEE-65: service that (de)serializes checkpoint tokens,
// replacing the inline ObjectOutputStream usage in checkpoint().
this.dataRepresentationService = dataRepresentationService;
}

public boolean applyCheckPointPolicy() {
@@ -58,27 +60,19 @@ public boolean applyCheckPointPolicy() {
}

public void checkpoint() {
final ByteArrayOutputStream readerChkptBA = new ByteArrayOutputStream();
final ByteArrayOutputStream writerChkptBA = new ByteArrayOutputStream();
final ObjectOutputStream readerOOS;
final ObjectOutputStream writerOOS;
final CheckpointDataKey readerChkptDK;
final CheckpointDataKey writerChkptDK;
try {
readerOOS = new ObjectOutputStream(readerChkptBA);
readerOOS.writeObject(readerProxy.checkpointInfo());
readerOOS.close();
byte[] checkpointBytes = dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo());
CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER);
readerChkptData.setRestartToken(readerChkptBA.toByteArray());
readerChkptData.setRestartToken(checkpointBytes);
readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER);

persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData);

writerOOS = new ObjectOutputStream(writerChkptBA);
writerOOS.writeObject(writerProxy.checkpointInfo());
writerOOS.close();
checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo());
CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER);
writerChkptData.setRestartToken(writerChkptBA.toByteArray());
writerChkptData.setRestartToken(checkpointBytes);
writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER);

persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData);
@@ -32,6 +32,7 @@
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
@@ -49,7 +50,6 @@
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.batch.runtime.BatchStatus;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -63,6 +63,7 @@ public class ChunkStepController extends SingleThreadedStepController {

private final PersistenceManagerService persistenceManagerService;
private final BatchArtifactFactory artifactFactory;
private final DataRepresentationService dataRepresentationService;

private Chunk chunk = null;
private ItemReader readerProxy = null;
@@ -88,6 +89,7 @@ public ChunkStepController(final RuntimeJobExecution jobExecutionImpl, final Ste
super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue, servicesManager);
this.persistenceManagerService = servicesManager.service(PersistenceManagerService.class);
this.artifactFactory = servicesManager.service(BatchArtifactFactory.class);
this.dataRepresentationService = servicesManager.service(DataRepresentationService.class);
}

/**
@@ -536,7 +538,7 @@ private void invokeChunk() {
positionWriterAtCheckpoint();
checkpointManager = new CheckpointManager(readerProxy, writerProxy,
getCheckpointAlgorithm(itemCount, timeInterval), jobExecutionImpl
.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService, dataRepresentationService);
}
}

@@ -745,8 +747,8 @@ private void initializeChunkArtifacts() {
chkptAlg = checkpointProxy;
}

checkpointManager
= new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(), persistenceManagerService);
checkpointManager = new CheckpointManager(readerProxy, writerProxy, chkptAlg, jobExecutionImpl.getJobInstance().getInstanceId(), step.getId(),
persistenceManagerService, dataRepresentationService);

skipHandler = new SkipHandler(chunk);
skipHandler.addSkipProcessListener(skipProcessListeners);
@@ -769,12 +771,8 @@ private void openReaderAndWriter() {
// check for data in backing store
if (readerChkptData != null) {
final byte[] readertoken = readerChkptData.getRestartToken();
final ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
TCCLObjectInputStream readerOIS;
try {
readerOIS = new TCCLObjectInputStream(readerChkptBA);
readerProxy.open((Serializable) readerOIS.readObject());
readerOIS.close();
readerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(readertoken));
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
@@ -799,12 +797,8 @@ private void openReaderAndWriter() {
// check for data in backing store
if (writerChkptData != null) {
final byte[] writertoken = writerChkptData.getRestartToken();
final ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
TCCLObjectInputStream writerOIS;
try {
writerOIS = new TCCLObjectInputStream(writerChkptBA);
writerProxy.open((Serializable) writerOIS.readObject());
writerOIS.close();
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (final Exception ex) {
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
}
@@ -896,12 +890,8 @@ private void positionReaderAtCheckpoint() {
// check for data in backing store
if (readerData != null) {
byte[] readertoken = readerData.getRestartToken();
ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
TCCLObjectInputStream readerOIS;
try {
readerOIS = new TCCLObjectInputStream(readerChkptBA);
readerProxy.open((Serializable) readerOIS.readObject());
readerOIS.close();
readerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(readertoken));
} catch (Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
@@ -929,12 +919,9 @@ private void positionWriterAtCheckpoint() {
// check for data in backing store
if (writerData != null) {
byte[] writertoken = writerData.getRestartToken();
ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
TCCLObjectInputStream writerOIS;
try {
writerOIS = new TCCLObjectInputStream(writerChkptBA);
writerProxy.open((Serializable) writerOIS.readObject());
writerOIS.close();
writerProxy.open((Serializable) dataRepresentationService.toJavaRepresentation(writertoken));
} catch (Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
@@ -71,6 +71,7 @@ public void prepareForExecution(final JobContextImpl jobContext) {
/**
 * Copies identifying information — execution id, instance id and job name —
 * from the parent job context {@code jc} into this unit's own
 * {@code jobContext}.
 *
 * @param jc parent job context whose identifiers are propagated
 */
public void inheritJobContext(final JobContextImpl jc) {
    final JobContextImpl target = jobContext;
    target.setExecutionId(jc.getExecutionId());
    target.setInstanceId(jc.getInstanceId());
    target.setJobName(jc.getJobName());
}

public void setRestartOn(final String restartOn) {
@@ -54,7 +54,7 @@ InternalJobExecution restartJob(long executionID, Properties overrideJobParamete

List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilderConfig config, JobContextImpl jc, StepContextImpl sc) throws JobRestartException, JobStartException;

List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException,
List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config, JobContextImpl jc, StepContextImpl sc) throws JobRestartException,
JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException;

void startGeneratedJob(BatchWorkUnit batchWork);
@@ -64,10 +64,10 @@ List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderC
boolean isExecutionRunning(long executionId);

BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(
FlowInSplitBuilderConfig config);
FlowInSplitBuilderConfig config, JobContextImpl jc);

BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(
FlowInSplitBuilderConfig config);
FlowInSplitBuilderConfig config, JobContextImpl jc);


}
@@ -20,6 +20,7 @@
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.services.callback.SimpleJobExecutionCallbackService;
import org.apache.batchee.container.services.data.DefaultDataRepresentationService;
import org.apache.batchee.container.services.executor.DefaultThreadPoolService;
import org.apache.batchee.container.services.factory.CDIBatchArtifactFactory;
import org.apache.batchee.container.services.factory.DefaultBatchArtifactFactory;
@@ -34,6 +35,7 @@
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.BatchService;
import org.apache.batchee.spi.BatchThreadPoolService;
import org.apache.batchee.spi.DataRepresentationService;
import org.apache.batchee.spi.JobExecutionCallbackService;
import org.apache.batchee.spi.JobXMLLoaderService;
import org.apache.batchee.spi.PersistenceManagerService;
@@ -65,6 +67,7 @@ public class ServicesManager implements BatchContainerConstants {
SERVICE_IMPL_CLASS_NAMES.put(JobXMLLoaderService.class.getName(), DefaultJobXMLLoaderService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(SecurityService.class.getName(), DefaultSecurityService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(JobExecutionCallbackService.class.getName(), SimpleJobExecutionCallbackService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(DataRepresentationService.class.getName(), DefaultDataRepresentationService.class.getName());
try {
Thread.currentThread().getContextClassLoader().loadClass("javax.enterprise.inject.spi.BeanManager");
SERVICE_IMPL_CLASS_NAMES.put(BatchArtifactFactory.class.getName(), CDIBatchArtifactFactory.class.getName());

0 comments on commit 7592acb

Please sign in to comment.