Skip to content

Commit

Permalink
refactor common code into helper methods
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Jul 1, 2015
1 parent bb353a3 commit dc723db
Showing 1 changed file with 72 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import io.vertx.rxjava.core.http.HttpClientRequest;
import io.vertx.rxjava.core.http.HttpClientResponse;
import org.hawkular.metrics.rest.model.AvailabilityDataPoint;
import org.hawkular.metrics.rest.model.DataPoint;
import org.hawkular.metrics.rest.model.DataPoints;
import org.hawkular.metrics.rest.model.GaugeDataPoint;
import rx.Observable;
import rx.Observer;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

public class Client {
Expand Down Expand Up @@ -70,19 +72,7 @@ public void shutdown() throws InterruptedException {
}

public Observable<Void> addGaugeData(String tenant, List<DataPoints<GaugeDataPoint>> gauges) {
try {
String json = mapper.writeValueAsString(gauges);
HttpClientRequest request = httpClient.post(BASE_PATH + "gauges/data")
.putHeader("Content-Type", "application/json")
.putHeader(TENANT_HEADER, tenant);
WriteObserver writeObserver = new WriteObserver();
request.toObservable().subscribe(writeObserver);
request.end(json);

return writeObserver.getObservable();
} catch (JsonProcessingException e) {
throw new ClientException("Failed to parse " + gauges, e);
}
return addDataPoints(tenant, "gauges/data", gauges);
}

public Observable<Void> addGaugeData(String tenant, String gauge, List<GaugeDataPoint> dataPoints) {
Expand All @@ -91,12 +81,7 @@ public Observable<Void> addGaugeData(String tenant, String gauge, List<GaugeData
HttpClientRequest request = httpClient.post(BASE_PATH + "gauges/" + gauge + "/data")
.putHeader("Content-Type", "application/json")
.putHeader(TENANT_HEADER, tenant);
// Observable<HttpClientResponse> responseObservable = request.toObservable();
// responseObservable.subscribe(
// response -> System.out.println("status: {code: " + response.statusCode() + ", message: " +
// response.statusMessage() + "}"),
// Throwable::printStackTrace
// );

WriteObserver writeObserver = new WriteObserver();
request.toObservable().subscribe(writeObserver);
request.end(json);
Expand All @@ -108,103 +93,83 @@ public Observable<Void> addGaugeData(String tenant, String gauge, List<GaugeData
}

public Observable<Void> addAvailabilty(String tenant, List<DataPoints<AvailabilityDataPoint>> dataPoints) {
return addDataPoints(tenant, "availability/data", dataPoints);
}

// public Observable<Void> addCounterData(String tenant, List<DataPoints<CounterDataPoint>> dataPoints) {
// try {
// String json = mapper.writeValueAsString(dataPoints);
// HttpClientRequest request
// } catch (JsonProcessingException e) {
// throw new ClientException("Failed to parse data points", e);
// }
// }

private <T extends DataPoint> Observable<Void> addDataPoints(String tenant, String path,
List<DataPoints<T>> dataPoints) {
try {
PublishSubject<Void> subject = PublishSubject.create();
String json = mapper.writeValueAsString(dataPoints);
HttpClientRequest request = httpClient.post(BASE_PATH + "availability/data")
HttpClientRequest request = httpClient.post(BASE_PATH + path)
.putHeader("Content-Type", "application/json")
.putHeader(TENANT_HEADER, tenant);
WriteObserver writeObserver = new WriteObserver();
request.toObservable().subscribe(writeObserver);
request.toObservable().subscribe(
response -> {
if (response.statusCode() != 200) {
subject.onError(new WriteException(response.statusMessage(), response.statusCode()));
}
},
t -> subject.onError(new ClientException("There was an unexpected error while adding data", t)),
subject::onCompleted

);
request.end(json);

return writeObserver.getObservable();
return subject;
} catch (JsonProcessingException e) {
throw new ClientException("Failed to parse data points", e);
}
}

private class WriteObserver implements Observer<HttpClientResponse> {

private PublishSubject<Void> subject = PublishSubject.create();

public Observable<Void> getObservable() {
return subject;
}

@Override
public void onCompleted() {
subject.onCompleted();
}

@Override
public void onError(Throwable e) {
subject.onError(new ClientException("There was an unexpected error while adding data", e));
}

@Override
public void onNext(HttpClientResponse response) {
if (response.statusCode() != 200) {
subject.onError(new WriteException(response.statusMessage(), response.statusCode()));
}
}
public Observable<GaugeDataPoint> findGaugeData(String tenantId, String gauge, long start, long end) {
String path = "gauges/" + gauge + "/data?start=" + start + "&end=" + end;
return getDataPoints(tenantId, path, this::getGaugeDataPoints);
}

public Observable<GaugeDataPoint> findGaugeData(String tenantId, String gauge, long start, long end) {
PublishSubject<GaugeDataPoint> subject = PublishSubject.create();
HttpClientRequest request = httpClient.get(BASE_PATH + "gauges/" + gauge + "/data?start=" + start + "&end=" +
end)
.putHeader(TENANT_HEADER, tenantId)
.putHeader("Content-Type", "application/json");
Observable<GaugeDataPoint> observable = request.toObservable()
.flatMap(response -> {
if (response.statusCode() == 200 || response.statusCode() == 204) {
return response.toObservable();
}
throw new ReadException(response.statusMessage(), response.statusCode());
})
.flatMap(buffer -> Observable.from(getGaugeDataPoints(buffer)));
observable.subscribe(
subject::onNext,
t -> {
if (t instanceof ReadException) {
subject.onError(t);
} else {
subject.onError(new ClientException("There was an unexpected error while reading data", t));
}
},
subject::onCompleted
);
request.end();
return subject;
public Observable<AvailabilityDataPoint> findAvailabilityData(String tenantId, String metric, long start,
long end) {
String path = "availability/" + metric + "/data?start=" + start + "&end=" + end;
return getDataPoints(tenantId, path, this::getAvailabilityDataPoints);
}

public Observable<AvailabilityDataPoint> findAvailabilityData(String tenantId, String metric, long start, long
end) {
PublishSubject<AvailabilityDataPoint> subject = PublishSubject.create();
HttpClientRequest request = httpClient.get(BASE_PATH + "availability/" + metric + "/data?start=" + start +
"&end=" + end)
private <T extends DataPoint> Observable<T> getDataPoints(String tenantId, String path,
Func1<Buffer, List<T>> getDataPoints) {

PublishSubject<T> subject = PublishSubject.create();
HttpClientRequest request = httpClient.get(BASE_PATH + path)
.putHeader(TENANT_HEADER, tenantId)
.putHeader("Content-Type", "application/json");
Observable<AvailabilityDataPoint> observable = request.toObservable()
Observable<T> observable = request.toObservable()
.flatMap(response -> {
if (response.statusCode() == 200 || response.statusCode() == 204) {
return response.toObservable();
}
throw new ReadException(response.statusMessage(), response.statusCode());
})
.flatMap(buffer -> Observable.from(getAvailabilityDataPoints(buffer)));
.flatMap(buffer -> Observable.from(getDataPoints.call(buffer)));
observable.subscribe(
subject::onNext,
t -> {
if (t instanceof ReadException) {
subject.onError(t);
} else {
subject.onError(new ClientException("There was an unexpected error while reading data", t));
subject.onError(new ClientException("There was an unexpected error while adding data", t));
}
},
subject::onCompleted
);
request.end();

return subject;
}

Expand All @@ -224,5 +189,30 @@ private List<AvailabilityDataPoint> getAvailabilityDataPoints(Buffer buffer) thr
}
}

private class WriteObserver implements Observer<HttpClientResponse> {

private PublishSubject<Void> subject = PublishSubject.create();

public Observable<Void> getObservable() {
return subject;
}

@Override
public void onCompleted() {
subject.onCompleted();
}

@Override
public void onError(Throwable e) {
subject.onError(new ClientException("There was an unexpected error while adding data", e));
}

@Override
public void onNext(HttpClientResponse response) {
if (response.statusCode() != 200) {
subject.onError(new WriteException(response.statusMessage(), response.statusCode()));
}
}
}

}

0 comments on commit dc723db

Please sign in to comment.