Skip to content

Commit

Permalink
[HWKMETRICS-74] small refactoring for observables that do not emit an…
Browse files Browse the repository at this point in the history
…y items
  • Loading branch information
John Sanda committed May 14, 2015
1 parent 7632ceb commit aefbeb6
Showing 1 changed file with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ private List<String> loadTenantIds() {
public Observable<Void> createMetric(final Metric<?> metric) {
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
Observable<ResultSet> indexUpdated = RxUtil.from(future, metricsTasks);
return Observable.create(subscriber -> {
return Observable.create(subscriber ->
indexUpdated.subscribe(
resultSet -> {
if (!resultSet.wasApplied()) {
Expand All @@ -467,6 +467,11 @@ public Observable<Void> createMetric(final Metric<?> metric) {
// If adding tags/retention fails, then we want to report the error to the
// client. Updating the retentions_idx table could also fail. We need to
// report that failure as well.
//
// The error handling is the same as it was with Guava futures. That is, if any
// future fails, we treat the entire client request as a failure. We probably
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
ResultSetFuture metadataFuture = dataAccess.addTagsAndDataRetention(metric);
Observable<ResultSet> metadataUpdated = RxUtil.from(metadataFuture, metricsTasks);
ResultSetFuture tagsFuture = dataAccess.insertIntoMetricsTagsIndex(metric,
Expand All @@ -484,18 +489,9 @@ public Observable<Void> createMetric(final Metric<?> metric) {
metricUpdates = Observable.merge(metadataUpdated, tagsUpdated);
}

metricUpdates.subscribe(
resultSets -> {},
// The error handling is the same as it was with Guava futures. That is, if any
// future fails, we treat the entire client request as a failure. We probably
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
subscriber::onError,
subscriber::onCompleted
);
metricUpdates.subscribe(new VoidSubscriber<>(subscriber));
}
});
});
}));
}

@Override
Expand Down Expand Up @@ -576,13 +572,8 @@ public Observable<Void> addGaugeData(Observable<Gauge> gaugeObservable) {
List<ResultSetFuture> inserts = Lists.newArrayList(insertsMap.values());
inserts.add(dataAccess.updateMetricsIndex(ImmutableList.copyOf(insertsMap.keySet())));
ListenableFuture<List<ResultSet>> insertsFuture = Futures.allAsList(inserts);
Observable<List<ResultSet>> insertsObservable = RxUtil.from(insertsFuture, metricsTasks);
insertsObservable.subscribe(
resultSets -> {
},
subscriber::onError,
subscriber::onCompleted
);
Observable<List<ResultSet>> dataInserted = RxUtil.from(insertsFuture, metricsTasks);
dataInserted.subscribe(new VoidSubscriber<>(subscriber));
});
});
}
Expand Down

0 comments on commit aefbeb6

Please sign in to comment.