Skip to content

Commit

Permalink
[HWKMETRICS-200] initial support for calculating rates per tenant
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 12, 2015
1 parent 94d4dd8 commit 15dcbd0
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.hawkular.metrics.core.impl;

import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
Expand All @@ -33,6 +26,13 @@
import rx.Observable;
import rx.functions.Action1;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonList;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;

/**
* @author jsanda
*/
Expand All @@ -49,32 +49,34 @@ public GenerateRate(MetricsService metricsService) {
@Override
public void call(Task2 task) {
logger.info("Generating rate for {}", task);
MetricId id = new MetricId(task.getParameters().get("tenantId"), COUNTER, task.getParameters().get("metricId"));
String tenant = task.getParameters().get("tenant");
long start = task.getTrigger().getTriggerTime();
long end = start + TimeUnit.MINUTES.toMillis(1);

Observable<Metric<Double>> rates = metricsService.findMetrics(tenant, COUNTER)
.flatMap(counter -> metricsService.findCounterData(counter.getId(), start, end)
.take(1)
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(new MetricId(tenant, COUNTER_RATE, counter.getId().getName()),
singletonList(new DataPoint<>(start, rate)))));
Observable<Void> updates = metricsService.addGaugeData(rates);

CountDownLatch latch = new CountDownLatch(1);

logger.debug("start = {}, end = {}", start, end);
metricsService.findCounterData(id, start, end)
.take(1)
.doOnNext(dataPoint -> logger.debug("Data Point = {}", dataPoint))
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(new MetricId(id.getTenantId(), COUNTER_RATE, id.getName()),
singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {
},
t -> {
logger.warn("Failed to persist rate data", t);
latch.countDown();
},
() -> {
logger.debug("Successfully persisted rate data");
latch.countDown();
}
);
updates.subscribe(
aVoid -> {},
t -> {
logger.warn("There was an error persisting rates for {tenant= " + tenant + ", start= " +
start + ", end= " + end + "}", t);
latch.countDown();
},
() -> {
logger.debug("Successfully persisted rate data for {tenant= " + tenant + ", start= " +
start + ", end= " + end + "}");
latch.countDown();
}
);

try {
latch.await();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static java.util.Comparator.comparingLong;

import static org.hawkular.metrics.core.api.MetricType.AVAILABILITY;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
Expand Down Expand Up @@ -323,6 +322,14 @@ public Observable<Void> createTenant(final Tenant tenant) {
if (!resultSet.wasApplied()) {
throw new TenantAlreadyExistsException(tenant.getId());
}

Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of("tenant", tenant.getId());
taskScheduler.scheduleTask("generate-rates", tenant.getId(), 100, params, trigger);

return Observable.from(tenant.getRetentionSettings().entrySet())
.flatMap(entry -> dataAccess.updateRetentionsIndex(tenant.getId(), entry.getKey(),
ImmutableMap.of(makeSafe(entry.getKey().getText()), entry.getValue())));
Expand Down Expand Up @@ -382,18 +389,6 @@ public Observable<Void> createMetric(Metric<?> metric) {
updates.add(updateRetentionsIndex(metric));
}

if (metric.getType() == COUNTER) {
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"metricId", metric.getId().getName(),
"tenantId", metric.getTenantId()
);
taskScheduler.scheduleTask("compute-rate", metric.getTenantId(), 1000, params, trigger);
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
}
}));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.joda.time.Duration.standardMinutes;
import static org.testng.Assert.assertEquals;

import java.util.List;

import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.tasks.api.SingleExecutionTrigger;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Task2Impl;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import rx.Observable;

/**
* This class tests counter rates by directly calling {@link GenerateRate}. There is no
* task scheduler running for these tests.
*/
public class GenerateRateITest extends MetricsITest {

private static Logger logger = LoggerFactory.getLogger(GenerateRateITest.class);

private MetricsServiceImpl metricsService;

private DateTimeService dateTimeService;

@BeforeClass
public void initClass() {
initSession();

dateTimeService = new DateTimeService();

metricsService = new MetricsServiceImpl();
metricsService.setTaskScheduler(new FakeTaskScheduler());

metricsService.startUp(session, getKeyspace(), false, new MetricRegistry());
}

@BeforeMethod
public void initMethod() {
session.execute("TRUNCATE metrics_idx");
session.execute("TRUNCATE data");
}

@Test
public void generateRates() {
DateTime start = dateTimeService.getTimeSlice(DateTime.now(), standardMinutes(1)).minusMinutes(5);
String tenant = "rates-test";

Metric<Long> c1 = new Metric<>(new MetricId(tenant, COUNTER, "C1"));
Metric<Long> c2 = new Metric<>(new MetricId(tenant, COUNTER, "C2"));
Metric<Long> c3 = new Metric<>(new MetricId(tenant, COUNTER, "C3"));

doAction(() -> metricsService.createMetric(c1));
doAction(() -> metricsService.createMetric(c2));
doAction(() -> metricsService.createMetric(c3));

doAction(() -> metricsService.addCounterData(Observable.from(asList(
new Metric<>(c1.getId(), asList(new DataPoint<>(start.getMillis(), 10L),
new DataPoint<>(start.plusSeconds(30).getMillis(), 25L))),
new Metric<>(c2.getId(), asList(new DataPoint<>(start.getMillis(), 100L),
new DataPoint<>(start.plusSeconds(30).getMillis(), 165L))),
new Metric<>(c3.getId(), asList(new DataPoint<>(start.getMillis(), 42L),
new DataPoint<>(start.plusSeconds(30).getMillis(), 77L)))
))));

GenerateRate generateRate = new GenerateRate(metricsService);

Trigger trigger = new SingleExecutionTrigger(start.getMillis());
Task2Impl task = new Task2Impl(randomUUID(), tenant, 0, "generate-rates", ImmutableMap.of("tenant", tenant),
trigger);

generateRate.call(task);

List<DataPoint<Double>> c1Rate = getOnNextEvents(() -> metricsService.findRateData(c1.getId(),
start.getMillis(), start.plusMinutes(1).getMillis()));
List<DataPoint<Double>> c2Rate = getOnNextEvents(() -> metricsService.findRateData(c2.getId(),
start.getMillis(), start.plusMinutes(1).getMillis()));
List<DataPoint<Double>> c3Rate = getOnNextEvents(() -> metricsService.findRateData(c3.getId(),
start.getMillis(), start.plusMinutes(1).getMillis()));

assertEquals(c1Rate, singletonList(new DataPoint<>(start.getMillis(), calculateRate(25, start,
start.plusMinutes(1)))));
assertEquals(c2Rate, singletonList(new DataPoint<>(start.getMillis(), calculateRate(165, start,
start.plusMinutes(1)))));
assertEquals(c3Rate, singletonList(new DataPoint<>(start.getMillis(), calculateRate(77, start,
start.plusMinutes(1)))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.hawkular.metrics.core.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.joda.time.DateTime.now;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
Expand All @@ -32,6 +35,8 @@
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import org.joda.time.DateTime;
import rx.Observable;
import rx.observers.TestSubscriber;

/**
* @author John Sanda
Expand Down Expand Up @@ -86,4 +91,44 @@ protected <V> V getUninterruptibly(ListenableFuture<V> future) throws ExecutionE
return Uninterruptibles.getUninterruptibly(future, FUTURE_TIMEOUT, TimeUnit.SECONDS);
}

/**
* This method take a function that produces an Observable that has side effects, like
* inserting rows into the database. A {@link TestSubscriber} is subscribed to the
* Observable. The subscriber blocks up to five seconds waiting for a terminal event
* from the Observable.
*
* @param fn A function that produces an Observable with side effects
*/
protected void doAction(Supplier<Observable<Void>> fn) {
TestSubscriber<Void> subscriber = new TestSubscriber<>();
Observable<Void> observable = fn.get();
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertCompleted();
}

/**
* This method takes a function that produces an Observable. The method blocks up to
* five seconds until the Observable emits a terminal event. The items that the
* Observable emits are then returned.
*
* @param fn A function that produces an Observable
* @param <T> The type of items emitted by the Observable
* @return A list of the items emitted by the Observable
*/
protected <T> List<T> getOnNextEvents(Supplier<Observable<T>> fn) {
TestSubscriber<T> subscriber = new TestSubscriber<>();
Observable<T> observable = fn.get();
observable.subscribe(subscriber);
subscriber.awaitTerminalEvent(5, SECONDS);
subscriber.assertNoErrors();
subscriber.assertCompleted();

return subscriber.getOnNextEvents();
}

protected double calculateRate(double value, DateTime startTime, DateTime endTime) {
return (value / (endTime.getMillis() - startTime.getMillis())) * 1000;
}
}

0 comments on commit 15dcbd0

Please sign in to comment.