Skip to content

Commit

Permalink
Remove all the findTypeData / insertTypeData from the DataAccess and …
Browse files Browse the repository at this point in the history
…replace with single generic methods
  • Loading branch information
burmanm committed Jul 4, 2017
1 parent f1cafe9 commit 301c92a
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 426 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,7 @@ public interface DataAccess {

Observable<Row> findAllMetricsInData();

Observable<Integer> insertGaugeDatas(Observable<Metric<Double>> gauges,
Function<MetricId<Double>, Integer> ttlFunc);

Observable<Integer> insertGaugeData(Metric<Double> metric);

Observable<Integer> insertGaugeData(Metric<Double> metric, int ttl);
<T> Observable<Integer> insertData(Observable<Metric<T>> metrics);

Observable<Integer> insertStringDatas(Observable<Metric<String>> strings,
Function<MetricId<String>, Integer> ttlFetcher, int maxSize);
Expand All @@ -81,31 +76,23 @@ Observable<Integer> insertStringDatas(Observable<Metric<String>> strings,

Observable<Integer> insertStringData(Metric<String> metric, int ttl, int maxSize);

Observable<Integer> insertCounterDatas(Observable<Metric<Long>> counters,
Function<MetricId<Long>, Integer> ttlFetcher);

Observable<Integer> insertCounterData(Metric<Long> counter);

Observable<Integer> insertCounterData(Metric<Long> counter, int ttl);

Observable<Row> findCounterData(MetricId<Long> id, long startTime, long endTime, int limit, Order order,
int pageSize);

Observable<Row> findCompressedData(MetricId<?> id, long startTime, long endTime, int limit, Order
order);

Observable<Row> findTempGaugeData(MetricId<Double> id, long startTime, long endTime, int limit, Order order,
int pageSize);
<T> Observable<Row> findTempData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize);

Observable<Row> findGaugeData(MetricId<Double> id, long startTime, long endTime, int limit, Order order,
int pageSize);
<T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize);

Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order,
int pageSize);
int pageSize);

@Deprecated
Observable<Row> findAvailabilityData(MetricId<AvailabilityType> id, long startTime, long endTime, int limit,
Order order, int pageSize);

@Deprecated
Observable<Row> findAvailabilityData(MetricId<AvailabilityType> id, long timestamp);

<T> Observable<ResultSet> deleteMetricData(MetricId<T> id);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public static DataPoint<Long> getCounterDataPoint(Row row) {
row.getMap(COUNTER_COLS.TAGS.ordinal(), String.class, String.class));
}

public static DataPoint<Long> getTempCounterDataPoint(Row row) {
return new DataPoint<>(
row.getTimestamp(COUNTER_COLS.TIME.ordinal()).toInstant().toEpochMilli(),
row.getLong(COUNTER_COLS.VALUE.ordinal()),
row.getMap(COUNTER_COLS.TAGS.ordinal(), String.class, String.class));
}

public static DataPoint<String> getStringDataPoint(Row row) {
return new DataPoint<>(
UUIDs.unixTimestamp(row.getUUID(STRING_COLS.TIME.ordinal())),
Expand All @@ -104,6 +111,13 @@ public static DataPoint<AvailabilityType> getAvailabilityDataPoint(Row row) {
row.getMap(COUNTER_COLS.TAGS.ordinal(), String.class, String.class));
}

public static DataPoint<AvailabilityType> getTempAvailabilityDataPoint(Row row) {
return new DataPoint<>(
row.getTimestamp(AVAILABILITY_COLS.TIME.ordinal()).toInstant().toEpochMilli(),
AvailabilityType.fromBytes(row.getBytes(AVAILABILITY_COLS.AVAILABILITY.ordinal())),
row.getMap(COUNTER_COLS.TAGS.ordinal(), String.class, String.class));
}

public static Tenant getTenant(Row row) {
String tenantId = row.getString(0);
Map<MetricType<?>, Integer> retentions = row.getMap(1, String.class, Integer.class).entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -166,7 +167,6 @@ public int hashCode() {
*/
private Map<MetricType<?>, Func1<Observable<? extends Metric<?>>, Observable<Integer>>> pointsInserter;


/**
* Measurements of the throughput of inserting data points.
*/
Expand Down Expand Up @@ -229,12 +229,12 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
.put(GAUGE, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Double>> gauge = (Observable<Metric<Double>>) metric;
return dataAccess.insertGaugeDatas(gauge, this::getTTL);
return dataAccess.insertData(gauge);
})
.put(COUNTER, metric -> {
@SuppressWarnings("unchecked")
Observable<Metric<Long>> counter = (Observable<Metric<Long>>) metric;
return dataAccess.insertCounterDatas(counter, this::getTTL);
return dataAccess.insertData(counter);
})
.put(AVAILABILITY, metric -> {
@SuppressWarnings("unchecked")
Expand All @@ -251,21 +251,11 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
dataPointFinders = ImmutableMap
.<MetricType<?>, Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer,
Observable<Row>>>builder()
.put(GAUGE, (metricId, start, end, limit, order, pageSize) -> {
@SuppressWarnings("unchecked")
MetricId<Double> gaugeId = (MetricId<Double>) metricId;
return dataAccess.findGaugeData(gaugeId, start, end, limit, order, pageSize);
})
.put(AVAILABILITY, (metricId, start, end, limit, order, pageSize) -> {
@SuppressWarnings("unchecked")
MetricId<AvailabilityType> availabilityId = (MetricId<AvailabilityType>) metricId;
return dataAccess.findAvailabilityData(availabilityId, start, end, limit, order, pageSize);
})
.put(COUNTER, (metricId, start, end, limit, order, pageSize) -> {
@SuppressWarnings("unchecked")
MetricId<Long> counterId = (MetricId<Long>) metricId;
return dataAccess.findCounterData(counterId, start, end, limit, order, pageSize);
})
.put(STRING, (metricId, start, end, limit, order, pageSize) -> {
@SuppressWarnings("unchecked")
MetricId<String> stringId = (MetricId<String>) metricId;
Expand All @@ -282,6 +272,8 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c

tempDataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder()
.put(GAUGE, Functions::getTempGaugeDataPoint)
.put(COUNTER, Functions::getTempCounterDataPoint)
.put(AVAILABILITY, Functions::getTempAvailabilityDataPoint)
.build();

initConfiguration(session);
Expand Down Expand Up @@ -616,7 +608,6 @@ public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
.map(r -> null);
}


@Override
public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics) {
checkArgument(metricType != null, "metricType is null");
Expand Down Expand Up @@ -668,45 +659,45 @@ public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long st
// know the last successful compressjob time here)
// TODO Out-of-order writes and a time window to insert to temp tables

Observable<DataPoint<T>> uncompressedPoints = finder.call(metricId, start, end, limit, safeOrder, pageSize)
.map(mapper);
Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

// Calls mostly deprecated methods..
Observable<DataPoint<T>> uncompressedPoints = dataAccess.findOldData(metricId, start, end, limit, safeOrder,
pageSize).map(mapper).doOnError(Throwable::printStackTrace);

Observable<DataPoint<T>> compressedPoints =
dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder)
.compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end));

Observable<DataPoint<T>> tempStoragePoints = dataAccess.findTempData(metricId, start, end, limit,
safeOrder, pageSize)
.map(tempMapper);

Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder);
List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(3);
sources.add(uncompressedPoints);
sources.add(compressedPoints);

if(metricType == GAUGE) {
Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType);

// TODO This should be pluggable storage .. where we could do the queries as well. Just make it use
// dataAccess in this Cassandra temporary table solution
// Write an interface that allows all the necessary queries (mm.. kinda like MetricsService then I
// guess.. iiks)
// TODO This should be pluggable storage .. where we could do the queries as well. Just make it use
// dataAccess in this Cassandra temporary table solution
// Write an interface that allows all the necessary queries (mm.. kinda like MetricsService then I
// guess.. iiks)

Observable<DataPoint<T>> tempStoragePoints = dataAccess.findTempGaugeData((MetricId<Double>) metricId, start, end, limit,
safeOrder, pageSize)
.map(tempMapper);

sources.add(tempStoragePoints);
}
Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false)
Observable<DataPoint<T>> dataPoints = SortedMerge.create(Arrays.asList(uncompressedPoints,
compressedPoints, tempStoragePoints), comparator, false)
.distinctUntilChanged(
(tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0);

if(limit > 0) {
dataPoints = dataPoints.take(limit);
}

results = dataPoints;
} else {
results = finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
return dataPoints;
}
Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder =
getDataPointFinder(metricType);

results = finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper);
return results.doOnCompleted(context::stop);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected <V> V getUninterruptibly(ListenableFuture<V> future) throws ExecutionE
*/
protected <T> void doAction(Supplier<Observable<T>> fn) {
TestSubscriber<T> subscriber = new TestSubscriber<>();
Observable<T> observable = fn.get();
Observable<T> observable = fn.get().doOnError(Throwable::printStackTrace);
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
subscriber.assertNoErrors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ public void insertAndFindGaugeRawData() throws Exception {
new DataPoint<>(end.getMillis(), 1.234)
));

dataAccess.insertGaugeData(metric, DEFAULT_TTL).toBlocking().last();
dataAccess.insertData(Observable.just(metric)).toBlocking().last();

Observable<Row> observable = dataAccess.findGaugeData(new MetricId<>("tenant-1", GAUGE, "metric-1"),
Observable<Row> observable = dataAccess.findTempData(new MetricId<>("tenant-1", GAUGE, "metric-1"),
start.getMillis(), end.getMillis(), 0, Order.DESC, DEFAULT_PAGE_SIZE);
List<DataPoint<Double>> actual = ImmutableList.copyOf(observable
.map(Functions::getGaugeDataPoint)
Expand Down Expand Up @@ -150,9 +150,9 @@ public void addMetadataToGaugeRawData() throws Exception {
new DataPoint<>(end.getMillis(), 1.234)
));

dataAccess.insertGaugeData(metric, DEFAULT_TTL).toBlocking().last();
dataAccess.insertData(Observable.just(metric)).toBlocking().last();

Observable<Row> observable = dataAccess.findGaugeData(new MetricId<>("tenant-1", GAUGE, "metric-1"),
Observable<Row> observable = dataAccess.findTempData(new MetricId<>("tenant-1", GAUGE, "metric-1"),
start.getMillis(), end.getMillis(), 0, Order.DESC, DEFAULT_PAGE_SIZE);
List<DataPoint<Double>> actual = ImmutableList.copyOf(observable
.map(Functions::getGaugeDataPoint)
Expand Down Expand Up @@ -198,7 +198,7 @@ public void findAllMetricsPartitionKeys() throws Exception {
new Metric<>(new MetricId<>("t1", GAUGE, "m2"), singletonList(new DataPoint<>(start+1, 0.1))),
new Metric<>(new MetricId<>("t1", GAUGE, "m3"), singletonList(new DataPoint<>(start+2, 0.1))),
new Metric<>(new MetricId<>("t1", GAUGE, "m4"), singletonList(new DataPoint<>(start+3, 0.1)))))
.flatMap(m -> dataAccess.insertGaugeData(m, DEFAULT_TTL))
.flatMap(m -> dataAccess.insertData(Observable.just(m)))
.toBlocking().lastOrDefault(null);

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,8 @@ public Observable<Row> findAllMetricsInData() {
}

@Override
public Observable<Integer> insertGaugeDatas(Observable<Metric<Double>> gauges,
Function<MetricId<Double>, Integer> ttlFunction) {
return delegate.insertGaugeDatas(gauges, ttlFunction);
}

@Override
public Observable<Integer> insertGaugeData(Metric<Double> metric) {
return delegate.insertGaugeData(metric);
}

@Override
public Observable<Integer> insertGaugeData(Metric<Double> gauge, int ttl) {
return delegate.insertGaugeData(gauge, ttl);
public <T> Observable<Integer> insertData(Observable<Metric<T>> metrics) {
return delegate.insertData(metrics);
}

@Override
Expand All @@ -139,55 +128,32 @@ public Observable<Integer> insertStringDatas(Observable<Metric<String>> strings,
return delegate.insertStringDatas(strings, ttlFetcher, maxSize);
}

@Override
public Observable<Integer> insertCounterData(Metric<Long> counter, int ttl) {
return delegate.insertCounterData(counter, ttl);
}

@Override
public Observable<Row> findCounterData(MetricId<Long> id, long startTime, long endTime, int limit,
Order order, int pageSize) {
return delegate.findCounterData(id, startTime, endTime, limit, order, pageSize);
}

@Override
public Observable<Row> findCompressedData(MetricId<?> id, long startTime, long endTime, int limit, Order order) {
return delegate.findCompressedData(id, startTime, endTime, limit, order);
}

@Override
public Observable<Row> findTempGaugeData(MetricId<Double> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
return delegate.findTempGaugeData(id, startTime, endTime, limit, order, pageSize);
public <T> Observable<Row> findTempData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
return delegate.findTempData(id, startTime, endTime, limit, order, pageSize);
}

@Override
public Observable<Row> findGaugeData(MetricId<Double> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
return delegate.findGaugeData(id, startTime, endTime, limit, order, pageSize);
public <T> Observable<Row> findOldData(MetricId<T> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
return delegate.findOldData(id, startTime, endTime, limit, order, pageSize);
}

@Override
public Observable<Integer> insertStringData(Metric<String> metric, int maxSize) {
return delegate.insertStringData(metric, maxSize);
}

@Override
public Observable<Integer> insertCounterData(Metric<Long> counter) {
return null;
}

@Override
public Observable<Integer> insertStringData(Metric<String> metric, int ttl, int maxSize) {
return delegate.insertStringData(metric, ttl, maxSize);
}

@Override
public Observable<Integer> insertCounterDatas(Observable<Metric<Long>> counters,
Function<MetricId<Long>, Integer> ttlFetcher) {
return delegate.insertCounterDatas(counters, ttlFetcher);
}

@Override
public Observable<Row> findStringData(MetricId<String> id, long startTime, long endTime, int limit, Order order,
int pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,6 @@ public void setAvailabilityTTL(int availabilityTTL) {
this.availabilityTTL = availabilityTTL;
}

@Override
public Observable<Integer> insertGaugeData(Metric<Double> gauge, int ttl) {
assertEquals(ttl, gaugeTTL, "The gauge TTL does not match the expected value when inserting data");
return super.insertGaugeData(gauge, ttl);
}

@Override
public Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
assertEquals(ttl, availabilityTTL, "The availability data TTL does not match the expected value when " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void addAndFetchCounterData() throws Exception {
doAction(() -> metricsService.addDataPoints(COUNTER, Observable.just(counter)));

Observable<DataPoint<Long>> data = metricsService.findDataPoints(new MetricId<>(tenantId, COUNTER, "c1"),
start.getMillis(), end.getMillis(), 0, Order.DESC);
start.getMillis(), end.getMillis(), 0, Order.DESC).doOnError(Throwable::printStackTrace);
List<DataPoint<Long>> actual = toList(data);
List<DataPoint<Long>> expected = asList(
new DataPoint<>(start.plusMinutes(4).getMillis(), 25L),
Expand Down

0 comments on commit 301c92a

Please sign in to comment.