Skip to content

Commit

Permalink
[HWKMETRICS-613] Reduce the number of writes to the expiration for da…
Browse files Browse the repository at this point in the history
…ta points by updating the retention index only when a compressed block is persisted.
  • Loading branch information
Stefan Negrea committed Mar 28, 2017
1 parent 5e78205 commit cc02682
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -41,6 +41,7 @@
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
* @author Michael Burman
Expand Down Expand Up @@ -126,9 +127,14 @@ public Completable call(JobDetails jobDetails) {
.map(Metric::getMetricId)
.filter(m -> (m.getType() == GAUGE || m.getType() == COUNTER || m.getType() == AVAILABILITY));

PublishSubject<Metric<?>> subject = PublishSubject.create();
subject.doOnNext(metric -> {
this.metricsService.updateMetricExpiration(Observable.just(metric));
});

// Fetch all partition keys and compress the previous timeSlice
// TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize)
return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize, subject)
.doOnError(t -> logger.warn("Failed to compress data", t))
.doOnCompleted(() -> {
stopwatch.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import rx.Completable;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
* Interface that defines the functionality of the Metrics Service.
Expand Down Expand Up @@ -184,7 +185,7 @@ <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> id, long start, long end
* @return onComplete when job is done
*/
Completable compressBlock(Observable<? extends MetricId<?>> metrics, long startTimeSlice, long endTimeSlice,
int pageSize);
int pageSize, PublishSubject<Metric<?>> metricIdSubject);

<T> Observable<NamedDataPoint<T>> findDataPoints(List<MetricId<T>> ids, long start, long end, int limit,
Order order);
Expand Down Expand Up @@ -354,4 +355,6 @@ Observable<List<NumericBucketPoint>> findRateStats(MetricId<? extends Number> id
Observable<Metric<?>> insertedDataEvents();

<T> Func1<Metric<T>, Boolean> idFilter(String regexp);

<T> void updateMetricExpiration(Observable<Metric<T>> metric);
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,25 +221,21 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
.put(GAUGE, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Double>> gauge = (Observable<Metric<Double>>) metric;
this.updateMetricExpiration(gauge);
return dataAccess.insertGaugeDatas(gauge, this::getTTL);
})
.put(COUNTER, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Long>> counter = (Observable<Metric<Long>>) metric;
this.updateMetricExpiration(counter);
return dataAccess.insertCounterDatas(counter, this::getTTL);
})
.put(AVAILABILITY, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<AvailabilityType>> avail = (Observable<Metric<AvailabilityType>>) metric;
this.updateMetricExpiration(avail);
return dataAccess.insertAvailabilityDatas(avail, this::getTTL);
})
.put(STRING, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<String>> string = (Observable<Metric<String>>) metric;
this.updateMetricExpiration(string);
return dataAccess.insertStringDatas(string, this::getTTL, maxStringSize);
})
.build();
Expand Down Expand Up @@ -715,19 +711,21 @@ private <T> Observable.Transformer<T, T> applyRetryPolicy() {

@Override
@SuppressWarnings("unchecked")
public Completable compressBlock(Observable<? extends MetricId<?>> metrics, long startTimeSlice, long
endTimeSlice, int pageSize) {
public Completable compressBlock(Observable<? extends MetricId<?>> metrics, long startTimeSlice,
long endTimeSlice, int pageSize, PublishSubject<Metric<?>> subject) {

return Completable.fromObservable(metrics
.compose(applyRetryPolicy())
.concatMap(metricId ->
findDataPoints(metricId, startTimeSlice, endTimeSlice, 0, ASC, pageSize)
.compose(applyRetryPolicy())
.compose(new DataPointCompressTransformer(metricId.getType(), startTimeSlice))
.concatMap(cpc -> dataAccess.deleteAndInsertCompressedGauge(metricId, startTimeSlice,
(CompressedPointContainer) cpc, startTimeSlice, endTimeSlice, getTTL(metricId))
.compose(applyRetryPolicy())
)));
.concatMap(metricId -> findDataPoints(metricId, startTimeSlice, endTimeSlice, 0, ASC, pageSize)
.compose(applyRetryPolicy())
.compose(new DataPointCompressTransformer(metricId.getType(), startTimeSlice))
.map(cpc -> {
subject.onNext(new Metric<>(metricId, getTTL(metricId)));
return cpc;
})
.concatMap(cpc -> dataAccess.deleteAndInsertCompressedGauge(metricId, startTimeSlice,
(CompressedPointContainer) cpc, startTimeSlice, endTimeSlice, getTTL(metricId))
.compose(applyRetryPolicy()))));
}

@Override
Expand Down Expand Up @@ -1026,10 +1024,21 @@ public <T> Observable<Void> deleteMetric(MetricId<T> id) {
return result;
}

private <T> void updateMetricExpiration(Observable<Metric<T>> metric) {
metric.forEach(m -> ListenableFutureObservable
.from(dataAccess.updateMetricExpirationIndex(m.getMetricId(),
System.currentTimeMillis() + this.getTTL(m.getMetricId())), metricsTasks)
.doOnError(t -> log.error("Failure to update expiration index", t)));
@Override
public <T> void updateMetricExpiration(Observable<Metric<T>> metric) {
final long dayToMilliseconds = 24 * 3600 * 1000;
metric.filter(m -> !MetricType.STRING.equals(m.getType()))
.forEach(m -> {
long expiration = 0;
if(m.getDataRetention()!= null) {
expiration = System.currentTimeMillis() + m.getDataRetention() * dayToMilliseconds;
} else {
expiration = System.currentTimeMillis() + this.getTTL(m.getMetricId()) * dayToMilliseconds;
}

ListenableFutureObservable
.from(dataAccess.updateMetricExpirationIndex(m.getMetricId(), expiration), metricsTasks)
.doOnError(t -> log.error("Failure to update expiration index", t));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import rx.Completable;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

public class CounterITest extends BaseMetricsITest {

Expand Down Expand Up @@ -261,7 +262,7 @@ public void addAndCompressData() throws Exception {

Completable compressCompletable =
metricsService.compressBlock(Observable.just(mId), startSlice.getMillis(), endSlice.getMillis(),
COMPRESSION_PAGE_SIZE).doOnError(Throwable::printStackTrace);
COMPRESSION_PAGE_SIZE, PublishSubject.create()).doOnError(Throwable::printStackTrace);

TestSubscriber<Void> testSubscriber = new TestSubscriber<>();
compressCompletable.subscribe(testSubscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import rx.Observable;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

/**
* @author John Sanda
Expand Down Expand Up @@ -371,7 +372,7 @@ public void addAndCompressData() throws Exception {

Completable compressCompletable =
metricsService.compressBlock(Observable.just(mId), startSlice.getMillis(), endSlice.getMillis(),
COMPRESSION_PAGE_SIZE).doOnError(Throwable::printStackTrace);
COMPRESSION_PAGE_SIZE, PublishSubject.create()).doOnError(Throwable::printStackTrace);

TestSubscriber<Void> testSubscriber = new TestSubscriber<>();
compressCompletable.subscribe(testSubscriber);
Expand Down Expand Up @@ -450,7 +451,7 @@ public void addDataForSingleGaugeAndFindWithLimit() throws Exception {

Completable compressCompletable =
metricsService.compressBlock(Observable.just(mId), startSlice.getMillis(), endSlice.getMillis(),
COMPRESSION_PAGE_SIZE);
COMPRESSION_PAGE_SIZE, PublishSubject.create());

TestSubscriber<Void> testSubscriber = new TestSubscriber<>();
compressCompletable.subscribe(testSubscriber);
Expand Down

0 comments on commit cc02682

Please sign in to comment.