Skip to content
Permalink
Browse files
BATCHEE-29 move listener proxies to real proxies
  • Loading branch information
struberg committed Apr 24, 2014
1 parent cb289b9 commit 458f304160d5cccaa615283471ec5d9496dd5a3f
Show file tree
Hide file tree
Showing 20 changed files with 179 additions and 510 deletions.
@@ -20,10 +20,10 @@
import org.apache.batchee.container.ThreadRootController;
import org.apache.batchee.container.impl.JobContextImpl;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.chunk.ExceptionConfig;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.navigator.ModelNavigator;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.JobListenerProxy;
import org.apache.batchee.container.proxy.ListenerFactory;
import org.apache.batchee.container.services.JobStatusManagerService;
import org.apache.batchee.container.services.ServicesManager;
@@ -34,6 +34,7 @@
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.listener.JobListener;
import javax.batch.runtime.BatchStatus;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -261,17 +262,27 @@ public void stop() {

// Call beforeJob() on all the job listeners
protected void jobListenersBeforeJob() {
final List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
for (final JobListenerProxy listenerProxy : jobListeners) {
listenerProxy.beforeJob();
InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
final List<JobListener> jobListeners = listenerFactory.getJobListeners(injectionRef);
for (final JobListener listenerProxy : jobListeners) {
try {
listenerProxy.beforeJob();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}

// Call afterJob() on all the job listeners
private void jobListenersAfterJob() {
final List<JobListenerProxy> jobListeners = listenerFactory.getJobListeners();
for (final JobListenerProxy listenerProxy : jobListeners) {
listenerProxy.afterJob();
InjectionReferences injectionRef = new InjectionReferences(jobContext, null, null);
final List<JobListener> jobListeners = listenerFactory.getJobListeners(injectionRef);
for (final JobListener listenerProxy : jobListeners) {
try {
listenerProxy.afterJob();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}

@@ -19,24 +19,24 @@
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.chunk.ExceptionConfig;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.BatchletProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.jaxb.Batchlet;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;

import javax.batch.api.Batchlet;
import javax.batch.runtime.BatchStatus;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class BatchletStepController extends SingleThreadedStepController {
private final BatchArtifactFactory factory;
private BatchletProxy batchletProxy;
private Batchlet batchletProxy;

public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final Step step,
final StepContextImpl stepContext, final long rootJobExecutionId,
@@ -46,14 +46,19 @@ public BatchletStepController(final RuntimeJobExecution jobExecutionImpl, final
factory = manager.service(BatchArtifactFactory.class);
}

private void invokeBatchlet(final Batchlet batchlet) throws BatchContainerServiceException {
private void invokeBatchlet(final org.apache.batchee.jaxb.Batchlet batchlet) throws BatchContainerServiceException {
final String batchletId = batchlet.getRef();
final List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, propList);
batchletProxy = ProxyFactory.createBatchletProxy(factory, batchletId, injectionRef, stepContext, jobExecutionImpl);

if (!wasStopIssued()) {
final String processRetVal = batchletProxy.process();
String processRetVal = null;
try {
processRetVal = batchletProxy.process();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
stepContext.setBatchletProcessRetVal(processRetVal);
}
}
@@ -93,7 +98,11 @@ public synchronized void stop() {
stepContext.setBatchStatus(BatchStatus.STOPPING);

if (batchletProxy != null) {
batchletProxy.stop();
try {
batchletProxy.stop();
} catch (Exception e) {
ExceptionConfig.wrapBatchException(e);
}
}
}/* else {
// TODO do we need to throw an error if the batchlet is already stopping/stopped
@@ -23,15 +23,8 @@
import org.apache.batchee.container.impl.controller.SingleThreadedStepController;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
import org.apache.batchee.container.proxy.ChunkListenerProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
import org.apache.batchee.container.proxy.RetryReadListenerProxy;
import org.apache.batchee.container.proxy.RetryWriteListenerProxy;
import org.apache.batchee.container.proxy.SkipProcessListenerProxy;
import org.apache.batchee.container.proxy.SkipReadListenerProxy;
import org.apache.batchee.container.proxy.SkipWriteListenerProxy;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.TCCLObjectInputStream;
@@ -45,9 +38,16 @@
import javax.batch.api.chunk.ItemProcessor;
import javax.batch.api.chunk.ItemReader;
import javax.batch.api.chunk.ItemWriter;
import javax.batch.api.chunk.listener.ChunkListener;
import javax.batch.api.chunk.listener.ItemProcessListener;
import javax.batch.api.chunk.listener.ItemReadListener;
import javax.batch.api.chunk.listener.ItemWriteListener;
import javax.batch.api.chunk.listener.RetryProcessListener;
import javax.batch.api.chunk.listener.RetryReadListener;
import javax.batch.api.chunk.listener.RetryWriteListener;
import javax.batch.api.chunk.listener.SkipProcessListener;
import javax.batch.api.chunk.listener.SkipReadListener;
import javax.batch.api.chunk.listener.SkipWriteListener;
import javax.batch.runtime.BatchStatus;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
@@ -74,7 +74,7 @@ public class ChunkStepController extends SingleThreadedStepController {
private SkipHandler skipHandler = null;
private CheckpointDataKey readerChkptDK = null;
private CheckpointDataKey writerChkptDK = null;
private List<ChunkListenerProxy> chunkListeners = null;
private List<ChunkListener> chunkListeners = null;
private List<ItemReadListener> itemReadListeners = null;
private List<ItemProcessListener> itemProcessListeners = null;
private List<ItemWriteListener> itemWriteListeners = null;
@@ -252,7 +252,7 @@ private Object readItem(ItemStatus status) {
try {
readListenerProxy.onReadError(e);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
if (!rollbackRetry) {
@@ -261,7 +261,7 @@ private Object readItem(ItemStatus status) {
try {
readListenerProxy.onReadError(e);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
// if not a rollback exception, just retry the current item
@@ -306,13 +306,6 @@ private Object readItem(ItemStatus status) {
return itemRead;
}

private void handleBatchException(Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new BatchContainerRuntimeException(e);
}
}

/**
* Process an item previously read by the reader
@@ -352,7 +345,7 @@ private Object processItem(final Object itemRead, final ItemStatus status) {
try {
processListenerProxy.onProcessError(processedItem, e);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
if (!rollbackRetry) {
@@ -364,7 +357,7 @@ private Object processItem(final Object itemRead, final ItemStatus status) {
try {
processListenerProxy.beforeProcess(itemRead);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
processedItem = processItem(itemRead, status);
@@ -378,7 +371,7 @@ private Object processItem(final Object itemRead, final ItemStatus status) {
try {
processListenerProxy.afterProcess(itemRead, processedItem);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
} else {
@@ -405,7 +398,7 @@ private Object processItem(final Object itemRead, final ItemStatus status) {
try {
processListenerProxy.beforeProcess(itemRead);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
processedItem = processItem(itemRead, status);
@@ -419,7 +412,7 @@ private Object processItem(final Object itemRead, final ItemStatus status) {
try {
processListenerProxy.afterProcess(itemRead, processedItem);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
} else {
@@ -466,7 +459,7 @@ private void writeChunk(List<Object> theChunk, ItemStatus status) {
try {
writeListenerProxy.onWriteError(theChunk, e);
} catch (Exception e1) {
handleBatchException(e1);
ExceptionConfig.wrapBatchException(e1);
}
}
if (!rollbackRetry) {
@@ -530,7 +523,7 @@ private void invokeChunk() {
transactionManager.setTransactionTimeout(newtimeOut);
}
transactionManager.begin();
for (ChunkListenerProxy chunkProxy : chunkListeners) {
for (ChunkListener chunkProxy : chunkListeners) {
chunkProxy.beforeChunk();
}

@@ -588,7 +581,7 @@ private void invokeChunk() {

checkpointManager.checkpoint();

for (ChunkListenerProxy chunkProxy : chunkListeners) {
for (ChunkListener chunkProxy : chunkListeners) {
chunkProxy.afterChunk();
}

@@ -624,7 +617,7 @@ private void invokeChunk() {
} catch (final Exception e) {
logger.log(Level.SEVERE, "Failure in Read-Process-Write Loop", e);
// Only try to call onError() if we have an Exception, but not an Error.
for (ChunkListenerProxy chunkProxy : chunkListeners) {
for (ChunkListener chunkProxy : chunkListeners) {
try {
chunkProxy.onError(e);
} catch (final Exception e1) {
@@ -719,17 +712,17 @@ private void initializeChunkArtifacts() {
this.itemReadListeners = jobExecutionImpl.getListenerFactory().getItemReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
this.itemProcessListeners = jobExecutionImpl.getListenerFactory().getItemProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
this.itemWriteListeners = jobExecutionImpl.getListenerFactory().getItemWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<SkipProcessListenerProxy> skipProcessListeners
final List<SkipProcessListener> skipProcessListeners
= jobExecutionImpl.getListenerFactory().getSkipProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<SkipReadListenerProxy> skipReadListeners
final List<SkipReadListener> skipReadListeners
= jobExecutionImpl.getListenerFactory().getSkipReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<SkipWriteListenerProxy> skipWriteListeners
final List<SkipWriteListener> skipWriteListeners
= jobExecutionImpl.getListenerFactory().getSkipWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<RetryProcessListenerProxy> retryProcessListeners
final List<RetryProcessListener> retryProcessListeners
= jobExecutionImpl.getListenerFactory().getRetryProcessListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<RetryReadListenerProxy> retryReadListeners
final List<RetryReadListener> retryReadListeners
= jobExecutionImpl.getListenerFactory().getRetryReadListeners(step, injectionRef, stepContext, jobExecutionImpl);
final List<RetryWriteListenerProxy> retryWriteListeners
final List<RetryWriteListener> retryWriteListeners
= jobExecutionImpl.getListenerFactory().getRetryWriteListeners(step, injectionRef, stepContext, jobExecutionImpl);

if ("item".equals(checkpointProxy.getCheckpointType())) {
@@ -808,7 +801,7 @@ private void openReaderAndWriter() {
try {
writerProxy.open(null);
} catch (Exception e) {
handleBatchException(e);
ExceptionConfig.wrapBatchException(e);
}
}
} catch (final ClassCastException e) {
@@ -21,11 +21,25 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.batchee.container.exception.BatchContainerRuntimeException;

public class ExceptionConfig {
protected final Set<String> includes = new HashSet<String>();
protected final Set<String> excludes = new HashSet<String>();
private final ConcurrentMap<Class<? extends Exception>, Boolean> cache = new ConcurrentHashMap<Class<? extends Exception>, Boolean>();

/**
* Helper method to wrap unknown Exceptions into a {@link org.apache.batchee.container.exception.BatchContainerRuntimeException}.
* This method can be used to handle Exceptions which are actually already catched inside of our proxies.
*/
public static void wrapBatchException(Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new BatchContainerRuntimeException(e);
}
}

public Set<String> getIncludes() {
return includes;
}
@@ -92,4 +106,5 @@ private static int score(final Class<?> config, final Class<?> ex) {
}
return score;
}

}

0 comments on commit 458f304

Please sign in to comment.