Skip to content

Commit

Permalink
add some error handling for write requests
Browse files Browse the repository at this point in the history
This commit introduces a couple exception classes - WriteException and
ClientException. A WriteException is thrown when the server returns a non-200
status code. A ClientException is thrown when the http request observable emits
an error. This would indicate a problem with the client itself.
  • Loading branch information
John Sanda committed Jul 1, 2015
1 parent 2173c0f commit db7083b
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 121 deletions.
156 changes: 47 additions & 109 deletions clients/rest-client/src/main/java/org/hawkular/metrics/rest/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.hawkular.metrics.rest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,28 +35,22 @@
import org.hawkular.metrics.rest.model.DataPoints;
import org.hawkular.metrics.rest.model.GaugeDataPoint;
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

public class Client {

private static final String BASE_PATH = "/hawkular/metrics/";

private static final String TENANT_HEADER = "Hawkular-Tenant";

private Vertx vertx;

private HttpClient httpClient;

private String host;

private int port;

private String baseURI;

private ObjectMapper mapper;

public Client(String host, int port) {
// this.host = host;
// this.port = port;
this.baseURI = baseURI;
this.vertx = Vertx.vertx();
HttpClientOptions options = new HttpClientOptions().setDefaultHost(host).setDefaultPort(port);
httpClient = vertx.createHttpClient(options);
Expand All @@ -75,49 +68,75 @@ public void shutdown() throws InterruptedException {
latch.await(5, TimeUnit.SECONDS);
}

public Observable<HttpClientResponse> addGaugeData(String tenant, List<DataPoints<GaugeDataPoint>> gauges) {
public Observable<Void> addGaugeData(String tenant, List<DataPoints<GaugeDataPoint>> gauges) {
try {
String json = mapper.writeValueAsString(gauges);
PublishSubject<HttpClientResponse> subject = PublishSubject.create();
HttpClientRequest request = httpClient.post(BASE_PATH + "gauges/data")
.putHeader("Content-Type", "application/json")
.putHeader("Hawkular-Tenant", tenant);
request.toObservable().subscribe(subject::onNext, subject::onError, subject::onCompleted);
.putHeader(TENANT_HEADER, tenant);
WriteObserver writeObserver = new WriteObserver();
request.toObservable().subscribe(writeObserver);
request.end(json);
return subject;

return writeObserver.getObservable();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse " + gauges, e);
throw new ClientException("Failed to parse " + gauges, e);
}
}

public Observable<HttpClientResponse> addGaugeData(String tenant, String gauge, List<GaugeDataPoint> dataPoints) {
public Observable<Void> addGaugeData(String tenant, String gauge, List<GaugeDataPoint> dataPoints) {
try {
PublishSubject<HttpClientResponse> subject = PublishSubject.create();
String json = mapper.writeValueAsString(dataPoints);
// HttpClientRequest request = httpClient.post(baseURI + "/gauges/" + gauge + "/data")
HttpClientRequest request = httpClient.post(BASE_PATH + "gauges/" + gauge + "/data")
.putHeader("Content-Type", "application/json")
.putHeader("Hawkular-Tenant", tenant);
// .write(json.toString());
.putHeader(TENANT_HEADER, tenant);
// Observable<HttpClientResponse> responseObservable = request.toObservable();
// responseObservable.subscribe(
// response -> System.out.println("status: {code: " + response.statusCode() + ", message: " +
// response.statusMessage() + "}"),
// Throwable::printStackTrace
// );
request.toObservable().subscribe(subject::onNext, subject::onError, subject::onCompleted);
WriteObserver writeObserver = new WriteObserver();
request.toObservable().subscribe(writeObserver);
request.end(json);
return subject;

return writeObserver.getObservable();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse data points", e);
throw new ClientException("Failed to parse data points", e);
}
}

private class WriteObserver implements Observer<HttpClientResponse> {

private PublishSubject<Void> subject = PublishSubject.create();

public Observable<Void> getObservable() {
return subject;
}

@Override
public void onCompleted() {
subject.onCompleted();
}

@Override
public void onError(Throwable e) {
subject.onError(new ClientException("There was an unexpecte error while adding data", e));
}

@Override
public void onNext(HttpClientResponse response) {
if (response.statusCode() != 200) {
subject.onError(new WriteException(response.statusMessage(), response.statusCode()));
}
}
}

public Observable<GaugeDataPoint> findGaugeData(String tenantId, String gauge, long start, long end) {
PublishSubject<GaugeDataPoint> subject = PublishSubject.create();
HttpClientRequest request = httpClient.get(BASE_PATH + "gauges/" + gauge + "/data?start=" + start + "&end=" +
end)
.putHeader("Hawkular-Tenant", tenantId)
.putHeader(TENANT_HEADER, tenantId)
.putHeader("Content-Type", "application/json");
Observable<GaugeDataPoint> observable = request.toObservable()
.flatMap(HttpClientResponse::toObservable)
Expand All @@ -129,92 +148,11 @@ public Observable<GaugeDataPoint> findGaugeData(String tenantId, String gauge, l

private List<GaugeDataPoint> getGaugeDataPoints(Buffer buffer) throws RuntimeException {
try {
return mapper.readValue(buffer.toString("UTF-8"), new TypeReference<List<GaugeDataPoint>>() {});
return mapper.readValue(buffer.toString("UTF-8"), new TypeReference<List<GaugeDataPoint>>() {
});
} catch (IOException e) {
throw new RuntimeException("Failed to parse response", e);
}
}

// public static void main(String[] args) throws InterruptedException {
// Client client = new Client("localhost", 8080);
//// long end = System.currentTimeMillis();
//// long start = end - TimeUnit.MILLISECONDS.convert(8, TimeUnit.HOURS);
//// client.findGaugeData("Vert.x", "Test1", start, end).subscribe(
//// System.out::println,
//// t -> {
//// t.printStackTrace();
//// try {
//// client.shutdown();
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// },
//// () -> {
//// try {
//// client.shutdown();
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// }
//// );
//
//
//// List<GaugeDataPoint> dataPoints = asList(
//// new GaugeDataPoint(System.currentTimeMillis() - 500, 10.1),
//// new GaugeDataPoint(System.currentTimeMillis() - 400, 11.1112),
//// new GaugeDataPoint(System.currentTimeMillis() - 300, 13.4783)
//// );
//// client.addGaugeData("Vert.x", "TEST3", dataPoints).subscribe(
//// response -> System.out.println("status: {code: " + response.statusCode() + ", message: " +
//// response.statusMessage() + "}"),
//// t -> {
//// t.printStackTrace();
//// try {
//// client.shutdown();
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// },
//// () -> {
//// try {
//// client.shutdown();
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// }
//// }
//// );
//
// client.addGaugeData("Multi-Metric-Test", createDataPoints()).subscribe(
// response -> System.out.println("status: {code: " + response.statusCode() + ", message: " +
// response.statusMessage() + "}"),
// t -> {
// t.printStackTrace();
// try {
// client.shutdown();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// },
// () -> {
// try {
// client.shutdown();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// );
// }

private static List<DataPoints<GaugeDataPoint>> createDataPoints() {
List<DataPoints<GaugeDataPoint>> dataPointsList = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
List<GaugeDataPoint> dataPoints = new ArrayList<>();
for (int j = 0; j < 3; ++j) {
dataPoints.add(new GaugeDataPoint(System.currentTimeMillis() - (j * 500), j * Math.PI));
}
dataPointsList.add(new DataPoints<>("G" + i, dataPoints));
throw new ClientException("Failed to parse response", e);
}
return dataPointsList;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.rest;

/**
* Indicates that something went wrong with the client itself while processing a request and/or response.
*/
public class ClientException extends RuntimeException {

public ClientException(String message) {
super(message);
}

public ClientException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.rest;

/**
* Thrown if a write request fails on the server. The server will send back a status code other than 200 when this
* happens.
*/
public class WriteException extends Exception {

private int status;

public WriteException(String message, int status) {
super(message);
this.status = status;
}

/**
* The HTTP status code reported by the server
*/
public int getStatus() {
return status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.rxjava.core.http.HttpClientResponse;
import org.hawkular.metrics.rest.model.DataPoints;
import org.hawkular.metrics.rest.model.GaugeDataPoint;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -70,16 +69,12 @@ public void addDataPointsForOneGauge() throws Exception {
new GaugeDataPoint(startTime, 10.1)
);

TestSubscriber<HttpClientResponse> writeSubscriber = new TestSubscriber<>();
TestSubscriber<Void> writeSubscriber = new TestSubscriber<>();

client.addGaugeData(tenantId, gauge, dataPoints).subscribe(writeSubscriber);

writeSubscriber.awaitTerminalEvent();
writeSubscriber.assertNoErrors();
assertEquals(writeSubscriber.getOnNextEvents().size(), 1, "Expected to get back one http response");

HttpClientResponse response = writeSubscriber.getOnNextEvents().get(0);
assertEquals(200, response.statusCode(), "Expected a 200 status when data points are successfully stored");

TestSubscriber<GaugeDataPoint> readSubscriber = new TestSubscriber<>();
client.findGaugeData(tenantId, gauge, startTime, endTime).subscribe(readSubscriber);
Expand Down Expand Up @@ -108,16 +103,12 @@ public void addDataPointsForMultipleGauges() throws Exception {
))
);

TestSubscriber<HttpClientResponse> writeSubscriber = new TestSubscriber<>();
TestSubscriber<Void> writeSubscriber = new TestSubscriber<>();

client.addGaugeData(tenantId, gauges).subscribe(writeSubscriber);

writeSubscriber.awaitTerminalEvent();
writeSubscriber.assertNoErrors();
assertEquals(writeSubscriber.getOnNextEvents().size(), 1, "Expected to get back one http response");

HttpClientResponse response = writeSubscriber.getOnNextEvents().get(0);
assertEquals(200, response.statusCode(), "Expected a 200 status when data points are successfully stored");

TestSubscriber<GaugeDataPoint> readSubscriber = new TestSubscriber<>();
client.findGaugeData(tenantId, prefix + 0, startTime, endTime).subscribe(readSubscriber);
Expand All @@ -127,7 +118,7 @@ public void addDataPointsForMultipleGauges() throws Exception {
assertEquals(readSubscriber.getOnNextEvents(), gauges.get(0).getData(), "The data points do not match");

readSubscriber = new TestSubscriber<>();

client.findGaugeData(tenantId, prefix + 1, startTime, endTime).subscribe(readSubscriber);

readSubscriber.awaitTerminalEvent();
Expand Down

0 comments on commit db7083b

Please sign in to comment.