Skip to content

Commit

Permalink
[HWKMETRICS-136] add instrumentation for reading/writing raw data
Browse files Browse the repository at this point in the history
This commit includes instrumentation for both gauges and availability.
  • Loading branch information
John Sanda committed Jun 10, 2015
1 parent 982c3aa commit feb0669
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import com.codahale.metrics.MetricRegistry;
import org.hawkular.metrics.api.jaxrs.config.Configurable;
import org.hawkular.metrics.api.jaxrs.config.ConfigurationProperty;
import org.hawkular.metrics.api.jaxrs.util.Eager;
Expand Down Expand Up @@ -163,7 +164,11 @@ private void startMetricsService() {
return;
}
try {
metricsService.startUp(session, keyspace, Boolean.parseBoolean(resetDb));
// TODO Set up a managed metric registry
// We want a managed registry that can be shared by the JAX-RS endpoint and the core. Then we can expose
// the registered metrics in various ways such as new REST endpoints, JMX, or via different
// com.codahale.metrics.Reporter instances.
metricsService.startUp(session, keyspace, Boolean.parseBoolean(resetDb), new MetricRegistry());
LOG.info("Metrics service started");
state = State.STARTED;
} catch (Throwable t) {
Expand Down
11 changes: 11 additions & 0 deletions core/metrics-core-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${datastax.driver.version}</version>
<exclusions>
<exclusion>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -83,6 +89,11 @@
<version>${joda.time.version}</version>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>

<dependency>
<groupId>net.sf.trove4j</groupId>
<artifactId>trove4j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ Observable<ResultSet> updateTagsInMetricsIndex(Metric metric, Map<String, String

<T> ResultSetFuture updateMetricsIndex(List<Metric<T>> metrics);

<T> Observable<ResultSet> updateMetricsIndexRx(Observable<Metric<T>> metrics);
<T> Observable<Integer> updateMetricsIndexRx(Observable<Metric<T>> metrics);

Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricType type);

Observable<ResultSet> insertData(Observable<GaugeAndTTL> gaugeObservable);
Observable<Integer> insertData(Observable<GaugeAndTTL> gaugeObservable);

Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime);

Expand Down Expand Up @@ -101,7 +101,7 @@ Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,

Observable<ResultSet> findAvailabilityByTag(String tenantId, String tag, String tagValue);

Observable<ResultSet> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl);
Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl);

Observable<ResultSet> findAvailabilityData(String tenantId, MetricId id, long startTime, long endTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,12 @@ public <T> ResultSetFuture updateMetricsIndex(List<Metric<T>> metrics) {
}

@Override
public <T> Observable<ResultSet> updateMetricsIndexRx(Observable<Metric<T>> metrics) {
public <T> Observable<Integer> updateMetricsIndexRx(Observable<Metric<T>> metrics) {
return metrics.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batch, metric) -> {
batch.add(updateMetricsIndex.bind(metric.getTenantId(), metric.getType().getCode(),
metric.getId().getInterval().toString(), metric.getId().getName()));
return batch;
}).flatMap(rxSession::execute);
}).flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

@Override
Expand All @@ -452,11 +452,11 @@ public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricTy
}

@Override
public Observable<ResultSet> insertData(Observable<GaugeAndTTL> observable) {
public Observable<Integer> insertData(Observable<GaugeAndTTL> observable) {
return observable.flatMap(pair -> Observable.from(pair.gauge.getDataPoints())
.map(dataPoint -> bindDataPoint(pair.gauge, dataPoint, pair.ttl))
.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), BatchStatement::add)
.flatMap(rxSession::execute)
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()))
);
}

Expand Down Expand Up @@ -597,17 +597,15 @@ public Observable<ResultSet> findAvailabilityByTag(String tenantId, String tag,
}

@Override
public Observable<ResultSet> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
return Observable
.from(metric.getDataPoints())
.reduce(
new BatchStatement(BatchStatement.Type.UNLOGGED),
(batchStatement, a) -> {
public Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
return Observable.from(metric.getDataPoints())
.reduce(new BatchStatement(BatchStatement.Type.UNLOGGED), (batchStatement, a) -> {
batchStatement.add(insertAvailability.bind(ttl, metric.getTags(), getBytes(a),
metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(), metric.getId()
.getInterval().toString(), DPART, getTimeUUID(a.getTimestamp())));
metric.getTenantId(), metric.getType().getCode(), metric.getId().getName(),
metric.getId().getInterval().toString(), DPART, getTimeUUID(a.getTimestamp())));
return batchStatement;
}).flatMap(rxSession::execute);
})
.flatMap(batch -> rxSession.execute(batch).map(resultSet -> batch.size()));
}

private ByteBuffer getBytes(DataPoint<AvailabilityType> dataPoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;

import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
Expand All @@ -33,11 +32,24 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.hawkular.metrics.core.api.AvailabilityBucketDataPoint;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
Expand All @@ -62,17 +74,6 @@
import org.joda.time.Hours;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
Expand All @@ -81,6 +82,7 @@
* @author John Sanda
*/
public class MetricsServiceImpl implements MetricsService {

private static final Logger logger = LoggerFactory.getLogger(MetricsServiceImpl.class);

/**
Expand Down Expand Up @@ -141,7 +143,14 @@ public int hashCode() {
private ListeningExecutorService metricsTasks;
private DataAccess dataAccess;

public void startUp(Session session, String keyspace, boolean resetDb) {
private MetricRegistry metricRegistry;

private Meter gaugeInserts;
private Meter availabilityInserts;
private Timer gaugeReadLatency;
private Timer availabilityReadLatency;

public void startUp(Session session, String keyspace, boolean resetDb, MetricRegistry metricRegistry) {
SchemaManager schemaManager = new SchemaManager(session);
if (resetDb) {
schemaManager.dropKeyspace(keyspace);
Expand All @@ -153,6 +162,9 @@ public void startUp(Session session, String keyspace, boolean resetDb) {
metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory()));
dataAccess = new DataAccessImpl(session);
loadDataRetentions();

this.metricRegistry = metricRegistry;
initMetrics();
}

void loadDataRetentions() {
Expand All @@ -177,6 +189,13 @@ void loadDataRetentions() {
}
}

private void initMetrics() {
gaugeInserts = metricRegistry.meter("gauge-inserts");
availabilityInserts = metricRegistry.meter("availability-inserts");
gaugeReadLatency = metricRegistry.timer("gauge-read-latency");
availabilityReadLatency = metricRegistry.timer("availability-write-latency");
}

void unloadDataRetentions() {
dataRetentions.clear();
}
Expand Down Expand Up @@ -429,11 +448,14 @@ public Observable<Void> addGaugeData(Observable<Metric<Double>> gaugeObservable)
// explicitly created, just not necessarily right away.

PublishSubject<Void> results = PublishSubject.create();
Observable<ResultSet> dataInserted = dataAccess.insertData(
Observable<Integer> updates = dataAccess.insertData(
gaugeObservable.map(gauge -> new GaugeAndTTL(gauge, getTTL(gauge))));
Observable<ResultSet> indexUpdated = dataAccess.updateMetricsIndexRx(gaugeObservable);
dataInserted.concatWith(indexUpdated).subscribe(
resultSet -> {},
// I am intentionally return zero for the number index updates because I want to measure and compare the
// throughput inserting data with and without the index updates. This will give us a better idea of how much
// over there is with the index updates.
Observable<Integer> indexUpdates = dataAccess.updateMetricsIndexRx(gaugeObservable).map(count -> 0);
updates.concatWith(indexUpdates).subscribe(
gaugeInserts::mark,
results::onError,
() -> {
results.onNext(null);
Expand All @@ -444,12 +466,23 @@ public Observable<Void> addGaugeData(Observable<Metric<Double>> gaugeObservable)

@Override
public Observable<Void> addAvailabilityData(List<Metric<AvailabilityType>> metrics) {
return Observable.from(metrics)
PublishSubject<Void> results = PublishSubject.create();
Observable<Metric<AvailabilityType>> availabilities = Observable.from(metrics);
Observable<Integer> updates = availabilities
.filter(a -> !a.getDataPoints().isEmpty())
.flatMap(a -> dataAccess.insertAvailabilityData(a, getTTL(a)))
.doOnCompleted(() -> dataAccess.updateMetricsIndex(metrics))
.doOnError(t -> logger.warn("Failed to add availability data", t))
.map(r -> null);
.flatMap(a -> dataAccess.insertAvailabilityData(a, getTTL(a)));
// I am intentionally return zero for the number index updates because I want to measure and compare the
// throughput inserting data with and without the index updates. This will give us a better idea of how much
// over there is with the index updates.
Observable<Integer> indexUpdates = dataAccess.updateMetricsIndexRx(availabilities).map(count -> 0);
updates.concatWith(indexUpdates).subscribe(
availabilityInserts::mark,
results::onError,
() -> {
results.onNext(null);
results.onCompleted();
});
return results;
}

@Override
Expand Down Expand Up @@ -484,9 +517,11 @@ public Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id,
// When we implement date partitioning, dpart will have to be determined based on
// the start and end params. And it is possible the the date range spans multiple
// date partitions.
return dataAccess.findData(tenantId, id, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint);
return time(gaugeReadLatency, () ->
dataAccess.findData(tenantId, id, start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeDataPoint)
);
}

@Override
Expand All @@ -512,15 +547,17 @@ public Observable<DataPoint<AvailabilityType>> findAvailabilityData(String tenan
@Override
public Observable<DataPoint<AvailabilityType>> findAvailabilityData(String tenantId, MetricId id, long start,
long end, boolean distinct) {
Observable<DataPoint<AvailabilityType>> availabilityData = dataAccess.findAvailabilityData(tenantId, id, start,
end)
.flatMap(Observable::from)
.map(Functions::getAvailabilityDataPoint);
if (distinct) {
return availabilityData.distinctUntilChanged(DataPoint::getValue);
} else {
return availabilityData;
}
return time(availabilityReadLatency, () -> {
Observable<DataPoint<AvailabilityType>> availabilityData = dataAccess.findAvailabilityData(tenantId, id,
start, end)
.flatMap(Observable::from)
.map(Functions::getAvailabilityDataPoint);
if (distinct) {
return availabilityData.distinctUntilChanged(DataPoint::getValue);
} else {
return availabilityData;
}
});
}

@Override
Expand Down Expand Up @@ -678,4 +715,14 @@ public void shutdown() {
metricsTasks.shutdown();
unloadDataRetentions();
}

private <T> T time(Timer timer, Callable<T> callable) {
try {
// TODO Should this method always return an observable?
// If so, than we should return Observable.error(e) in the catch block
return timer.time(callable);
} catch (Exception e) {
throw new RuntimeException("There was an error during a timed event", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public <T> ResultSetFuture updateMetricsIndex(List<Metric<T>> metrics) {
}

@Override
public <T> Observable<ResultSet> updateMetricsIndexRx(Observable<Metric<T>> metrics) {
public <T> Observable<Integer> updateMetricsIndexRx(Observable<Metric<T>> metrics) {
return delegate.updateMetricsIndexRx(metrics);
}

Expand All @@ -114,7 +114,7 @@ public Observable<ResultSet> findMetricsInMetricsIndex(String tenantId, MetricTy
}

@Override
public Observable<ResultSet> insertData(Observable<GaugeAndTTL> gaugeObservable) {
public Observable<Integer> insertData(Observable<GaugeAndTTL> gaugeObservable) {
return delegate.insertData(gaugeObservable);
}

Expand Down Expand Up @@ -196,7 +196,7 @@ public Observable<ResultSet> findAvailabilityByTag(String tenantId, String tag,
}

@Override
public Observable<ResultSet> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
public Observable<Integer> insertAvailabilityData(Metric<AvailabilityType> metric, int ttl) {
return delegate.insertAvailabilityData(metric, ttl);
}

Expand Down

0 comments on commit feb0669

Please sign in to comment.