diff --git a/src/main/java/com/google/cloud/genomics/utils/Paginator.java b/src/main/java/com/google/cloud/genomics/utils/Paginator.java index 14f8907..43c34bb 100644 --- a/src/main/java/com/google/cloud/genomics/utils/Paginator.java +++ b/src/main/java/com/google/cloud/genomics/utils/Paginator.java @@ -100,13 +100,13 @@ *} * * - * @param The API type. - * @param The request type. - * @param The {@link GenomicsRequest} subtype. - * @param The response type. - * @param The type of object being streamed back to the user. + * @param The API type. + * @param The request type. + * @param The {@link GenomicsRequest} subtype. + * @param The response type. + * @param The type of object being streamed back to the user. */ -public abstract class Paginator, D, E> { +public abstract class Paginator, ResponseT, ItemT> { /** * A callback object for @@ -115,10 +115,10 @@ public abstract class Paginator, D, E> { * the value returned from the * {@link #search(Object, GenomicsRequestInitializer, Callback, RetryPolicy)} method. * - * @param The type of objects returned from a search - * @param The type of object to accumulate when consuming search results. + * @param The type of objects returned from a search + * @param The type of object to accumulate when consuming search results. */ - public interface Callback { + public interface Callback { /** * Consume the search results and accumulate an object to return. @@ -127,7 +127,7 @@ public interface Callback { * @return the accumulated summary value. * @throws IOException */ - F consumeResponses(Iterable responses) throws IOException; + AccumulatedT consumeResponses(Iterable responses) throws IOException; } /** @@ -303,10 +303,10 @@ private Jobs(Genomics genomics) { private class Pair { - final C request; - final D response; + final RequestSubT request; + final ResponseT response; - Pair(C request, D response) { + Pair(RequestSubT request, ResponseT response) { this.request = request; this.response = response; } @@ -998,13 +998,13 @@ public Paginator(Genomics genomics) { this.genomics = genomics; } - abstract C createSearch(A api, B request, Optional pageToken) throws IOException; + abstract RequestSubT createSearch(ApiT api, RequestT request, Optional pageToken) throws IOException; - abstract A getApi(Genomics genomics); + abstract ApiT getApi(Genomics genomics); - abstract String getNextPageToken(D response); + abstract String getNextPageToken(ResponseT response); - abstract Iterable getResponses(D response); + abstract Iterable getResponses(ResponseT response); /** * Search for objects. @@ -1012,11 +1012,11 @@ public Paginator(Genomics genomics) { * @param request The search request. * @return the stream of search results. */ - public final Iterable search(final B request) { + public final Iterable search(final RequestT request) { return search(request, DEFAULT_INITIALIZER, RetryPolicy.defaultPolicy()); } - public final F search(B request, Callback callback) throws IOException { + public final AccumulatedT search(RequestT request, Callback callback) throws IOException { return search(request, DEFAULT_INITIALIZER, callback, RetryPolicy.defaultPolicy()); } @@ -1031,11 +1031,11 @@ public final F search(B request, Callback callback) throws I * (usually due to SocketTimeoutExceptions) * @return A lazy stream of search results. */ - public final Iterable search( - final B request, - final GenomicsRequestInitializer initializer, + public final Iterable search( + final RequestT request, + final GenomicsRequestInitializer initializer, final RetryPolicy retryPolicy) { - final A api = getApi(genomics); + final ApiT api = getApi(genomics); return FluentIterable .from( new Iterable() { @@ -1046,11 +1046,11 @@ public final Iterable search( @Override protected Pair computeNext(Pair pair) { return Optional.fromNullable(pair.request) .transform( - new Function() { - @Override public Pair apply(C search) { + new Function() { + @Override public Pair apply(RequestSubT search) { try { initializer.initialize(search); - D response = retryPolicy.execute(search); + ResponseT response = retryPolicy.execute(search); Optional pageToken = Optional.fromNullable(getNextPageToken(response)); return new Pair( @@ -1073,15 +1073,15 @@ public final Iterable search( }) .skip(1) .transform( - new Function() { - @Override public D apply(Pair pair) { + new Function() { + @Override public ResponseT apply(Pair pair) { return pair.response; } }) .transformAndConcat( - new Function>() { - @Override public Iterable apply(D response) { - return Optional.fromNullable(getResponses(response)).or(Collections.emptyList()); + new Function>() { + @Override public Iterable apply(ResponseT response) { + return Optional.fromNullable(getResponses(response)).or(Collections.emptyList()); } }); } @@ -1100,9 +1100,9 @@ public final Iterable search( * @throws IOException if an IOException occurred while consuming search results. */ public final F search( - B request, - GenomicsRequestInitializer initializer, - Callback callback, + RequestT request, + GenomicsRequestInitializer initializer, + Callback callback, RetryPolicy retryPolicy) throws IOException { try { return callback.consumeResponses(search(request, initializer, retryPolicy)); @@ -1121,11 +1121,11 @@ public final F search( * @param fields The fields to set. * @return the stream of search results. */ - public final Iterable search(final B request, final String fields) { + public final Iterable search(final RequestT request, final String fields) { return search(request, setFieldsInitializer(fields), RetryPolicy.defaultPolicy()); } - public final F search(B request, final String fields, Callback callback) + public final F search(RequestT request, final String fields, Callback callback) throws IOException { return search(request, setFieldsInitializer(fields), callback, RetryPolicy.defaultPolicy()); } diff --git a/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsStreamIterator.java b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsStreamIterator.java index ba5e49c..c998354 100644 --- a/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsStreamIterator.java +++ b/src/main/java/com/google/cloud/genomics/utils/grpc/GenomicsStreamIterator.java @@ -35,25 +35,25 @@ * * TODO: refactor this further to simplify the generic signature. * - * @param Streaming request type. - * @param Streaming response type. - * @param Genomic data type returned by stream. - * @param Blocking stub type. + * @param Streaming request type. + * @param Streaming response type. + * @param Genomic data type returned by stream. + * @param Blocking stub type. */ -public abstract class GenomicsStreamIterator> - implements Iterator { +public abstract class GenomicsStreamIterator> + implements Iterator { private static final Logger LOG = Logger.getLogger(GenomicsStreamIterator.class.getName()); protected final ManagedChannel genomicsChannel; - protected final Predicate shardPredicate; - protected final Stub stub; - protected final Request originalRequest; + protected final Predicate shardPredicate; + protected final StubT stub; + protected final RequestT originalRequest; protected ExponentialBackOff backoff; // Stateful members used to facilitate complex retry behavior for gRPC streams. - private Iterator delegate; - private Item lastSuccessfulDataItem; + private Iterator delegate; + private ItemT lastSuccessfulDataItem; private String idSentinel; /** @@ -67,8 +67,8 @@ public abstract class GenomicsStreamIterator shardPredicate) { + protected GenomicsStreamIterator(ManagedChannel channel, RequestT request, String fields, + Predicate shardPredicate) { this.originalRequest = request; this.shardPredicate = shardPredicate; this.genomicsChannel = channel; @@ -84,23 +84,23 @@ protected GenomicsStreamIterator(ManagedChannel channel, Request request, String idSentinel = null; } - abstract Stub createStub(ManagedChannel genomicsChannel); + abstract StubT createStub(ManagedChannel genomicsChannel); - abstract Iterator createIteratorFromStub(Request request); + abstract Iterator createIteratorFromStub(RequestT request); - abstract long getRequestStart(Request streamRequest); + abstract long getRequestStart(RequestT streamRequest); - abstract long getDataItemStart(Item dataItem); + abstract long getDataItemStart(ItemT dataItem); - abstract String getDataItemId(Item dataItem); + abstract String getDataItemId(ItemT dataItem); - abstract Request getRevisedRequest(long updatedStart); + abstract RequestT getRevisedRequest(long updatedStart); - abstract List getDataList(Response response); + abstract List getDataList(ResponseT response); - abstract Response buildResponse(Response response, Iterable dataList); + abstract ResponseT buildResponse(ResponseT response, Iterable dataList); - private Iterator createIterator(Request request) { + private Iterator createIterator(RequestT request) { while (true) { try { return createIteratorFromStub(request); @@ -188,23 +188,23 @@ private void setStreamStateForRetry() { /** * @see java.util.Iterator#next() */ - public Response next() { - Response response = delegate.next(); + public ResponseT next() { + ResponseT response = delegate.next(); // TODO: Its more clean conceptually to do the same thing for all responses, but this could be a // place where we're wasting a lot of time rebuilding response objects when nothing has actually // changed. return buildResponse(response, enforceShardPredicate(removeRepeatedData(getDataList(response)))); } - private List removeRepeatedData(List dataList) { - List filteredDataList = null; + private List removeRepeatedData(List dataList) { + List filteredDataList = null; if (null == idSentinel) { filteredDataList = dataList; } else { // Filter out previously returned data items. filteredDataList = Lists.newArrayList(); boolean sentinelFound = false; - for (Item dataItem : dataList) { + for (ItemT dataItem : dataList) { if (sentinelFound) { filteredDataList.add(dataItem); } else { @@ -224,7 +224,7 @@ private List removeRepeatedData(List dataList) { return filteredDataList; } - private Iterable enforceShardPredicate(Iterable dataList) { + private Iterable enforceShardPredicate(Iterable dataList) { if (null == shardPredicate) { return dataList; }