Skip to content

Commit

Permalink
[HWKMETRICS-422] Add metrics index caches to avoid updating the metri…
Browse files Browse the repository at this point in the history
…cs index if it was recently update. The max cache size in 20k items with automatica eviction of 1 hour since last read.
  • Loading branch information
Stefan Negrea committed Jun 24, 2016
1 parent c141952 commit 36ca4ff
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -80,8 +81,11 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -178,6 +182,8 @@ public int hashCode() {
*/
private Map<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> dataPointMappers;

private Map<MetricType<?>, Cache<String, Boolean>> metricsIdxCaches;

private int defaultTTL = Duration.standardDays(7).toStandardSeconds().getSeconds();

private int maxStringSize;
Expand Down Expand Up @@ -257,10 +263,21 @@ public void startUp(Session session, String keyspace, boolean resetDb, boolean c
.put(STRING, Functions::getStringDataPoint)
.build();

Builder<MetricType<?>, Cache<String, Boolean>> metricsIdxCachesBuilder = ImmutableMap
.<MetricType<?>, Cache<String, Boolean>> builder();
MetricType.userTypes().stream().forEach(
t -> metricsIdxCachesBuilder.put(t,
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).maximumSize(20000).build()));
metricsIdxCaches = metricsIdxCachesBuilder.build();

initStringSize(session);
initMetrics();
}

public void clearMetricsIdxCaches() {
metricsIdxCaches.values().stream().forEach(m -> m.invalidateAll());
}

void loadDataRetentions() {
List<String> tenantIds = loadTenantIds();
CountDownLatch latch = new CountDownLatch(tenantIds.size() * 2);
Expand Down Expand Up @@ -628,23 +645,13 @@ public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<Metric<T>> metrics) {
checkArgument(metricType != null, "metricType is null");

// We write to both the data and the metrics_idx tables. Each metric can have one or more data points. We
// currently write a separate batch statement for each metric.
// We write to the data table on every call and to the metrics_idx table only if the metric id is not
// in cache. Each metric can have one or more data points. We currently write a separate batch statement
// for each metric.
//
// TODO Is there additional overhead of using batch statement when there is only a single insert?
// If there is overhead, then we should avoid using batch statements when the metric has only a single
// data point which could be quite often.
//
// The metrics_idx table stores the metric id along with any tags, and data retention. The update we perform
// here though only inserts the metric id (i.e., name and interval). We need to revisit this logic. The original
// intent for updating metrics_idx here is that even if the client does not explicitly create the metric, we
// still have it in metrics_idx. In reality, I think clients will be explicitly creating metrics. This will
// certainly be the case with the full, integrated hawkular server.
//
// TODO Determine how much overhead is caused by updating metrics_idx on every write
// If there much overhead, then we might not want to update the index every time we insert data. Maybe
// we periodically update it in the background, so we will still be aware of metrics that have not been
// explicitly created, just not necessarily right away.

Meter meter = getInsertMeter(metricType);
Func2<Metric<T>, Integer, Observable<Integer>> inserter = getInserter(metricType);
Expand All @@ -655,7 +662,18 @@ public <T> Observable<Void> addDataPoints(MetricType<T> metricType, Observable<M
.doOnNext(i -> insertedDataPointEvents.onNext(metric)))
.doOnNext(meter::mark);

Observable<Integer> indexUpdates = dataAccess.updateMetricsIndex(metrics)
Cache<String, Boolean> metricsIdxCache = metricsIdxCaches.get(metricType);

Observable<Metric<T>> updateIdxMetrics = metrics.filter(metric -> {
if (metricsIdxCache.getIfPresent(metric.getId()) == null) {
metricsIdxCache.put(metric.getId(), true);
return true;
}

return false;
});

Observable<Integer> indexUpdates = dataAccess.updateMetricsIndex(updateIdxMetrics)
.doOnNext(batchSize -> log.tracef("Inserted %d %s metrics into metrics_idx", batchSize, metricType));

return updates.mergeWith(indexUpdates)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void initMethod() {
session.execute("TRUNCATE leases");

metricsService.setDataAccess(dataAccess);
metricsService.clearMetricsIdxCaches();

NumericDataPointCollector.createPercentile = defaultCreatePercentile;
}

Expand Down

0 comments on commit 36ca4ff

Please sign in to comment.