Skip to content
This repository has been archived by the owner on Sep 15, 2021. It is now read-only.

Commit

Permalink
Merge pull request #72 from deflaux/master
Browse files Browse the repository at this point in the history
Update type names.
  • Loading branch information
deflaux committed Nov 25, 2015
2 parents eaa7844 + de777ef commit 70d6414
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 64 deletions.
72 changes: 36 additions & 36 deletions src/main/java/com/google/cloud/genomics/utils/Paginator.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@
*}
*</pre>
*
* @param <A> The API type.
* @param <B> The request type.
* @param <C> The {@link GenomicsRequest} subtype.
* @param <D> The response type.
* @param <E> The type of object being streamed back to the user.
* @param <ApiT> The API type.
* @param <RequestT> The request type.
* @param <RequestSubT> The {@link GenomicsRequest} subtype.
* @param <ResponseT> The response type.
* @param <ItemT> The type of object being streamed back to the user.
*/
public abstract class Paginator<A, B, C extends GenomicsRequest<D>, D, E> {
public abstract class Paginator<ApiT, RequestT, RequestSubT extends GenomicsRequest<ResponseT>, ResponseT, ItemT> {

/**
* A callback object for
Expand All @@ -115,10 +115,10 @@ public abstract class Paginator<A, B, C extends GenomicsRequest<D>, D, E> {
* the value returned from the
* {@link #search(Object, GenomicsRequestInitializer, Callback, RetryPolicy)} method.
*
* @param <E> The type of objects returned from a search
* @param <F> The type of object to accumulate when consuming search results.
* @param <EntityT> The type of objects returned from a search
* @param <AccumulatedT> The type of object to accumulate when consuming search results.
*/
public interface Callback<E, F> {
public interface Callback<EntityT, AccumulatedT> {

/**
* Consume the search results and accumulate an object to return.
Expand All @@ -127,7 +127,7 @@ public interface Callback<E, F> {
* @return the accumulated summary value.
* @throws IOException
*/
F consumeResponses(Iterable<E> responses) throws IOException;
AccumulatedT consumeResponses(Iterable<EntityT> responses) throws IOException;
}

/**
Expand Down Expand Up @@ -303,10 +303,10 @@ private Jobs(Genomics genomics) {

private class Pair {

final C request;
final D response;
final RequestSubT request;
final ResponseT response;

Pair(C request, D response) {
Pair(RequestSubT request, ResponseT response) {
this.request = request;
this.response = response;
}
Expand Down Expand Up @@ -998,25 +998,25 @@ public Paginator(Genomics genomics) {
this.genomics = genomics;
}

abstract C createSearch(A api, B request, Optional<String> pageToken) throws IOException;
abstract RequestSubT createSearch(ApiT api, RequestT request, Optional<String> pageToken) throws IOException;

abstract A getApi(Genomics genomics);
abstract ApiT getApi(Genomics genomics);

abstract String getNextPageToken(D response);
abstract String getNextPageToken(ResponseT response);

abstract Iterable<E> getResponses(D response);
abstract Iterable<ItemT> getResponses(ResponseT response);

/**
* Search for objects.
*
* @param request The search request.
* @return the stream of search results.
*/
public final Iterable<E> search(final B request) {
public final Iterable<ItemT> search(final RequestT request) {
return search(request, DEFAULT_INITIALIZER, RetryPolicy.defaultPolicy());
}

public final <F> F search(B request, Callback<E, ? extends F> callback) throws IOException {
public final <AccumulatedT> AccumulatedT search(RequestT request, Callback<ItemT, ? extends AccumulatedT> callback) throws IOException {
return search(request, DEFAULT_INITIALIZER, callback, RetryPolicy.defaultPolicy());
}

Expand All @@ -1031,11 +1031,11 @@ public final <F> F search(B request, Callback<E, ? extends F> callback) throws I
* (usually due to SocketTimeoutExceptions)
* @return A lazy stream of search results.
*/
public final Iterable<E> search(
final B request,
final GenomicsRequestInitializer<? super C> initializer,
public final Iterable<ItemT> search(
final RequestT request,
final GenomicsRequestInitializer<? super RequestSubT> initializer,
final RetryPolicy retryPolicy) {
final A api = getApi(genomics);
final ApiT api = getApi(genomics);
return FluentIterable
.from(
new Iterable<Pair>() {
Expand All @@ -1046,11 +1046,11 @@ public final Iterable<E> search(
@Override protected Pair computeNext(Pair pair) {
return Optional.fromNullable(pair.request)
.transform(
new Function<C, Pair>() {
@Override public Pair apply(C search) {
new Function<RequestSubT, Pair>() {
@Override public Pair apply(RequestSubT search) {
try {
initializer.initialize(search);
D response = retryPolicy.execute(search);
ResponseT response = retryPolicy.execute(search);
Optional<String> pageToken =
Optional.fromNullable(getNextPageToken(response));
return new Pair(
Expand All @@ -1073,15 +1073,15 @@ public final Iterable<E> search(
})
.skip(1)
.transform(
new Function<Pair, D>() {
@Override public D apply(Pair pair) {
new Function<Pair, ResponseT>() {
@Override public ResponseT apply(Pair pair) {
return pair.response;
}
})
.transformAndConcat(
new Function<D, Iterable<E>>() {
@Override public Iterable<E> apply(D response) {
return Optional.fromNullable(getResponses(response)).or(Collections.<E>emptyList());
new Function<ResponseT, Iterable<ItemT>>() {
@Override public Iterable<ItemT> apply(ResponseT response) {
return Optional.fromNullable(getResponses(response)).or(Collections.<ItemT>emptyList());
}
});
}
Expand All @@ -1100,9 +1100,9 @@ public final Iterable<E> search(
* @throws IOException if an IOException occurred while consuming search results.
*/
public final <F> F search(
B request,
GenomicsRequestInitializer<? super C> initializer,
Callback<E, ? extends F> callback,
RequestT request,
GenomicsRequestInitializer<? super RequestSubT> initializer,
Callback<ItemT, ? extends F> callback,
RetryPolicy retryPolicy) throws IOException {
try {
return callback.consumeResponses(search(request, initializer, retryPolicy));
Expand All @@ -1121,11 +1121,11 @@ public final <F> F search(
* @param fields The fields to set.
* @return the stream of search results.
*/
public final Iterable<E> search(final B request, final String fields) {
public final Iterable<ItemT> search(final RequestT request, final String fields) {
return search(request, setFieldsInitializer(fields), RetryPolicy.defaultPolicy());
}

public final <F> F search(B request, final String fields, Callback<E, ? extends F> callback)
public final <F> F search(RequestT request, final String fields, Callback<ItemT, ? extends F> callback)
throws IOException {
return search(request, setFieldsInitializer(fields), callback, RetryPolicy.defaultPolicy());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@
*
* TODO: refactor this further to simplify the generic signature.
*
* @param <Request> Streaming request type.
* @param <Response> Streaming response type.
* @param <Item> Genomic data type returned by stream.
* @param <Stub> Blocking stub type.
* @param <RequestT> Streaming request type.
* @param <ResponseT> Streaming response type.
* @param <ItemT> Genomic data type returned by stream.
* @param <StubT> Blocking stub type.
*/
public abstract class GenomicsStreamIterator<Request, Response, Item, Stub extends io.grpc.stub.AbstractStub<Stub>>
implements Iterator<Response> {
public abstract class GenomicsStreamIterator<RequestT, ResponseT, ItemT, StubT extends io.grpc.stub.AbstractStub<StubT>>
implements Iterator<ResponseT> {
private static final Logger LOG = Logger.getLogger(GenomicsStreamIterator.class.getName());

protected final ManagedChannel genomicsChannel;
protected final Predicate<Item> shardPredicate;
protected final Stub stub;
protected final Request originalRequest;
protected final Predicate<ItemT> shardPredicate;
protected final StubT stub;
protected final RequestT originalRequest;

protected ExponentialBackOff backoff;

// Stateful members used to facilitate complex retry behavior for gRPC streams.
private Iterator<Response> delegate;
private Item lastSuccessfulDataItem;
private Iterator<ResponseT> delegate;
private ItemT lastSuccessfulDataItem;
private String idSentinel;

/**
Expand All @@ -67,8 +67,8 @@ public abstract class GenomicsStreamIterator<Request, Response, Item, Stub exten
* shard boundary and/or limit to SNPs only) or null for no filtering.
*/

protected GenomicsStreamIterator(ManagedChannel channel, Request request, String fields,
Predicate<Item> shardPredicate) {
protected GenomicsStreamIterator(ManagedChannel channel, RequestT request, String fields,
Predicate<ItemT> shardPredicate) {
this.originalRequest = request;
this.shardPredicate = shardPredicate;
this.genomicsChannel = channel;
Expand All @@ -84,23 +84,23 @@ protected GenomicsStreamIterator(ManagedChannel channel, Request request, String
idSentinel = null;
}

abstract Stub createStub(ManagedChannel genomicsChannel);
abstract StubT createStub(ManagedChannel genomicsChannel);

abstract Iterator<Response> createIteratorFromStub(Request request);
abstract Iterator<ResponseT> createIteratorFromStub(RequestT request);

abstract long getRequestStart(Request streamRequest);
abstract long getRequestStart(RequestT streamRequest);

abstract long getDataItemStart(Item dataItem);
abstract long getDataItemStart(ItemT dataItem);

abstract String getDataItemId(Item dataItem);
abstract String getDataItemId(ItemT dataItem);

abstract Request getRevisedRequest(long updatedStart);
abstract RequestT getRevisedRequest(long updatedStart);

abstract List<Item> getDataList(Response response);
abstract List<ItemT> getDataList(ResponseT response);

abstract Response buildResponse(Response response, Iterable<Item> dataList);
abstract ResponseT buildResponse(ResponseT response, Iterable<ItemT> dataList);

private Iterator<Response> createIterator(Request request) {
private Iterator<ResponseT> createIterator(RequestT request) {
while (true) {
try {
return createIteratorFromStub(request);
Expand Down Expand Up @@ -188,23 +188,23 @@ private void setStreamStateForRetry() {
/**
* @see java.util.Iterator#next()
*/
public Response next() {
Response response = delegate.next();
public ResponseT next() {
ResponseT response = delegate.next();
// TODO: Its more clean conceptually to do the same thing for all responses, but this could be a
// place where we're wasting a lot of time rebuilding response objects when nothing has actually
// changed.
return buildResponse(response, enforceShardPredicate(removeRepeatedData(getDataList(response))));
}

private List<Item> removeRepeatedData(List<Item> dataList) {
List<Item> filteredDataList = null;
private List<ItemT> removeRepeatedData(List<ItemT> dataList) {
List<ItemT> filteredDataList = null;
if (null == idSentinel) {
filteredDataList = dataList;
} else {
// Filter out previously returned data items.
filteredDataList = Lists.newArrayList();
boolean sentinelFound = false;
for (Item dataItem : dataList) {
for (ItemT dataItem : dataList) {
if (sentinelFound) {
filteredDataList.add(dataItem);
} else {
Expand All @@ -224,7 +224,7 @@ private List<Item> removeRepeatedData(List<Item> dataList) {
return filteredDataList;
}

private Iterable<Item> enforceShardPredicate(Iterable<Item> dataList) {
private Iterable<ItemT> enforceShardPredicate(Iterable<ItemT> dataList) {
if (null == shardPredicate) {
return dataList;
}
Expand Down

0 comments on commit 70d6414

Please sign in to comment.