Skip to content
Permalink
Browse files

Added Exception Handling

  • Loading branch information...
Ghost-Programmer committed Feb 1, 2019
1 parent fb410e2 commit a94cde215684592981c2c08290b7c956cba95e28
@@ -42,7 +42,7 @@ public R getCollection() {
* Object)
*/
@Override
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable {
this.collector.accumulator().accept(this.supplier, data);
return data;
}
@@ -0,0 +1,32 @@
package name.mymiller.extensions.utils.pipelines;

import java.util.List;

/**
* Interface for Exception handling in pipelines.
*
* @author jmiller
*
*/
public interface ExceptionHandlerInterface {

/**
* Process Exceptions to determine if processing should continue to occur
*
* @param throwable
* Throwable caught.
* @param pipe
* PipeInterface that received the exception
* @param data
* Data passed to the pipe.
* @param futures
* Futures passed to the pipe.
* @param pipelineName
* Name of the pipeline.
* @param isParallel
* Indicates if the processing is occurring in Parallel
* @return Boolean true indicates attempt to process, false abort.
*/
public abstract boolean process(Throwable throwable, PipeInterface<?, ?> pipe, final Object data,
List<PipeFuture<?>> futures, String pipelineName, boolean isParallel);
}
@@ -24,7 +24,8 @@
* @param data
* Object being pushed to the pipe
* @return Object after processing has occurred.
* @throws Throwable TODO
*/
public abstract T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel);
public abstract T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable;

}
@@ -66,11 +66,30 @@ public ActionPipe(Function<? super S, ? extends T> action) {
* Object)
*/
@Override
public T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
return this.action.apply(data);
}
}

private class DefaultExceptionHandler implements ExceptionHandlerInterface {

@Override
public boolean process(Throwable throwable, PipeInterface<?, ?> pipe, Object data, List<PipeFuture<?>> futures,
String pipelineName, boolean isParallel) {
System.out.println("Pipe: " + pipe.getClass().getName());
System.out.println("Date: " + data.toString());
System.out.println("Pipeline: " + pipelineName);
System.out.println("Parallel: " + isParallel);
System.out.println("Exception: " + throwable.getMessage());
System.out.println("Stacktrace: ");
throwable.printStackTrace(System.out);

return false;
}

}

/**
* Pipe segment used to allow only distinct data blocks duplicate data blocks
* from being further processed
@@ -109,7 +128,8 @@ public DistinctByPipe(Function<? super S, Object> keyExtractor, boolean onDistin
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.seen.putIfAbsent(this.keyExtractor.apply(data), Boolean.TRUE) == null) {
if (this.onDistinct) {
return data;
@@ -159,7 +179,8 @@ public DistinctPipe(boolean onDistinct) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (!this.distinctList.contains(data)) {
this.distinctList.add(data);
if (this.onDistinct) {
@@ -204,7 +225,8 @@ public FilterPipe(Predicate<? super S> predicate) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.predicate.test(data)) {
return data;
}
@@ -257,7 +279,8 @@ public ForkPipe(List<Pipeline<S, ?>> pipelines) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
for (Pipeline<S, ?> pipeline : this.pipelines) {
if (isParallel) {
pipeline.internalParallel(data, futures);
@@ -305,7 +328,8 @@ public MaxPipe(S max, Comparator<? super S> comparator) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.comparator.compare(data, this.max) <= 0) {
return data;
}
@@ -349,7 +373,8 @@ public MinPipe(S min, Comparator<? super S> comparator) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.comparator.compare(this.min, data) >= 0) {
return data;
}
@@ -386,7 +411,8 @@ public PeekPipe(Consumer<? super S> action) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
this.action.accept(data);
return data;
}
@@ -420,6 +446,11 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*/
private String pipelineName;

/**
* Exception Handler to use for processing
*/
private ExceptionHandlerInterface exceptionHandler = null;

/**
* Constructor for creating a paralell processing on the pipeline.
*
@@ -432,11 +463,13 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
* @param pipelineName
* Name of this pipeline
*/
protected PipeRun(List<PipeInterface<?, ?>> pipes, S source, List<PipeFuture<?>> futures, String pipelineName) {
protected PipeRun(List<PipeInterface<?, ?>> pipes, S source, List<PipeFuture<?>> futures, String pipelineName,
ExceptionHandlerInterface exceptionHandler) {
this.pipes = pipes;
this.source = source;
this.futures = futures;
this.pipelineName = pipelineName;
this.exceptionHandler = exceptionHandler;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -445,7 +478,27 @@ public void run() {
Object source = this.source;
Object target = null;
for (PipeInterface pipe : this.pipes) {
target = pipe.process(source, this.futures, this.pipelineName, true);
boolean processAgain = true;
int processAttempt = 0;

while (processAgain) {
try {
processAttempt++;
target = pipe.process(source, this.futures, this.pipelineName, true);
processAgain = false;
} catch (Throwable throwable) {
if (this.exceptionHandler != null) {
boolean response = this.exceptionHandler.process(throwable, pipe, source, this.futures,
this.pipelineName, true);
if (response && (processAttempt == 1)) {
processAgain = true;
} else {
processAgain = false;
target = null;
}
}
}
}
if (target == null) {
break;
}
@@ -504,7 +557,8 @@ public SwitchPipe(Predicate<? super S> predicate, Pipeline<S, ?> pipeline) {
}

@Override
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.predicate.test(data)) {
if (isParallel) {
this.pipeline.internalParallel(data, futures);
@@ -542,6 +596,8 @@ public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boole
return chain;
}

private DefaultExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();

/**
* Executor to use for parallel processing
*/
@@ -559,6 +615,11 @@ public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boole

private UUID pipelineId;

/**
* Exception Handler to use for processing
*/
private ExceptionHandlerInterface exceptionHandler = this.defaultExceptionHandler;

/**
* Copy constructor used to create a duplicate pipeline.
*
@@ -764,6 +825,13 @@ private Pipeline(String pipelineName) {
return this.connectInternalPipe(new ForkPipe<T>(pipelines));
}

/**
* @return the exceptionHandler
*/
protected synchronized ExceptionHandlerInterface getExceptionHandler() {
return this.exceptionHandler;
}

/**
*
* @return List of Futures for the next processing
@@ -803,7 +871,7 @@ public String getPipelineName() {
* associated pipeline completes.
*/
public void internalParallel(S source, List<PipeFuture<?>> futures) {
this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName));
this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));
}

/**
@@ -869,8 +937,28 @@ public void internalParallel(S source, List<PipeFuture<?>> futures) {
public T process(S s) {
Object source = s;
Object target = null;
for (PipeInterface p : this.pipes) {
target = p.process(source, null, this.pipelineName, false);
for (PipeInterface pipe : this.pipes) {
boolean processAgain = true;
int processAttempt = 0;

while (processAgain) {
try {
processAttempt++;
target = pipe.process(source, null, this.pipelineName, false);
processAgain = false;
} catch (Throwable throwable) {
if (this.exceptionHandler != null) {
boolean response = this.exceptionHandler.process(throwable, pipe, source, null,
this.getPipelineName(), false);
if (response && (processAttempt == 1)) {
processAgain = true;
} else {
processAgain = false;
target = null;
}
}
}
}
if (target == null) {
break;
}
@@ -889,11 +977,22 @@ public T process(S s) {
public List<PipeFuture<?>> processParallel(S source) {
List<PipeFuture<?>> futures = this.getFutures();

this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName));
this.executorService.submit(new PipeRun(this.pipes, source, futures, this.pipelineName, this.exceptionHandler));

return futures;
}

/**
* @param exceptionHandler
* the exceptionHandler to set
*/
protected synchronized void setExceptionHandler(ExceptionHandlerInterface exceptionHandler) {
this.exceptionHandler = exceptionHandler;
if (this.exceptionHandler == null) {
this.exceptionHandler = this.defaultExceptionHandler;
}
}

/**
* Allow a data object to flow to another pipeline if the predicate matches.
*
@@ -15,7 +15,7 @@
public class ObjectInputStreamPIpe<S, T> implements PipeInterface<S, T> {

@Override
public T process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
public T process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable {
// TODO Auto-generated method stub
return null;
}
@@ -1,16 +1,30 @@
package name.mymiller.extensions.utils.pipelines.pipes;

import java.io.ObjectOutputStream;
import java.util.List;

import name.mymiller.extensions.utils.pipelines.PipeFuture;
import name.mymiller.extensions.utils.pipelines.PipeInterface;

public class ObjectOutputStreamPipe<S, T> implements PipeInterface<S, T> {
public class ObjectOutputStreamPipe<S> implements PipeInterface<S, S> {

private ObjectOutputStream oos;

/**
* @param oos
*/
public ObjectOutputStreamPipe(ObjectOutputStream oos) {
super();
this.oos = oos;
}

@Override
public T process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) {
// TODO Auto-generated method stub
return null;
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel) throws Throwable {

if (this.oos != null) {
this.oos.writeObject(data);
}
return data;
}

}

0 comments on commit a94cde2

Please sign in to comment.
You can’t perform that action at this time.