Skip to content

Commit

Permalink
[HWKMETRICS-168] get RatesITest passing again
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Aug 6, 2015
1 parent 4b38e1a commit df82328
Show file tree
Hide file tree
Showing 15 changed files with 338 additions and 799 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,23 @@ public enum State {
private Session session;

MetricsServiceLifecycle() {
// Create the shared instance now and initialize it with the C* session when ready
metricsService = new MetricsServiceImpl();
metricsService.setTaskScheduler(new TaskService() {
@Override
public void start() {
}

@Override
public void shutdown() {
}

@Override
public Observable<Task> scheduleTask(DateTime time, Task task) {
LOG.warn("Task scheduling is not yet supported");
return Observable.empty();
}
});
ThreadFactory threadFactory = r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName(MetricsService.class.getSimpleName().toLowerCase(Locale.ROOT) + "-lifecycle-thread");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ public Observable<ResultSet> findData(MetricId id, long startTime, long endTime)
}

@Override
public Observable<ResultSet> findCounterData(MetricId id, long startTime, long endTime) {
return rxSession.execute(findCounterDataExclusive.bind(id.getTenantId(), COUNTER.getCode(), id.getName(),
public Observable<ResultSet> findCounterData(String tenantId, MetricId id, long startTime, long endTime) {
return rxSession.execute(findCounterDataExclusive.bind(tenantId, COUNTER.getCode(), id.getName(),
id.getInterval().toString(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricsService;
import org.hawkular.metrics.tasks.api.Task;
import org.joda.time.Duration;
import org.hawkular.metrics.tasks.api.Task2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand All @@ -35,7 +38,7 @@
/**
* @author jsanda
*/
public class GenerateRate implements Action1<Task> {
public class GenerateRate implements Action1<Task2>, Function<Task2, Observable<Task2>> {

private static final Logger logger = LoggerFactory.getLogger(GenerateRate.class);

Expand All @@ -46,32 +49,56 @@ public GenerateRate(MetricsService metricsService) {
}

@Override
public void call(Task task) {
public void call(Task2 task) {
logger.info("Generating rate for {}", task);
MetricId id = new MetricId(task.getTenantId(), COUNTER_RATE, task.getSources().iterator().next());
long start = task.getTimeSlice().getMillis();
long end = task.getTimeSlice().plus(getDuration(task.getWindow())).getMillis();
metricsService.findCounterData(id, start, end)
MetricId id = new MetricId(task.getParameters().get("metricId"));
String tenantId = task.getParameters().get("tenantId");
long start = task.getTrigger().getTriggerTime();
long end = start + TimeUnit.MINUTES.toMillis(1);

CountDownLatch latch = new CountDownLatch(1);

logger.debug("start = {}, end = {}", start, end);
metricsService.findCounterData(tenantId, id, start, end)
.take(1)
.map(dataPoint -> dataPoint.getValue().doubleValue() / (end - start) * 1000)
.map(rate -> new Metric<>(id, singletonList(new DataPoint<>(start, rate))))
.doOnNext(dataPoint -> logger.debug("Data Point = {}", dataPoint))
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(tenantId, COUNTER_RATE, id, singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.subscribe(
aVoid -> {
},
t -> logger.warn("Failed to persist rate data", t),
() -> logger.debug("Successfully persisted rate data for {}", task)
t -> {
logger.warn("Failed to persist rate data", t);
latch.countDown();
},
() -> {
logger.debug("Successfully persisted rate data");
latch.countDown();
}
);
try {
latch.await();
} catch (InterruptedException e) {
}
}

private Duration getDuration(int duration) {
// This is somewhat of a temporary hack until HWKMETRICS-142 is done. The time units
// for tasks are currently scheduled globally with the TaskServiceBuilder class. The
// system property below is the only hook we have right now for specifying and
// checking the time units used.
if ("seconds".equals(System.getProperty("hawkular.scheduler.time-units", "minutes"))) {
return standardSeconds(duration);
}
return standardMinutes(duration);
@Override
public Observable<Task2> apply(Task2 task) {
return Observable.defer(() -> {
MetricId id = new MetricId(task.getParameters().get("metricId"));
String tenantId = task.getParameters().get("tenantId");
long start = task.getTrigger().getTriggerTime();
long end = start + TimeUnit.MINUTES.toMillis(1);

logger.debug("start = {}, end = {}", start, end);
return metricsService.findCounterData(tenantId, id, start, end)
.take(1)
.doOnNext(dataPoint -> logger.debug("Data Point = {}", dataPoint))
.map(dataPoint -> ((dataPoint.getValue().doubleValue() / (end - start) * 1000)))
.map(rate -> new Metric<>(tenantId, COUNTER_RATE, id, singletonList(new DataPoint<>(start, rate))))
.flatMap(metric -> metricsService.addGaugeData(Observable.just(metric)))
.map(avoid -> task);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
import static org.joda.time.DateTime.now;
import static org.joda.time.Hours.hours;

import java.util.ArrayList;
Expand All @@ -40,9 +39,23 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

import java.util.regex.Pattern;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
Expand All @@ -63,27 +76,15 @@
import org.hawkular.metrics.core.impl.transformers.ItemsToSetTransformer;
import org.hawkular.metrics.core.impl.transformers.TagsIndexRowTransformer;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.RepeatingTrigger;
import org.hawkular.metrics.tasks.api.Task;
import org.hawkular.metrics.tasks.api.TaskService;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;
import org.joda.time.Hours;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -140,7 +141,7 @@ public int hashCode() {

private DataAccess dataAccess;

private TaskService taskService;
private TaskScheduler taskScheduler;

private MetricRegistry metricRegistry;

Expand Down Expand Up @@ -310,8 +311,8 @@ void setDataAccess(DataAccess dataAccess) {
this.dataAccess = dataAccess;
}

public void setTaskService(TaskService taskService) {
this.taskService = taskService;
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

@Override
Expand Down Expand Up @@ -427,7 +428,15 @@ public Observable<Void> createMetric(Metric<?> metric) {
if (metric.getType() == COUNTER) {
Task task = TaskTypes.COMPUTE_RATE.createTask(metric.getTenantId(), metric.getId().getName() +
"$rate", metric.getId().getName());
taskService.scheduleTask(now(), task);
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
Map<String, String> params = ImmutableMap.of(
"metricId", metric.getId().getName(),
"tenantId", metric.getTenantId()
);
taskScheduler.scheduleTask("compute-rate", metric.getTenantId(), 1000, params, trigger);
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hawkular.metrics.core.impl;

import java.util.Map;

import org.hawkular.metrics.tasks.api.Task2;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.metrics.tasks.api.Trigger;
import org.hawkular.metrics.tasks.impl.Lease;
import rx.Observable;

/**
* @author jsanda
*/
public class FakeTaskScheduler implements TaskScheduler {

@Override
public Observable<Lease> start() {
return Observable.empty();
}

@Override
public Observable<Task2> scheduleTask(String name, String groupKey, int executionOrder,
Map<String, String> parameters, Trigger trigger) {
return Observable.empty();
}

@Override
public void shutdown() {
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void initClass() {
initSession();

metricsService = new MetricsServiceImpl();
metricsService.setTaskService(new FakeTaskService());
metricsService.setTaskScheduler(new FakeTaskScheduler());
metricsService.startUp(session, getKeyspace(), false, new MetricRegistry());
dataAccess = metricsService.getDataAccess();

Expand Down

0 comments on commit df82328

Please sign in to comment.