Skip to content

Commit

Permalink
retryPolicy to dropTempTable and add some comments to the compressBlock
Browse files Browse the repository at this point in the history
(cherry picked from commit b2b2335)
  • Loading branch information
Michael Burman authored and burmanm committed Jul 4, 2017
1 parent d17ca72 commit 492d462
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -750,15 +750,20 @@ public Completable compressBlock(long startTimeSlice, int pageSize, int maxConcu
return Completable.fromObservable(
dataAccess.findAllDataFromBucket(startTimeSlice, pageSize, maxConcurrency)
.flatMap(rows -> rows
// Each time the tokenrange changes inside the query, create new window, publish allows
// reuse of the observable in two distinct processing phases
.publish(p -> p.window(
p.map(Row::getPartitionKeyToken)
.distinctUntilChanged()))
// ConcatMap so we don't mess the order as that's important in the compression job
.concatMap(o -> {
// Cache the first key from the observable so we can use it to create a key later
Observable<Row> sharedRows = o.share();
Observable<CompressedPointContainer> compressed =
sharedRows.compose(new TempTableCompressTransformer(startTimeSlice));
Observable<Row> keyTake = sharedRows.take(1);

// Merge the first row with the compressed package to be able to write to Cassandra
return compressed.zipWith(keyTake, (cpc, r) -> {
MetricId<?> metricId =
new MetricId(r.getString(0), MetricType.fromCode(r.getByte(1)),
Expand All @@ -769,13 +774,10 @@ public Completable compressBlock(long startTimeSlice, int pageSize, int maxConcu
});
}), maxConcurrency)
.flatMap(rs -> rs)
.doOnError(Throwable::printStackTrace)
.doOnCompleted(() -> {
log.infof("Compress part completed");
dataAccess.dropTempTable(startTimeSlice)
.subscribeOn(Schedulers.io())
.subscribe();
})
.doOnCompleted(() -> dataAccess.dropTempTable(startTimeSlice)
.compose(applyRetryPolicy())
.subscribeOn(Schedulers.io())
.subscribe())
);
}

Expand Down

0 comments on commit 492d462

Please sign in to comment.