Skip to content
Permalink
Browse files

Fixed Warnings

  • Loading branch information...
Ghost-Programmer committed Apr 3, 2019
1 parent 9871b97 commit 0abe768c233bc79446cbc1ee2f520e768871c40a
@@ -33,18 +33,18 @@
* data. This can transform the data to a new type, or update data on the pipe.
*
* @author jmiller
* @param <T>
* @param <A>
* The type the Pipe will accept,
* @param <R>
* @param <B>
* The Type the Pipe will return.
*
*/
private class ActionPipe<S, T> implements PipeInterface<S, T> {
private class ActionPipe<A, B> implements PipeInterface<A, B> {

/**
* Action Functional interface
*/
private Function<? super S, ? extends T> action;
private Function<? super A, ? extends B> action;

/**
* Constructor to provide the Class it will act on, and the functional interface
@@ -54,7 +54,7 @@
* @param action
* Functional interface to act on.
*/
public ActionPipe(Function<? super S, ? extends T> action) {
public ActionPipe(Function<? super A, ? extends B> action) {
this.action = action;
}

@@ -66,7 +66,7 @@ public ActionPipe(Function<? super S, ? extends T> action) {
* Object)
*/
@Override
public T process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public B process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
return this.action.apply(data);
}
@@ -96,10 +96,10 @@ public boolean process(Throwable throwable, PipeInterface<?, ?> pipe, Object dat
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class DistinctByPipe<S> implements PipeInterface<S, S> {
private class DistinctByPipe<A> implements PipeInterface<A, A> {

/**
* List containing data blocks previously processed
@@ -108,27 +108,27 @@ public boolean process(Throwable throwable, PipeInterface<?, ?> pipe, Object dat

private boolean onDistinct = true;

private Function<? super S, Object> keyExtractor;
private Function<? super A, Object> keyExtractor;

/**
* Default contructor setting up the distinctList
*/
public DistinctByPipe(Function<? super S, Object> keyExtractor) {
public DistinctByPipe(Function<? super A, Object> keyExtractor) {
this.seen = new ConcurrentHashMap<>();
this.keyExtractor = keyExtractor;
}

/**
* Default contructor setting up the distinctList
*/
public DistinctByPipe(Function<? super S, Object> keyExtractor, boolean onDistinct) {
public DistinctByPipe(Function<? super A, Object> keyExtractor, boolean onDistinct) {
this.seen = new ConcurrentHashMap<>();
this.keyExtractor = keyExtractor;
this.onDistinct = onDistinct;
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.seen.putIfAbsent(this.keyExtractor.apply(data), Boolean.TRUE) == null) {
if (this.onDistinct) {
@@ -151,15 +151,15 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class DistinctPipe<S> implements PipeInterface<S, S> {
private class DistinctPipe<A> implements PipeInterface<A, A> {

/**
* List containing data blocks previously processed
*/
private Set<S> distinctList;
private Set<A> distinctList;

private boolean onDistinct = true;

@@ -179,7 +179,7 @@ public DistinctPipe(boolean onDistinct) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (!this.distinctList.contains(data)) {
this.distinctList.add(data);
@@ -204,28 +204,28 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class FilterPipe<S> implements PipeInterface<S, S> {
private class FilterPipe<A> implements PipeInterface<A, A> {

/**
* Predicate functional interface proving the filtering algorithm.
*/
private Predicate<? super S> predicate;
private Predicate<? super A> predicate;

/**
* Constructor to setup the FilterPipe
*
* @param predicate
* Predicate to be used for filtering data.
*/
public FilterPipe(Predicate<? super S> predicate) {
public FilterPipe(Predicate<? super A> predicate) {
this.predicate = predicate;
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.predicate.test(data)) {
return data;
@@ -242,23 +242,23 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class ForkPipe<S> implements PipeInterface<S, S> {
private class ForkPipe<A> implements PipeInterface<A, A> {

/**
* List of Pipelines data is to be forked into.
*/
private List<Pipeline<S, ?>> pipelines = null;
private List<Pipeline<A, ?>> pipelines = null;

/**
* Constructor for accepting a list of pipelines to fork data into.
*
* @param pipelines
* List of Pipelines.
*/
public ForkPipe(List<Pipeline<S, ?>> pipelines) {
public ForkPipe(List<Pipeline<A, ?>> pipelines) {
this.pipelines = pipelines;
}

@@ -279,9 +279,9 @@ public ForkPipe(List<Pipeline<S, ?>> pipelines) {
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
for (Pipeline<S, ?> pipeline : this.pipelines) {
for (Pipeline<A, ?> pipeline : this.pipelines) {
if (isParallel) {
pipeline.internalParallel(data, futures);
} else {
@@ -299,20 +299,20 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class MaxPipe<S> implements PipeInterface<S, S> {
private class MaxPipe<A> implements PipeInterface<A, A> {

/**
* Max value to allow through the pipe
*/
private S max;
private A max;

/**
* Comparator to use to compare the data against the max value.
*/
private Comparator<? super S> comparator;
private Comparator<? super A> comparator;

/**
* Constructor accepting the max value to allow and the comparator.
@@ -322,13 +322,13 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
* @param comparator
* Comparator to compare type S.
*/
public MaxPipe(S max, Comparator<? super S> comparator) {
public MaxPipe(A max, Comparator<? super A> comparator) {
this.max = max;
this.comparator = comparator;
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.comparator.compare(data, this.max) <= 0) {
return data;
@@ -344,20 +344,20 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class MinPipe<S> implements PipeInterface<S, S> {
private class MinPipe<A> implements PipeInterface<A, A> {

/**
* Min value to allow thorugh the pipe.
*/
private S min;
private A min;

/**
* Comparator to use to compare the data against the min value.
*/
private Comparator<? super S> comparator;
private Comparator<? super A> comparator;

/**
* Constructor accepting the min value to allow and the comparator.
@@ -367,13 +367,13 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
* @param comparator
* Comparator to compare type S.
*/
public MinPipe(S min, Comparator<? super S> comparator) {
public MinPipe(A min, Comparator<? super A> comparator) {
this.min = min;
this.comparator = comparator;
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.comparator.compare(this.min, data) >= 0) {
return data;
@@ -390,28 +390,28 @@ public S process(final S data, List<PipeFuture<?>> futures, String pipelineName,
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source TYpe
*/
private class PeekPipe<S> implements PipeInterface<S, S> {
private class PeekPipe<A> implements PipeInterface<A, A> {

/**
* Consumer interface to allow 'peeking' at the data.
*/
private Consumer<? super S> action = null;
private Consumer<? super A> action = null;

/**
* Constructor for creating a peek at the data.
*
* @param action
* Consumer interface to peek at the data.
*/
public PeekPipe(Consumer<? super S> action) {
public PeekPipe(Consumer<? super A> action) {
this.action = action;
}

@Override
public S process(final S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(final A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
this.action.accept(data);
return data;
@@ -519,28 +519,28 @@ public void run() {
*
* @author jmiller
*
* @param <S>
* @param <A>
* Source Type
*/
private class SwitchPipe<S> implements PipeInterface<S, S> {
private class SwitchPipe<A> implements PipeInterface<A, A> {

/**
* Predicate functional interface proving the filtering algorithm.
*/
private Predicate<? super S> predicate;
private Predicate<? super A> predicate;

/**
* Alternate Pipeline for data to flow.
*/
private Pipeline<S, ?> pipeline;
private Pipeline<A, ?> pipeline;

/**
* Constructor to setup the FilterPipe
*
* @param predicate
* Predicate to be used for filtering data.
*/
public SwitchPipe(Predicate<? super S> predicate, Pipeline<S, ?> pipeline) {
public SwitchPipe(Predicate<? super A> predicate, Pipeline<A, ?> pipeline) {
this.predicate = predicate;
this.pipeline = pipeline;
}
@@ -550,14 +550,13 @@ public SwitchPipe(Predicate<? super S> predicate, Pipeline<S, ?> pipeline) {
*
* @see name.mymiller.extensions.utils.pipelines.PipeInterface#getFutures()
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<PipeFuture<?>> getFutures() {
return this.pipeline.getFutures();
}

@Override
public S process(S data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
public A process(A data, List<PipeFuture<?>> futures, String pipelineName, boolean isParallel)
throws Throwable {
if (this.predicate.test(data)) {
if (isParallel) {
@@ -6,7 +6,7 @@
* @author jmiller Class used to remote enable/disable the predicate for use in
* pipeline switches.
*/
public class RemoteSwitch implements Predicate {
public class RemoteSwitch<A> implements Predicate<A> {

private boolean enabled = false;

@@ -44,7 +44,7 @@ public synchronized void setEnabled(boolean enabled) {
}

@Override
public boolean test(Object t) {
public boolean test(A t) {
return this.isEnabled();
}

0 comments on commit 0abe768

Please sign in to comment.
You can’t perform that action at this time.