Skip to content
Permalink
Browse files
BATCHEE-29 deleted even more 'manual' proxies
  • Loading branch information
struberg committed Apr 24, 2014
1 parent 458f304 commit b5a976081e0a0c1d73c4d6ec2a6c9646b12ded3f
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 347 deletions.
@@ -18,9 +18,9 @@

import org.apache.batchee.container.ExecutionElementController;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.controller.chunk.ExceptionConfig;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.jsl.ExecutionElement;
import org.apache.batchee.container.proxy.DeciderProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.services.ServicesManager;
@@ -31,6 +31,7 @@
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.Decider;
import javax.batch.runtime.StepExecution;
import java.util.List;

@@ -64,9 +65,14 @@ public ExecutionStatus execute() {
//so two of these contexts will always be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecution.getJobContext(), null, propList);

final DeciderProxy deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);
final Decider deciderProxy = ProxyFactory.createDeciderProxy(factory, deciderId, injectionRef, jobExecution);

final String exitStatus = deciderProxy.decide(this.previousStepExecutions);
String exitStatus = null;
try {
exitStatus = deciderProxy.decide(this.previousStepExecutions);
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}

//Set the value returned from the decider as the job context exit status.
this.jobExecution.getJobContext().setExitStatus(exitStatus);
@@ -18,14 +18,11 @@

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.chunk.ExceptionConfig;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.jsl.CloneUtility;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.PartitionAnalyzerProxy;
import org.apache.batchee.container.proxy.PartitionMapperProxy;
import org.apache.batchee.container.proxy.PartitionReducerProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.BatchPartitionPlan;
import org.apache.batchee.container.util.BatchPartitionWorkUnit;
@@ -36,13 +33,15 @@
import org.apache.batchee.jaxb.Analyzer;
import org.apache.batchee.jaxb.JSLJob;
import org.apache.batchee.jaxb.JSLProperties;
import org.apache.batchee.jaxb.PartitionMapper;
import org.apache.batchee.jaxb.PartitionReducer;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;

import javax.batch.api.listener.StepListener;
import javax.batch.api.partition.PartitionAnalyzer;
import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionReducer;
import javax.batch.api.partition.PartitionReducer.PartitionStatus;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionNotMostRecentException;
@@ -70,15 +69,15 @@ public class PartitionedStepController extends BaseStepController {

private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;

private PartitionReducerProxy partitionReducerProxy = null;
private PartitionReducer partitionReducerProxy = null;

// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
int numPreviouslyCompleted = 0;

private PartitionAnalyzerProxy analyzerProxy = null;
private PartitionAnalyzer analyzerProxy = null;

final List<JSLJob> subJobs = new ArrayList<JSLJob>();
protected List<StepListenerProxy> stepListeners = null;
protected List<StepListener> stepListeners = null;

List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();

@@ -121,7 +120,7 @@ private PartitionPlan generatePartitionPlan() {

PartitionPlan plan = null;
Integer previousNumPartitions = null;
final PartitionMapper partitionMapper = step.getPartition().getMapper();
final org.apache.batchee.jaxb.PartitionMapper partitionMapper = step.getPartition().getMapper();

//from persisted plan from previous run
if (stepStatus.getNumPartitions() != null) {
@@ -136,11 +135,16 @@ private PartitionPlan generatePartitionPlan() {
// Set all the contexts associated with this controller.
// Some of them may be null
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propertyList);
final PartitionMapperProxy partitionMapperProxy =
final PartitionMapper partitionMapperProxy =
ProxyFactory.createPartitionMapperProxy(factory, partitionMapper.getRef(), injectionRef, stepContext, jobExecutionImpl);


final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
PartitionPlan mapperPlan = null;
try {
mapperPlan = partitionMapperProxy.mapPartitions();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}

//Set up the new partition plan
plan = new BatchPartitionPlan();
@@ -255,7 +259,11 @@ protected void invokeCoreStep() throws JobRestartException, JobStartException, J
*/
if (plan.getPartitionsOverride()) {
if (this.partitionReducerProxy != null) {
this.partitionReducerProxy.rollbackPartitionedStep();
try {
this.partitionReducerProxy.rollbackPartitionedStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}

@@ -325,10 +333,18 @@ private void executeAndWaitForCompletion() throws JobRestartException {
if (analyzerProxy != null) {
PartitionDataWrapper dataWrapper = analyzerStatusQueue.take();
if (PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
try {
analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
continue; // without being ready to submit another
} else if (PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
try {
analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
completedWork.add(completedWorkQueue.take()); // Shouldn't be a a long wait.
} else {
throw new IllegalStateException("Invalid partition state");
@@ -388,12 +404,20 @@ private void checkCompletedWork() {
//We are assuming that not providing a rollback was intentional
if (rollback) {
if (this.partitionReducerProxy != null) {
this.partitionReducerProxy.rollbackPartitionedStep();
try {
this.partitionReducerProxy.rollbackPartitionedStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
throw new BatchContainerRuntimeException("One or more partitions failed");
} else {
if (this.partitionReducerProxy != null) {
this.partitionReducerProxy.beforePartitionedStepCompletion();
try {
this.partitionReducerProxy.beforePartitionedStepCompletion();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}
}
@@ -410,7 +434,7 @@ protected void setupStepArtifacts() {
analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(factory, analyzer.getRef(), injectionRef, stepContext, jobExecutionImpl);
}

final PartitionReducer partitionReducer = step.getPartition().getReducer();
final org.apache.batchee.jaxb.PartitionReducer partitionReducer = step.getPartition().getReducer();
if (partitionReducer != null) {
final List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
@@ -423,16 +447,24 @@ protected void setupStepArtifacts() {
protected void invokePreStepArtifacts() {

if (stepListeners != null) {
for (StepListenerProxy listenerProxy : stepListeners) {
for (StepListener listenerProxy : stepListeners) {
// Call beforeStep on all the step listeners
listenerProxy.beforeStep();
try {
listenerProxy.beforeStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}

// Invoke the reducer before all parallel steps start (must occur
// before mapper as well)
if (this.partitionReducerProxy != null) {
this.partitionReducerProxy.beginPartitionedStep();
try {
this.partitionReducerProxy.beginPartitionedStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}

}
@@ -442,19 +474,26 @@ protected void invokePostStepArtifacts() {
// Invoke the reducer after all parallel steps are done
if (this.partitionReducerProxy != null) {

if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
} else {
this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
try {
if ((BatchStatus.COMPLETED).equals(stepContext.getBatchStatus())) {
this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
} else {
this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
}
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}

}

// Called in spec'd order, e.g. Sec. 11.7
if (stepListeners != null) {
for (StepListenerProxy listenerProxy : stepListeners) {
for (StepListener listenerProxy : stepListeners) {
// Call afterStep on all the step listeners
listenerProxy.afterStep();
try {
listenerProxy.afterStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}
}
@@ -18,11 +18,10 @@

import org.apache.batchee.container.Controller;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.chunk.ExceptionConfig;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.PartitionCollectorProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.PartitionDataWrapper.PartitionEventType;
@@ -35,6 +34,9 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;

import javax.batch.api.listener.StepListener;
import javax.batch.api.partition.PartitionCollector;

/**
* When a partitioned step is run, this controller will only be used for the partition threads,
* NOT the top-level main thread that the step executes upon.
@@ -46,7 +48,7 @@ public abstract class SingleThreadedStepController extends BaseStepController im
private final BatchArtifactFactory factory;

// Collector only used from partition threads, not main thread
protected PartitionCollectorProxy collectorProxy = null;
protected PartitionCollector collectorProxy = null;

protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
@@ -56,7 +58,7 @@ protected SingleThreadedStepController(final RuntimeJobExecution jobExecutionImp
factory = servicesManager.service(BatchArtifactFactory.class);
}

List<StepListenerProxy> stepListeners = null;
List<StepListener> stepListeners = null;

protected void setupStepArtifacts() {
// set up listeners
@@ -85,8 +87,12 @@ protected void invokePreStepArtifacts() {
// Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
// have already called beforeStep() on the main thread as the spec says.
if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
for (StepListenerProxy listenerProxy : stepListeners) {
listenerProxy.beforeStep();
for (StepListener listenerProxy : stepListeners) {
try {
listenerProxy.beforeStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}
}
@@ -96,16 +102,24 @@ protected void invokePostStepArtifacts() {
// Don't call beforeStep() in the partitioned case, since we are now on a partition thread, and
// have already called beforeStep() on the main thread as the spec says.
if ((stepListeners != null) && (this.jobExecutionImpl.getPartitionInstance() == null)) {
for (StepListenerProxy listenerProxy : stepListeners) {
listenerProxy.afterStep();
for (StepListener listenerProxy : stepListeners) {
try {
listenerProxy.afterStep();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}
}

protected void invokeCollectorIfPresent() {
if (collectorProxy != null) {
final Serializable data = collectorProxy.collectPartitionData();
sendCollectorDataToAnalyzerIfPresent(data);
try {
final Serializable data = collectorProxy.collectPartitionData();
sendCollectorDataToAnalyzerIfPresent(data);
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}

This file was deleted.

@@ -284,14 +284,13 @@ public List<SkipWriteListener> getSkipWriteListeners(final Step step, final Inje
return retVal;
}

public List<StepListenerProxy> getStepListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
public List<StepListener> getStepListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
final RuntimeJobExecution execution) {
final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
final List<StepListenerProxy> retVal = new ArrayList<StepListenerProxy>();
final List<StepListener> retVal = new ArrayList<StepListener>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isStepListener()) {
final StepListenerProxy proxy = new StepListenerProxy((StepListener) li.getArtifact());
proxy.setStepContext(stepContext);
final StepListener proxy = ProxyFactory.createProxy((StepListener) li.getArtifact(), injectionRefs);
retVal.add(proxy);
}
}

0 comments on commit b5a9760

Please sign in to comment.