Skip to content

Commit

Permalink
[HWKMETRICS-130] initial impl for generating/storing rate data
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Jun 17, 2015
1 parent 8be150c commit 0eac721
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ Observable<Map<MetricId, Set<DataPoint<AvailabilityType>>>> findAvailabilityByTa

Observable<DataPoint<Long>> findCounterData(String tenantId, MetricId id, long start, long end);

Observable<DataPoint<Double>> findRateData(String tenantId, MetricId id, long start, long end);

/**
* <p>
* For a specified date range, return a list of periods in which the predicate evaluates to true for each
Expand Down
6 changes: 6 additions & 0 deletions core/metrics-core-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hawkular-metrics-task-queue</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hawkular-rx-java-driver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ Observable<ResultSet> updateTagsInMetricsIndex(Metric metric, Map<String, String

Observable<ResultSet> findCounterData(String tenantId, MetricId id, long startTime, long endTime);

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime);
Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime);

Observable<ResultSet> findData(Metric<Double> metric, long startTime, long endTime, Order order);

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime,
boolean includeWriteTime);

Observable<ResultSet> findData(Metric<Double> metric, long timestamp, boolean includeWriteTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ protected void initPreparedStatements() {
findCounterDataExclusive = session.prepare(
"SELECT time, m_tags, data_retention, l_value, tags FROM data " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND interval = ? AND dpart = ? AND time >= ? " +
"AND time < ? ORDER BY time ASC");
"AND time < ?");

findGaugeDataWithWriteTimeByDateRangeExclusive = session.prepare(
"SELECT time, m_tags, data_retention, n_value, tags, WRITETIME(n_value) FROM data " +
Expand Down Expand Up @@ -473,8 +473,8 @@ private BoundStatement bindDataPoint(PreparedStatement statement, Metric<?> metr
}

@Override
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime) {
return findData(tenantId, id, startTime, endTime, false);
public Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime) {
return findData(tenantId, id, type, startTime, endTime, false);
}

@Override
Expand All @@ -498,16 +498,15 @@ public Observable<ResultSet> findData(Metric<Double> metric, long startTime, lon
}

@Override
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
public Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime,
boolean includeWriteTime) {
if (includeWriteTime) {
return rxSession.execute(findGaugeDataWithWriteTimeByDateRangeExclusive.bind(tenantId,
MetricType.GAUGE.getCode(), id.getName(), id.getInterval().toString(), DPART,
getTimeUUID(startTime), getTimeUUID(endTime)));
return rxSession.execute(findGaugeDataWithWriteTimeByDateRangeExclusive.bind(tenantId, type.getCode(),
id.getName(),
id.getInterval().toString(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
} else {
return rxSession.execute(findGaugeDataByDateRangeExclusive.bind(tenantId, MetricType.GAUGE.getCode(),
id.getName(), id.getInterval().toString(), DPART, getTimeUUID(startTime),
getTimeUUID(endTime)));
return rxSession.execute(findGaugeDataByDateRangeExclusive.bind(tenantId, type.getCode(), id.getName(),
id.getInterval().toString(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
}
}

Expand All @@ -516,11 +515,11 @@ public Observable<ResultSet> findData(Metric<Double> metric, long timestamp,
boolean includeWriteTime) {
if (includeWriteTime) {
return rxSession.execute(findGaugeDataWithWriteTimeByDateRangeInclusive.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
DPART, UUIDs.startOf(timestamp), UUIDs.endOf(timestamp)));
} else {
return rxSession.execute(findGaugeDataByDateRangeInclusive.bind(metric.getTenantId(),
MetricType.GAUGE.getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
metric.getType().getCode(), metric.getId().getName(), metric.getId().getInterval().toString(),
DPART, UUIDs.startOf(timestamp), UUIDs.endOf(timestamp)));
}
}
Expand Down Expand Up @@ -581,14 +580,14 @@ public Observable<ResultSet> insertGaugeTag(String tag, String tagValue, Metric<
public Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,
Metric<AvailabilityType> metric, Observable<TTLDataPoint<AvailabilityType>> data) {
return data.reduce(
new BatchStatement(UNLOGGED),
(batch, a) -> {
batch.add(insertAvailabilityTags.bind(metric.getTenantId(), tag, tagValue,
MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval()
.toString(), getTimeUUID(a.getDataPoint().getTimestamp()), getBytes(a.getDataPoint()),
a.getTTL()));
return batch;
}).flatMap(rxSession::execute);
new BatchStatement(UNLOGGED),
(batch, a) -> {
batch.add(insertAvailabilityTags.bind(metric.getTenantId(), tag, tagValue,
MetricType.AVAILABILITY.getCode(), metric.getId().getName(), metric.getId().getInterval()
.toString(), getTimeUUID(a.getDataPoint().getTimestamp()), getBytes(a.getDataPoint()),
a.getTTL()));
return batch;
}).flatMap(rxSession::execute);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;
import static java.util.Comparator.comparingLong;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
import static org.joda.time.DateTime.now;
import static org.joda.time.Hours.hours;

import java.util.ArrayList;
Expand All @@ -36,6 +39,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Predicate;

import com.codahale.metrics.Meter;
Expand Down Expand Up @@ -68,6 +72,9 @@
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TenantAlreadyExistsException;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskType;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;
import org.joda.time.Hours;
Expand Down Expand Up @@ -140,8 +147,18 @@ public int hashCode() {
private final Map<DataRetentionKey, Integer> dataRetentions = new ConcurrentHashMap<>();

private ListeningExecutorService metricsTasks;

private DataAccess dataAccess;

private List<TaskType> taskTypes = singletonList(
new TaskType()
.setName("counter-rate")
.setSegments(10)
.setSegmentOffsets(10)
.setFactory(this::generateRate));

private TaskService taskService;

private MetricRegistry metricRegistry;

/**
Expand Down Expand Up @@ -213,6 +230,10 @@ void loadDataRetentions() {
}
}

void unloadDataRetentions() {
dataRetentions.clear();
}

private void initMetrics() {
gaugeInserts = metricRegistry.meter("gauge-inserts");
availabilityInserts = metricRegistry.meter("availability-inserts");
Expand All @@ -222,10 +243,6 @@ private void initMetrics() {
counterReadLatency = metricRegistry.timer("counter-read-latency");
}

void unloadDataRetentions() {
dataRetentions.clear();
}

private static class MergeDataPointTagsFunction<T extends DataPoint> implements
Func1<List<Map<MetricId, Set<T>>>, Map<MetricId, Set<T>>> {

Expand Down Expand Up @@ -301,6 +318,14 @@ void setDataAccess(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}

List<TaskType> getTaskTypes() {
return taskTypes;
}

public void setTaskService(TaskService taskService) {
this.taskService = taskService;
}

@Override
public Observable<Void> createTenant(final Tenant tenant) {
return dataAccess.insertTenant(tenant).flatMap(
Expand Down Expand Up @@ -408,6 +433,14 @@ public Observable<Void> createMetric(Metric<?> metric) {
updates.add(updateRetentionsIndex(metric));
}

if (metric.getType() == COUNTER) {
TaskType generateRatesType = taskTypes.get(0);
// TODO DO not hard code task interval and window
Task task = generateRatesType.createTask(metric.getId().getName() + "$rate",
metric.getId().getName(), 5, 5);
taskService.scheduleTask(now(), task);
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
}
}));
Expand Down Expand Up @@ -545,16 +578,23 @@ public Observable<DataPoint<Long>> findCounterData(String tenantId, MetricId id,
.map(Functions::getCounterDataPoint));
}

@Override
public Observable<DataPoint<Double>> findRateData(String tenantId, MetricId id, long start, long end) {
return time(gaugeReadLatency, () ->
dataAccess.findData(tenantId, id, COUNTER_RATE, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint));
}

@Override
public Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id, Long start, Long end) {
// When we implement date partitioning, dpart will have to be determined based on
// the start and end params. And it is possible the the date range spans multiple
// date partitions.
return time(gaugeReadLatency, () ->
dataAccess.findData(tenantId, id, start, end)
dataAccess.findData(tenantId, id, GAUGE, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint)
);
.map(Functions::getGaugeDataPoint));
}

@Override
Expand All @@ -564,7 +604,7 @@ public Observable<BucketedOutput<GaugeBucketDataPoint>> findGaugeStats(
// When we implement date partitioning, dpart will have to be determined based on
// the start and end params. And it is possible the the date range spans multiple
// date partitions.
return dataAccess.findData(metric.getTenantId(), metric.getId(), start, end)
return dataAccess.findData(metric.getTenantId(), metric.getId(), GAUGE, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint)
.toList()
Expand Down Expand Up @@ -619,8 +659,8 @@ public Observable<Boolean> idExists(final String id) {
// metrics since they could efficiently be inserted in a single batch statement.
public Observable<Void> tagGaugeData(Metric<Double> metric, final Map<String, String> tags, long start,
long end) {
Observable<ResultSet> findDataObservable = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end,
true);
Observable<ResultSet> findDataObservable = dataAccess.findData(metric.getTenantId(), metric.getId(), GAUGE,
start, end, true);
return tagGaugeData(findDataObservable, tags, metric);
}

Expand Down Expand Up @@ -758,4 +798,26 @@ private <T> T time(Timer timer, Callable<T> callable) {
throw new RuntimeException("There was an error during a timed event", e);
}
}

private Consumer<Task> generateRate() {
return task -> {
logger.info("Generating rate for {}", task);
// TODO Store tenant id with task
String tenantId = "rate-tenant";
MetricId id = new MetricId(task.getSources().iterator().next());
long end = task.getTimeSlice().getMillis();
long start = task.getTimeSlice().minusSeconds(task.getWindow()).getMillis();
logger.debug("start = {}, end = {}", start, end);
findCounterData(tenantId, id, start, end)
.take(1)
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(tenantId, COUNTER_RATE, id, singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {},
t -> logger.warn("Failed to persist rate data", t),
() -> logger.debug("Successfully persisted rate data")
);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public void insertAndFindGaugeRawData() throws Exception {

dataAccess.insertData(metric, DEFAULT_TTL).toBlocking().last();

Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());
Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), GAUGE,
start.getMillis(), end.getMillis());
List<DataPoint<Double>> actual = ImmutableList.copyOf(observable
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint)
Expand Down Expand Up @@ -164,8 +164,8 @@ public void addMetadataToGaugeRawData() throws Exception {

dataAccess.insertData(metric, DEFAULT_TTL).toBlocking().last();

Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());
Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), GAUGE,
start.getMillis(), end.getMillis());
List<DataPoint<Double>> actual = ImmutableList.copyOf(observable
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ public Observable<ResultSet> findCounterData(String tenantId, MetricId id, long
}

@Override
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime) {
return delegate.findData(tenantId, id, startTime, endTime);
public Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime) {
return delegate.findData(tenantId, id, type, startTime, endTime);
}

@Override
Expand All @@ -136,9 +136,9 @@ public Observable<ResultSet> findData(Metric<Double> metric, long startTime, lon
}

@Override
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
public Observable<ResultSet> findData(String tenantId, MetricId id, MetricType type, long startTime, long endTime,
boolean includeWriteTime) {
return delegate.findData(tenantId, id, startTime, endTime, includeWriteTime);
return delegate.findData(tenantId, id, type, startTime, endTime, includeWriteTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected void resetDB() {
}

protected String getKeyspace() {
return System.getProperty("keyspace", "hawkular");
return System.getProperty("keyspace", "hawkulartest");
}

protected DateTime hour0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
* @author jsanda
Expand Down Expand Up @@ -161,8 +160,6 @@ public void start() {
@Override
public void shutdown() {
logger.info("Shutting down");

tasksExecuted.onCompleted();
leaseService.shutdown();
ticker.shutdownNow();
scheduler.shutdownNow();
Expand Down

0 comments on commit 0eac721

Please sign in to comment.