Skip to content

Commit

Permalink
[HKMETRICS-124] add l_value column and clean up impl for inserting da…
Browse files Browse the repository at this point in the history
…ta points
  • Loading branch information
John Sanda committed Jun 12, 2015
1 parent 7c262d6 commit 12e25c6
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ Observable<ResultSet> updateTagsInMetricsIndex(Metric metric, Map<String, String

Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricType type);

Observable<Integer> insertData(Observable<GaugeAndTTL> gaugeObservable);
Observable<Integer> insertData(Metric<Double> metric, int ttl);

// Observable<Integer> insertCounterData(Observable)

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.hawkular.metrics.core.impl;

import static com.datastax.driver.core.BatchStatement.Type.UNLOGGED;
import static org.hawkular.metrics.core.impl.TimeUUIDUtils.getTimeUUID;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -385,7 +386,7 @@ public Observable<ResultSet> addTagsAndDataRetention(Metric metric) {

@Override
public Observable<ResultSet> addTags(Metric metric, Map<String, String> tags) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
BatchStatement batch = new BatchStatement(UNLOGGED);
batch.add(addMetricTagsToDataTable.bind(tags, metric.getTenantId(), metric.getType().getCode(),
metric.getId().getName(), metric.getId().getInterval().toString(), DPART));
batch.add(addTagsToMetricsIndex.bind(tags, metric.getTenantId(), metric.getType().getCode(),
Expand All @@ -395,7 +396,7 @@ public Observable<ResultSet> addTags(Metric metric, Map<String, String> tags) {

@Override
public Observable<ResultSet> deleteTags(Metric metric, Set<String> tags) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
BatchStatement batch = new BatchStatement(UNLOGGED);
batch.add(deleteMetricTagsFromDataTable.bind(tags, metric.getTenantId(), metric.getType().getCode(),
metric.getId().getName(), metric.getId().getInterval().toString(), DPART));
batch.add(deleteTagsFromMetricsIndex.bind(tags, metric.getTenantId(), metric.getType().getCode(),
Expand All @@ -406,7 +407,7 @@ public Observable<ResultSet> deleteTags(Metric metric, Set<String> tags) {
@Override
public Observable<ResultSet> updateTagsInMetricsIndex(Metric metric, Map<String, String> additions,
Set<String> deletions) {
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED)
BatchStatement batchStatement = new BatchStatement(UNLOGGED)
.add(addTagsToMetricsIndex.bind(additions, metric.getTenantId(),
metric.getType().getCode(), metric.getId().getInterval().toString(), metric.getId().getName()))
.add(deleteTagsFromMetricsIndex.bind(deletions, metric.getTenantId(), metric.getType().getCode(),
Expand All @@ -416,7 +417,7 @@ public Observable<ResultSet> updateTagsInMetricsIndex(Metric metric, Map<String,

@Override
public <T> ResultSetFuture updateMetricsIndex(List<Metric<T>> metrics) {
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
BatchStatement batchStatement = new BatchStatement(UNLOGGED);
for (Metric metric : metrics) {
batchStatement.add(updateMetricsIndex.bind(metric.getTenantId(), metric.getType().getCode(),
metric.getId().getInterval().toString(), metric.getId().getName()));
Expand All @@ -426,7 +427,7 @@ public <T> ResultSetFuture updateMetricsIndex(List<Metric<T>> metrics) {

@Override
public <T> Observable<Integer> updateMetricsIndexRx(Observable<Metric<T>> metrics) {
return metrics.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batch, metric) -> {
return metrics.reduce(new BatchStatement(UNLOGGED), (batch, metric) -> {
batch.add(updateMetricsIndex.bind(metric.getTenantId(), metric.getType().getCode(),
metric.getId().getInterval().toString(), metric.getId().getName()));
return batch;
Expand All @@ -439,12 +440,11 @@ public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricTy
}

@Override
public Observable<Integer> insertData(Observable<GaugeAndTTL> observable) {
return observable.flatMap(pair -> Observable.from(pair.gauge.getDataPoints())
.map(dataPoint -> bindDataPoint(pair.gauge, dataPoint, pair.ttl))
.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), BatchStatement::add)
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()))
);
public Observable<Integer> insertData(Metric<Double> gauge, int ttl) {
return Observable.from(gauge.getDataPoints())
.map(dataPoint -> bindDataPoint(gauge, dataPoint, ttl))
.reduce(new BatchStatement(UNLOGGED), BatchStatement::add)
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

private BoundStatement bindDataPoint(Metric gauge, DataPoint<Double> dataPoint, int ttl) {
Expand Down Expand Up @@ -531,7 +531,7 @@ public Observable<ResultSet> findAvailabilityData(Metric<AvailabilityType> metri
@Override
public Observable<ResultSet> deleteGaugeMetric(String tenantId, String metric, Interval interval, long dpart) {
return rxSession.execute(deleteGaugeMetric.bind(tenantId, MetricType.GAUGE.getCode(), metric,
interval.toString(), dpart));
interval.toString(), dpart));
}

@Override
Expand All @@ -543,7 +543,7 @@ public Observable<ResultSet> findAllGaugeMetrics() {
public Observable<ResultSet> insertGaugeTag(String tag, String tagValue, Metric<Double> metric,
Observable<TTLDataPoint<Double>> data) {
return data.reduce(
new BatchStatement(BatchStatement.Type.UNLOGGED),
new BatchStatement(UNLOGGED),
(batch, d) -> {
batch.add(insertGaugeTags.bind(metric.getTenantId(), tag, tagValue, MetricType.GAUGE.getCode(), metric
.getId().getName(), metric.getId().getInterval().toString(),
Expand All @@ -556,7 +556,7 @@ public Observable<ResultSet> insertGaugeTag(String tag, String tagValue, Metric<
public Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,
Metric<AvailabilityType> metric, Observable<TTLDataPoint<AvailabilityType>> data) {
return data.reduce(
new BatchStatement(BatchStatement.Type.UNLOGGED),
new BatchStatement(UNLOGGED),
(batch, a) -> {
batch.add(insertAvailabilityTags.bind(metric.getTenantId(), tag, tagValue,
MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval()
Expand Down Expand Up @@ -586,7 +586,7 @@ public Observable<ResultSet> findAvailabilityByTag(String tenantId, String tag,
@Override
public Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
return Observable.from(metric.getDataPoints())
.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batchStatement, a) -> {
.reduce(new BatchStatement(UNLOGGED), (batchStatement, a) -> {
batchStatement.add(insertAvailability.bind(ttl, metric.getTags(), getBytes(a),
metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), metric.getId()
.getInterval().toString(), DPART, getTimeUUID(a.getTimestamp())));
Expand All @@ -612,7 +612,7 @@ public ResultSetFuture findDataRetentions(String tenantId, MetricType type) {

@Override
public ResultSetFuture updateRetentionsIndex(String tenantId, MetricType type, Set<Retention> retentions) {
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
BatchStatement batchStatement = new BatchStatement(UNLOGGED);
for (Retention r : retentions) {
batchStatement.add(updateRetentionsIndex.bind(tenantId, type.getCode(), r.getId().getInterval().toString(),
r.getId().getName(), r.getValue()));
Expand All @@ -634,7 +634,7 @@ public Observable<ResultSet> deleteFromMetricsTagsIndex(Metric metric, Map<Strin

private Observable<ResultSet> executeTagsBatch(Map<String, String> tags,
BiFunction<String, String, BoundStatement> bindVars) {
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
BatchStatement batchStatement = new BatchStatement(UNLOGGED);
tags.entrySet().stream().forEach(entry -> batchStatement.add(bindVars.apply(entry.getKey(), entry.getValue())));
return rxSession.execute(batchStatement);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,7 @@ public Observable<Void> addGaugeData(Observable<Metric<Double>> gaugeObservable)
// explicitly created, just not necessarily right away.

PublishSubject<Void> results = PublishSubject.create();
Observable<Integer> updates = dataAccess.insertData(
gaugeObservable.map(gauge -> new GaugeAndTTL(gauge, getTTL(gauge))));
Observable<Integer> updates = gaugeObservable.flatMap(g -> dataAccess.insertData(g, getTTL(g)));
// I am intentionally return zero for the number index updates because I want to measure and compare the
// throughput inserting data with and without the index updates. This will give us a better idea of how much
// over there is with the index updates.
Expand Down Expand Up @@ -508,6 +507,16 @@ public Observable<Void> addAvailabilityData(List<Metric<AvailabilityType>> metri
return results;
}

@Override
public Observable<Void> addCounterData(List<Metric<Long>> metrics) {
return null;
}

@Override
public Observable<DataPoint<Long>> findCounterData(String tenantId, MetricId id, long start, long end) {
return null;
}

@Override
public Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id, Long start, Long end) {
// When we implement date partitioning, dpart will have to be determined based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void insertAndFindGaugeRawData() throws Exception {
new DataPoint<>(end.getMillis(), 1.234)
));

dataAccess.insertData(Observable.just(new GaugeAndTTL(metric, DEFAULT_TTL))).toBlocking().last();
dataAccess.insertData(metric, DEFAULT_TTL).toBlocking().last();

Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());
Expand Down Expand Up @@ -162,7 +162,7 @@ public void addMetadataToGaugeRawData() throws Exception {
new DataPoint<>(end.getMillis(), 1.234)
));

dataAccess.insertData(Observable.just(new GaugeAndTTL(metric, DEFAULT_TTL))).toBlocking().last();
dataAccess.insertData(metric, DEFAULT_TTL).toBlocking().last();

Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricTy
}

@Override
public Observable<Integer> insertData(Observable<GaugeAndTTL> gaugeObservable) {
return delegate.insertData(gaugeObservable);
public Observable<Integer> insertData(Metric<Double> gauge, int ttl) {
return delegate.insertData(gauge, ttl);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.codahale.metrics.MetricRegistry;
Expand Down Expand Up @@ -421,18 +420,12 @@ private void addGaugeDataInThePast(Metric<Double> metric, final Duration duratio
metricsService.setDataAccess(new DelegatingDataAccess(dataAccess) {

@Override
public Observable<Integer> insertData(Observable<GaugeAndTTL> observable) {
List<ResultSetFuture> futures = new ArrayList<>();
final AtomicInteger totalInserts = new AtomicInteger();
for (GaugeAndTTL pair : observable.toBlocking().toIterable()) {
totalInserts.addAndGet(pair.gauge.getDataPoints().size());
futures.add(insertData(pair.gauge, pair.ttl));
}
return Observable.from(futures)
.flatMap(future -> Observable.from(future).map(resultSet -> totalInserts.get()));
public Observable<Integer> insertData(Metric<Double> gauge, int ttl) {
return Observable.from(insertDataWithNewWriteTime(gauge, ttl))
.map(resultSet -> gauge.getDataPoints().size());
}

public ResultSetFuture insertData(Metric<Double> m, int ttl) {
public ResultSetFuture insertDataWithNewWriteTime(Metric<Double> m, int ttl) {
int actualTTL = ttl - duration.toStandardSeconds().getSeconds();
long writeTime = now().minus(duration).getMillis() * 1000;
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
Expand Down Expand Up @@ -1080,10 +1073,9 @@ public void availabilityTagTLLLessThanEqualTo(int availabilityTagTTL) {
}

@Override
public Observable<Integer> insertData(Observable<GaugeAndTTL> observable) {
observable.forEach(pair -> assertEquals(pair.ttl, gaugeTTL,
"The gauge TTL does not match the expected value when inserting data"));
return super.insertData(observable);
public Observable<Integer> insertData(Metric<Double> gauge, int ttl) {
assertEquals(ttl, gaugeTTL, "The gauge TTL does not match the expected value when inserting data");
return super.insertData(gauge, ttl);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions schema-manager/src/main/resources/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CREATE TABLE ${keyspace}.data (
m_tags map<text, text> static,
n_value double,
availability blob,
l_value bigint,
aggregates set<frozen <aggregate_data>>,
tags map<text, text>,
PRIMARY KEY ((tenant_id, type, metric, interval, dpart), time)
Expand Down

0 comments on commit 12e25c6

Please sign in to comment.