Skip to content

Commit

Permalink
[HWKMETRICS-74] small refactoring for storing data
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 19, 2015
1 parent b030c1d commit c598865
Showing 1 changed file with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,22 +610,22 @@ public Observable<Void> addGaugeData(Observable<Gauge> gaugeObservable) {
// pass. This means that the behavior basically remains the same as before. The idea here is to implement a
// pub/sub workflow.

return Observable.create(subscriber -> {
Map<Gauge, ResultSetFuture> insertsMap = new HashMap<>();
gaugeObservable.subscribe(
gauge -> {
int ttl = getTTL(gauge);
insertsMap.put(gauge, dataAccess.insertData(gauge, ttl));
},
t -> logger.warn("There was an error receive gauge data to insert", t),
() -> {
List<ResultSetFuture> inserts = Lists.newArrayList(insertsMap.values());
inserts.add(dataAccess.updateMetricsIndex(ImmutableList.copyOf(insertsMap.keySet())));
ListenableFuture<List<ResultSet>> insertsFuture = Futures.allAsList(inserts);
Observable<List<ResultSet>> dataInserted = RxUtil.from(insertsFuture, metricsTasks);
dataInserted.subscribe(new VoidSubscriber<>(subscriber));
});
});
return Observable.create(subscriber -> gaugeObservable
.reduce(new HashMap<>(), (HashMap<Gauge, ResultSetFuture> map, Gauge gauge) -> {
int ttl = getTTL(gauge);
ResultSetFuture future = dataAccess.insertData(gauge, ttl);
map.put(gauge, future);
return map;
})
.map(insertsMap -> {
List<ResultSetFuture> inserts = Lists.newArrayList(insertsMap.values());
inserts.add(dataAccess.updateMetricsIndex(ImmutableList.copyOf(insertsMap.keySet())));
return inserts;
})
.map(Futures::allAsList)
.map(insertsFuture -> RxUtil.from(insertsFuture, metricsTasks))
.subscribe(new VoidSubscriber<>(subscriber))
);
}

@Override
Expand Down

0 comments on commit c598865

Please sign in to comment.