Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kroepke committed Apr 15, 2015
1 parent 6ac7f79 commit a9b8a91
Showing 1 changed file with 38 additions and 12 deletions.
Expand Up @@ -468,7 +468,8 @@ private T handleResponse(Request request, Response response) throws IOException,
} }


if (!responseContentType.is(mediaType.withoutParameters())) { if (!responseContentType.is(mediaType.withoutParameters())) {
LOG.warn("We said we'd accept {} but got {} back, let's see how that's going to work out...", mediaType, responseContentType); LOG.error("Accept header was {} but response is {}. Failing.", mediaType, responseContentType);
throw new APIException(request, response);
} }
if (responseClass.equals(String.class)) { if (responseClass.equals(String.class)) {
return responseClass.cast(response.getResponseBody("UTF-8")); return responseClass.cast(response.getResponseBody("UTF-8"));
Expand All @@ -486,7 +487,7 @@ private T handleResponse(Request request, Response response) throws IOException,
if (responseContentType.is(MediaType.JSON_UTF_8.withoutParameters())) { if (responseContentType.is(MediaType.JSON_UTF_8.withoutParameters())) {
result = deserializeJson(response, responseClass); result = deserializeJson(response, responseClass);
} else { } else {
LOG.error("Don't know how to deserialize objects with content in {}, expected {}, failing.", responseContentType, mediaType); LOG.error("Cannot deserialize content type {} expected {}, failing.", responseContentType, mediaType);
throw new APIException(request, response); throw new APIException(request, response);
} }


Expand Down Expand Up @@ -523,22 +524,47 @@ private void ensureAuthentication() {
} }
} }


private class RequestAndFuture { private class RequestContext {
private final Node node;
private final Request request; private final Request request;
private final ListenableFuture<Response> listenableFuture; private final ListenableFuture<Response> listenableFuture;
public RequestAndFuture(Request request, ListenableFuture<Response> listenableFuture) {
public RequestContext(Node node, Request request, ListenableFuture<Response> listenableFuture) {
this.node = node;
this.request = request; this.request = request;
this.listenableFuture = listenableFuture; this.listenableFuture = listenableFuture;
} }

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RequestContext that = (RequestContext) o;

if (!node.equals(that.node)) return false;
if (!request.equals(that.request)) return false;
return listenableFuture.equals(that.listenableFuture);

}

@Override
public int hashCode() {
int result = node.hashCode();
result = 31 * result + request.hashCode();
result = 31 * result + listenableFuture.hashCode();
return result;
}
} }

@Override @Override
public Map<Node, T> executeOnAll() throws APIException { public Map<Node, T> executeOnAll() throws APIException {
HashMap<Node, T> results = Maps.newHashMap(); HashMap<Node, T> results = Maps.newHashMap();
if (node == null && nodes == null) { if (node == null && nodes == null) {
nodes = serverNodes.all(); nodes = serverNodes.all();
} }


final Map<Node, RequestAndFuture> requests = Maps.newHashMap(); final Set<RequestContext> requestContexts = Sets.newHashSetWithExpectedSize(nodes.size());
final Collection<Response> responses = Lists.newArrayList(); final Collection<Response> responses = Lists.newArrayList();


ensureAuthentication(); ensureAuthentication();
Expand All @@ -547,11 +573,11 @@ public Map<Node, T> executeOnAll() throws APIException {
try { try {
final AsyncHttpClient.BoundRequestBuilder requestBuilder = requestBuilderForUrl(url); final AsyncHttpClient.BoundRequestBuilder requestBuilder = requestBuilderForUrl(url);
requestBuilder.addHeader(HttpHeaders.ACCEPT, mediaType.toString()); requestBuilder.addHeader(HttpHeaders.ACCEPT, mediaType.toString());
// we need it for the APIException


// we need to build the request in case we have to throw an APIException in handleResponse()
final Request request = requestBuilder.build(); final Request request = requestBuilder.build();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("API Request: {}", request.toString()); LOG.debug("API Request: {}", request);
} }
final ListenableFuture<Response> future = requestBuilder.execute(new AsyncCompletionHandler<Response>() { final ListenableFuture<Response> future = requestBuilder.execute(new AsyncCompletionHandler<Response>() {
@Override @Override
Expand All @@ -560,7 +586,7 @@ public Response onCompleted(Response response) throws Exception {
return response; return response;
} }
}); });
requests.put(currentNode, new RequestAndFuture(request, future)); requestContexts.add(new RequestContext(currentNode, request, future));
} catch (IOException e) { } catch (IOException e) {
LOG.error("Cannot execute request", e); LOG.error("Cannot execute request", e);
currentNode.markFailure(); currentNode.markFailure();
Expand All @@ -570,13 +596,13 @@ public Response onCompleted(Response response) throws Exception {
// Set 200 OK as standard if not defined. // Set 200 OK as standard if not defined.
ensureExpectedResponseCodes(); ensureExpectedResponseCodes();


for (Map.Entry<Node, RequestAndFuture> nodeAndRequest : requests.entrySet()) { for (RequestContext context : requestContexts) {
final Node node = nodeAndRequest.getKey(); final Node node = context.node;
final ListenableFuture<Response> future = nodeAndRequest.getValue().listenableFuture; final ListenableFuture<Response> future = context.listenableFuture;
try { try {
final Response response = future.get(timeoutValue, timeoutUnit); final Response response = future.get(timeoutValue, timeoutUnit);
node.touch(); node.touch();
final T result = handleResponse(nodeAndRequest.getValue().request, response); final T result = handleResponse(context.request, response);
results.put(node, result); results.put(node, result);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("API call Interrupted", e); LOG.error("API call Interrupted", e);
Expand Down

0 comments on commit a9b8a91

Please sign in to comment.