Skip to content

Commit

Permalink
[HWKMETRICS-228] calculate rates at query time
Browse files Browse the repository at this point in the history
This commit also removes most of the code around scheduled task execution,
which includes the tasks for generating creates and creating tenants. The
latter is no longer neeed with the work done in HWKMETRICS-190.
  • Loading branch information
John Sanda committed Aug 21, 2015
1 parent 9406b7e commit f894164
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.api.jaxrs.util.VirtualClock;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.CreateTenants;
import org.hawkular.metrics.core.impl.DataAccess;
import org.hawkular.metrics.core.impl.DataAccessImpl;
import org.hawkular.metrics.core.impl.DateTimeService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.AbstractTrigger;
Expand Down Expand Up @@ -299,13 +297,13 @@ private void initTaskScheduler() {
}

private void initJobs() {
GenerateRate generateRates = new GenerateRate(metricsService);
CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);

jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
.subscribe(generateRates));
jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
.subscribe(createTenants));
// GenerateRate generateRates = new GenerateRate(metricsService);
// CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);
//
// jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
// .subscribe(generateRates));
// jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
// .subscribe(createTenants));
}

private DateTimeService createDateTimeService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.api.jaxrs.util.VirtualClock;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.core.impl.CreateTenants;
import org.hawkular.metrics.core.impl.DataAccess;
import org.hawkular.metrics.core.impl.DataAccessImpl;
import org.hawkular.metrics.core.impl.DateTimeService;
import org.hawkular.metrics.core.impl.GenerateRate;
import org.hawkular.metrics.core.impl.MetricsServiceImpl;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.AbstractTrigger;
Expand Down Expand Up @@ -299,13 +297,13 @@ private void initTaskScheduler() {
}

private void initJobs() {
GenerateRate generateRates = new GenerateRate(metricsService);
CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);

jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
.subscribe(generateRates));
jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
.subscribe(createTenants));
// GenerateRate generateRates = new GenerateRate(metricsService);
// CreateTenants createTenants = new CreateTenants(metricsService, dataAcces);
//
// jobs.put(generateRates, taskScheduler.getTasks().filter(task -> task.getName().equals(GenerateRate.TASK_NAME))
// .subscribe(generateRates));
// jobs.put(createTenants, taskScheduler.getTasks().filter(task -> task.getName().equals(CreateTenants.TASK_NAME))
// .subscribe(createTenants));
}

private DateTimeService createDateTimeService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,27 @@ Observable<Map<MetricId, Set<DataPoint<AvailabilityType>>>> findAvailabilityByTa

Observable<Void> addCounterData(Observable<Metric<Long>> counters);

/**
* Queries for raw counter data points with the specified date range.
*
* @param id The counter id
* @param start The start time inclusive
* @param end The end time exclusive
* @return An observable that emits counter data points in ascending order
*/
Observable<DataPoint<Long>> findCounterData(MetricId id, long start, long end);

/**
* Fetches counter rate data points which are automatically generated for counter metrics. Note that rate data is
* generated only if the metric has been explicitly created via the {@link #createMetric(Metric)} method.
* Fetches counter data points and calculates per-minute rates. The start and end time are rounded down to the
* nearest minutes in order to separate data points into one minute buckets.
*
* @param id This is the id of the counter metric
* @param start The start time which is inclusive
* @param end The end time which is exclusive
*
* @return An Observable of {@link DataPoint data points} which are emitted in descending order. In other words,
* the most recent data is emitted first.
* @return An Observable of {@link DataPoint data points} which are emitted in ascending order. In other words,
* the most recent data is emitted first. If there are no data points for a particular bucket (i.e., minute), then
* null is emitted.
*/
Observable<DataPoint<Double>> findRateData(MetricId id, long start, long end);

Expand Down
6 changes: 6 additions & 0 deletions core/metrics-core-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@
<includes>
<include>**/*ITest*</include>
</includes>
<excludes>
<exclude>**/RatesITest.java</exclude>
<exclude>**/GenerateRateITest.java</exclude>
<exclude>**/CreateTenantsITest.java</exclude>
<exclude>**/CreateTenantsSchedulerITest.java</exclude>
</excludes>
<systemPropertyVariables>
<keyspace>${test.keyspace}</keyspace>
<nodes>${nodes}</nodes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ protected void initPreparedStatements() {
findCounterDataExclusive = session.prepare(
"SELECT time, m_tags, data_retention, l_value, tags FROM data " +
"WHERE tenant_id = ? AND type = ? AND metric = ? AND interval = ? AND dpart = ? AND time >= ? " +
"AND time < ?");
"AND time < ? " +
"ORDER BY time ASC");

findGaugeDataWithWriteTimeByDateRangeExclusive = session.prepare(
"SELECT time, m_tags, data_retention, n_value, tags, WRITETIME(n_value) FROM data " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand All @@ -66,6 +67,7 @@
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -355,20 +357,21 @@ public Observable<Void> createTenant(final Tenant tenant) {
throw new TenantAlreadyExistsException(tenant.getId());
}

Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, MINUTES)
.withInterval(1, MINUTES)
.build();
Map<String, String> params = ImmutableMap.of("tenant", tenant.getId());
Observable<Void> ratesScheduled = taskScheduler.scheduleTask("generate-rates", tenant.getId(),
100, params, trigger).map(task -> null);
// Trigger trigger = new RepeatingTrigger.Builder()
// .withDelay(1, MINUTES)
// .withInterval(1, MINUTES)
// .build();
// Map<String, String> params = ImmutableMap.of("tenant", tenant.getId());
// Observable<Void> ratesScheduled = taskScheduler.scheduleTask("generate-rates", tenant.getId(),
// 100, params, trigger).map(task -> null);

Observable<Void> retentionUpdates = Observable.from(tenant.getRetentionSettings().entrySet())
.flatMap(entry -> dataAccess.updateRetentionsIndex(tenant.getId(), entry.getKey(),
ImmutableMap.of(makeSafe(entry.getKey().getText()), entry.getValue())))
.map(rs -> null);

return ratesScheduled.concatWith(retentionUpdates);
// return ratesScheduled.concatWith(retentionUpdates);
return retentionUpdates;
});
updates.subscribe(resultSet -> {
}, subscriber::onError, subscriber::onCompleted);
Expand All @@ -377,17 +380,19 @@ public Observable<Void> createTenant(final Tenant tenant) {

@Override
public Observable<Void> createTenants(long creationTime, Observable<String> tenantIds) {
return tenantIds.flatMap(tenantId -> dataAccess.insertTenant(tenantId).flatMap(resultSet -> {
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, MINUTES)
.withInterval(1, MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"tenant", tenantId,
"creationTime", Long.toString(creationTime));

return taskScheduler.scheduleTask("generate-rates", tenantId, 100, params, trigger).map(task -> null);
}));
// return tenantIds.flatMap(tenantId -> dataAccess.insertTenant(tenantId).flatMap(resultSet -> {
// Trigger trigger = new RepeatingTrigger.Builder()
// .withDelay(1, MINUTES)
// .withInterval(1, MINUTES)
// .build();
// Map<String, String> params = ImmutableMap.of(
// "tenant", tenantId,
// "creationTime", Long.toString(creationTime));
//
// return taskScheduler.scheduleTask("generate-rates", tenantId, 100, params, trigger).map(task -> null);
// }));

return tenantIds.flatMap(tenantId -> dataAccess.insertTenant(tenantId).map(resultSet -> null));
}

@Override
Expand Down Expand Up @@ -646,11 +651,84 @@ public Observable<DataPoint<Long>> findCounterData(MetricId id, long start, long

@Override
public Observable<DataPoint<Double>> findRateData(MetricId id, long start, long end) {
MetricId rateId = new MetricId(id.getTenantId(), COUNTER_RATE, id.getName(), id.getInterval());
return time(gaugeReadLatency, () ->
dataAccess.findData(rateId, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint));
DateTime startTime = dateTimeService.getTimeSlice(new DateTime(start), standardMinutes(1));
DateTime endTime = dateTimeService.getTimeSlice(new DateTime(end), standardMinutes(1));

Observable<DataPoint<Long>> rawDataPoints = findCounterData(id, startTime.getMillis(), endTime.getMillis());

return getCounterBuckets(rawDataPoints, startTime.getMillis(), endTime.getMillis())
.map(bucket -> bucket.isEmpty() ? null : new DataPoint<>(bucket.startTime, bucket.getDelta()));
}

private Observable<CounterBucket> getCounterBuckets(Observable<DataPoint<Long>> dataPoints, long startTime,
long endTime) {
return Observable.create(subscriber -> {
final AtomicReference<CounterBucket> bucketRef = new AtomicReference<>(new CounterBucket(startTime));
dataPoints.subscribe(
dataPoint -> {
while (!bucketRef.get().contains(dataPoint.getTimestamp())) {
subscriber.onNext(bucketRef.get());
bucketRef.set(new CounterBucket(bucketRef.get().endTime));
}
bucketRef.get().add(dataPoint);
},
subscriber::onError,
() -> {
subscriber.onNext(bucketRef.get());
CounterBucket bucket = new CounterBucket(bucketRef.get().endTime);
while (bucket.endTime <= endTime) {
subscriber.onNext(bucket);
bucket = new CounterBucket(bucket.endTime);
}
subscriber.onCompleted();
}
);
});
}

private class CounterBucket {
DataPoint<Long> first;
DataPoint<Long> last;
long startTime;
long endTime;

public CounterBucket(long startTime) {
this.startTime = startTime;
this.endTime = startTime + 60000;
}

public void add(DataPoint<Long> dataPoint) {
if (first == null && dataPoint.getTimestamp() >= startTime) {
first = dataPoint;
last = dataPoint;
} else if (dataPoint.getTimestamp() >= startTime && dataPoint.getTimestamp() < endTime) {
last = dataPoint;
}
}

public boolean contains(long time) {
return time >= startTime && time < endTime;
}

public boolean isEmpty() {
return first == null && last == null;
}

public Double getDelta() {
if (first == last) {
return ((double) first.getValue() / (endTime - startTime)) * 60000;
}
return (((double) last.getValue() - (double) first.getValue()) / (endTime - startTime)) * 60000;
}

@Override public String toString() {
return "CounterBucket{" +
"first=" + first +
", last=" + last +
", startTime=" + startTime +
", endTime=" + endTime +
'}';
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected <T> List<T> getOnNextEvents(Supplier<Observable<T>> fn) {
}

protected double calculateRate(double value, DateTime startTime, DateTime endTime) {
return (value / (endTime.getMillis() - startTime.getMillis())) * 1000;
return (value / (endTime.getMillis() - startTime.getMillis())) * 60000;
}

protected void assertIsEmpty(String msg, Observable<ResultSet> observable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ public void addAndFetchCounterData() throws Exception {
DateTime end = start.plusMinutes(20);
String tenantId = "counters-tenant";

metricsService.createTenant(new Tenant(tenantId)).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.createTenant(new Tenant(tenantId)));

Metric<Long> counter = new Metric<>(new MetricId(tenantId, COUNTER, "c1"), asList(
new DataPoint<>(start.getMillis(), 10L),
Expand All @@ -485,15 +485,15 @@ public void addAndFetchCounterData() throws Exception {
new DataPoint<>(end.getMillis(), 45L)
));

metricsService.addCounterData(Observable.just(counter)).toBlocking().lastOrDefault(null);
doAction(() -> metricsService.addCounterData(Observable.just(counter)));

Observable<DataPoint<Long>> data = metricsService.findCounterData(new MetricId(tenantId, COUNTER, "c1"),
start.getMillis(), end.getMillis());
List<DataPoint<Long>> actual = toList(data);
List<DataPoint<Long>> expected = asList(
new DataPoint<>(start.plusMinutes(4).getMillis(), 25L),
new DataPoint<>(start.getMillis(), 10L),
new DataPoint<>(start.plusMinutes(2).getMillis(), 15L),
new DataPoint<>(start.getMillis(), 10L)
new DataPoint<>(start.plusMinutes(4).getMillis(), 25L)
);

assertEquals(actual, expected, "The counter data does not match the expected values");
Expand Down Expand Up @@ -528,6 +528,61 @@ public void addAndFetchGaugeDataAggregates() throws Exception {
assertMetricIndexMatches("t1", GAUGE, singletonList(m1));
}

@Test
public void findRates() {
DateTime start = dateTimeService.currentHour().minusHours(1);
String tenantId = "counter-rate-test";

Metric<Long> counter = new Metric<>(new MetricId(tenantId, COUNTER, "C1"), asList(
new DataPoint<>(start.plusMinutes(1).getMillis(), 10L),
new DataPoint<>(start.plusMinutes(1).plusSeconds(45).getMillis(), 17L),
new DataPoint<>(start.plusMinutes(2).plusSeconds(10).getMillis(), 29L),
new DataPoint<>(start.plusMinutes(2).plusSeconds(39).getMillis(), 41L),
new DataPoint<>(start.plusMinutes(2).plusSeconds(57).getMillis(), 49L),
new DataPoint<>(start.plusMinutes(3).plusSeconds(8).getMillis(), 56L),
new DataPoint<>(start.plusMinutes(3).plusSeconds(36).getMillis(), 67L),
new DataPoint<>(start.plusMinutes(4).plusSeconds(29).getMillis(), 84L)
));

doAction(() -> metricsService.addCounterData(Observable.just(counter)));

List<DataPoint<Double>> actual = getOnNextEvents(() -> metricsService.findRateData(counter.getId(),
start.getMillis(), start.plusMinutes(6).getMillis()));
List<DataPoint<Double>> expected = asList(
null,
new DataPoint<>(start.plusMinutes(1).getMillis(), calculateRate(17 - 10, start.plusMinutes(1),
start.plusMinutes(2))),
new DataPoint<>(start.plusMinutes(2).getMillis(), calculateRate(49 - 29, start.plusMinutes(2),
start.plusMinutes(3))),
new DataPoint<>(start.plusMinutes(3).getMillis(), calculateRate(67 - 56, start.plusMinutes(3),
start.plusMinutes(4))),
new DataPoint<>(start.plusMinutes(4).getMillis(), calculateRate(84, start.plusMinutes(4),
start.plusMinutes(5))),
null
);

assertEquals(actual, expected, "The rates do not match");
}

@Test
public void findRatesWhenNoDataIsFound() {
DateTime start = dateTimeService.currentHour().minusHours(1);
String tenantId = "counter-rate-test-no-data";

Metric<Long> counter = new Metric<>(new MetricId(tenantId, COUNTER, "C1"), asList(
new DataPoint<>(start.plusMinutes(1).getMillis(), 10L),
new DataPoint<>(start.plusMinutes(1).plusSeconds(45).getMillis(), 17L),
new DataPoint<>(start.plusMinutes(2).plusSeconds(10).getMillis(), 29L)
));

doAction(() -> metricsService.addCounterData(Observable.just(counter)));

List<DataPoint<Double>> actual = getOnNextEvents(() -> metricsService.findRateData(counter.getId(),
start.plusMinutes(3).getMillis(), start.plusMinutes(5).getMillis()));

assertEquals(actual, asList(null, null), "The rates do not match");
}

@Test
public void verifyTTLsSetOnAvailabilityData() throws Exception {
DateTime start = now().minusMinutes(10);
Expand Down

0 comments on commit f894164

Please sign in to comment.