Skip to content
Permalink
Browse files
BATCHEE-29 migrate ItemReaderProxy to a 'real' proxy
  • Loading branch information
struberg committed Apr 23, 2014
1 parent 7b4fe14 commit 1f087413948c5a1fe6e0986144e767e594d91aca
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 83 deletions.
@@ -18,24 +18,25 @@

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
import org.apache.batchee.container.proxy.ItemReaderProxy;
import org.apache.batchee.container.proxy.ItemWriterProxy;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemReader;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class CheckpointManager {
private final PersistenceManagerService persistenceManagerService;
private final ItemReaderProxy readerProxy;
private final ItemReader readerProxy;
private final ItemWriterProxy writerProxy;
private final CheckpointAlgorithm checkpointAlgorithm;
private final String stepId;
private final long jobInstanceID;


public CheckpointManager(final ItemReaderProxy reader, final ItemWriterProxy writer,
public CheckpointManager(final ItemReader reader, final ItemWriterProxy writer,
final CheckpointAlgorithm chkptAlg,
final long jobInstanceID, final String stepId,
final PersistenceManagerService persistenceManagerService) {
@@ -28,7 +28,6 @@
import org.apache.batchee.container.proxy.ItemProcessListenerProxy;
import org.apache.batchee.container.proxy.ItemProcessorProxy;
import org.apache.batchee.container.proxy.ItemReadListenerProxy;
import org.apache.batchee.container.proxy.ItemReaderProxy;
import org.apache.batchee.container.proxy.ItemWriteListenerProxy;
import org.apache.batchee.container.proxy.ItemWriterProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
@@ -43,14 +42,14 @@
import org.apache.batchee.container.util.TCCLObjectInputStream;
import org.apache.batchee.jaxb.Chunk;
import org.apache.batchee.jaxb.ItemProcessor;
import org.apache.batchee.jaxb.ItemReader;
import org.apache.batchee.jaxb.ItemWriter;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;

import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemReader;
import javax.batch.runtime.BatchStatus;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
@@ -68,7 +67,7 @@ public class ChunkStepController extends SingleThreadedStepController {
private final BatchArtifactFactory artifactFactory;

private Chunk chunk = null;
private ItemReaderProxy readerProxy = null;
private ItemReader readerProxy = null;
private ItemProcessorProxy processorProxy = null;
private ItemWriterProxy writerProxy = null;
private CheckpointAlgorithmProxy checkpointProxy = null;
@@ -602,7 +601,11 @@ private void invokeChunk() {

private void rollback(final Throwable t) {
transactionManager.setRollbackOnly();
readerProxy.close();
try {
readerProxy.close();
} catch (Exception e) {
// ignore, we blow up anyway
}
writerProxy.close();
transactionManager.rollback();
throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
@@ -637,7 +640,7 @@ private void initializeChunkArtifacts() {
final int timeInterval = ChunkHelper.getTimeLimit(chunk);

{
final ItemReader itemReader = chunk.getReader();
final org.apache.batchee.jaxb.ItemReader itemReader = chunk.getReader();
final List<Property> itemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemReaderProps);
readerProxy = ProxyFactory.createItemReaderProxy(artifactFactory, itemReader.getRef(), injectionRef, stepContext, jobExecutionImpl);
@@ -735,7 +738,12 @@ private void openReaderAndWriter() {
} else {
// no chkpt data exists in the backing store
readerChkptData = null;
readerProxy.open(null);
try {
readerProxy.open(null);
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
}
}
} catch (final ClassCastException e) {
throw new IllegalStateException("Expected CheckpointData but found" + readerChkptData);
@@ -861,7 +869,12 @@ private void positionReaderAtCheckpoint() {
} else {
// no chkpt data exists in the backing store
readerData = null;
readerProxy.open(null);
try {
readerProxy.open(null);
} catch (final Exception ex) {
// is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
}
}
} catch (final ClassCastException e) {
throw new IllegalStateException("Expected CheckpointData but found" + readerData);
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.container.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;

import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.StepContextImpl;

/**
* Proxy handler for all our Batch parts like ItemReader,
* ItemWriter, etc.
* Most batch part methods store away the exception in the StepContext. A few methods
* need to be excluded from that rule in order to provide skip/retry logic.
*
*/
public class BatchProxyInvocationHandler implements InvocationHandler {

private final StepContextImpl stepContext;
private final Object delegate;
private final String[] nonExceptionHandlingMethods;

/**
*
* @param delegate the internal delegate which does the real job
* @param stepContext used to store any Exceptions
* @param nonExceptionHandlingMethods array of methods which should not cause the Exceptions to be stored in the StepContext
*/
public BatchProxyInvocationHandler(Object delegate, StepContextImpl stepContext, String... nonExceptionHandlingMethods) {
this.delegate = delegate;
this.stepContext = stepContext;
this.nonExceptionHandlingMethods = nonExceptionHandlingMethods;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
return method.invoke(delegate, args);
} catch (Throwable e) {
if (e instanceof InvocationTargetException) {
e = ((InvocationTargetException) e).getCause();
}

if (nonExceptionHandlingMethods == null || Arrays.binarySearch(nonExceptionHandlingMethods, method.getName()) < 0) {
if (e instanceof Exception) {
this.stepContext.setException((Exception) e);
throw new BatchContainerRuntimeException(e);
}
}

throw e;
}
}
}

This file was deleted.

@@ -16,6 +16,10 @@
*/
package org.apache.batchee.container.proxy;

import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.spi.BatchArtifactFactory;
@@ -35,7 +39,7 @@
* Introduce a level of indirection so proxies are not instantiated directly by newing them up.
*/
public class ProxyFactory {
private static final ThreadLocal<InjectionReferences> INJECTION_CONTEXT = new ThreadLocal<InjectionReferences>();
static final ThreadLocal<InjectionReferences> INJECTION_CONTEXT = new ThreadLocal<InjectionReferences>();

private ProxyFactory() {
// private utility class ct
@@ -95,12 +99,30 @@ public static CheckpointAlgorithmProxy createCheckpointAlgorithmProxy(final Batc
return proxy;
}

public static ItemReaderProxy createItemReaderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
public static ItemReader createItemReaderProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,
final StepContextImpl stepContext, final RuntimeJobExecution execution) {
final ItemReader loadedArtifact = (ItemReader) loadArtifact(factory, id, injectionRefs, execution);
final ItemReaderProxy proxy = new ItemReaderProxy(loadedArtifact);
proxy.setStepContext(stepContext);
return proxy;
return (ItemReader) Proxy.newProxyInstance(loadedArtifact.getClass().getClassLoader(), getInterfaces(loadedArtifact.getClass()),
new BatchProxyInvocationHandler(loadedArtifact, stepContext, "readItem"));
}

/**
* @return all the interfaces fo the given class and it's superclasses
*/
private static Class<?>[] getInterfaces(Class<?> clazz) {
if (clazz.getSuperclass() == Object.class) {
return clazz.getInterfaces();
} else {
List<Class<?>> clazzes = new ArrayList<Class<?>>();
while (clazz != Object.class) {
for (Class<?> interfaceClass : clazz.getInterfaces()) {
clazzes.add(interfaceClass);
}
clazz = clazz.getSuperclass();
}

return clazzes.toArray(new Class<?>[clazzes.size()]);
}
}

public static ItemProcessorProxy createItemProcessorProxy(final BatchArtifactFactory factory, final String id, final InjectionReferences injectionRefs,

0 comments on commit 1f08741

Please sign in to comment.