Skip to content

Commit

Permalink
[HWKMETRICS-129] remove usage of C* counter data type
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed Jun 12, 2015
1 parent c097e51 commit 722e050
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
*/
public class MetricId {

private String group;

private String name;

private Interval interval;
Expand All @@ -33,20 +31,11 @@ public MetricId(String name) {
this(name, Interval.NONE);
}

public MetricId(String group, String name) {
this.group = group;
this.name = name;
}

public MetricId(String name, Interval interval) {
this.name = name;
this.interval = interval;
}

public String getGroup() {
return group;
}

public String getName() {
return name;
}
Expand All @@ -60,14 +49,13 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MetricId metricId = (MetricId) o;
return java.util.Objects.equals(group, metricId.group) &&
java.util.Objects.equals(name, metricId.name) &&
return java.util.Objects.equals(name, metricId.name) &&
java.util.Objects.equals(interval, metricId.interval);
}

@Override
public int hashCode() {
return java.util.Objects.hash(group, name, interval);
return java.util.Objects.hash(name, interval);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public enum MetricType {

AVAILABILITY(1, "availability"),

COUNTER(2, "counter");
COUNTER(2, "counter"),

COUNTER_RATE(3, "counter_rate");

private int code;

Expand Down Expand Up @@ -62,6 +64,7 @@ public static MetricType fromCode(int code) {
case 0 : return GAUGE;
case 1 : return AVAILABILITY;
case 2 : return COUNTER;
case 3 : return COUNTER_RATE;
default: throw new IllegalArgumentException(code + " is not a recognized metric type");
}
}
Expand All @@ -71,6 +74,7 @@ public static MetricType fromTextCode(String textCode) {
case "gauge": return GAUGE;
case "availability": return AVAILABILITY;
case "counter": return COUNTER;
case "counter_rate": return COUNTER_RATE;
default: throw new IllegalArgumentException(textCode + " is not a recognized metric type code");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
*/
package org.hawkular.metrics.core.api;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import com.google.common.util.concurrent.ListenableFuture;

import rx.Observable;

/**
Expand Down Expand Up @@ -84,14 +81,6 @@ Observable<BucketedOutput<AvailabilityBucketDataPoint>> findAvailabilityStats(Me
long start, long end, Buckets buckets
);

ListenableFuture<Void> updateCounter(Counter counter);

ListenableFuture<Void> updateCounters(Collection<Counter> counters);

ListenableFuture<List<Counter>> findCounters(String group);

ListenableFuture<List<Counter>> findCounters(String group, List<String> counterNames);

/** Check if a metric with the passed {id} has been stored in the system */
Observable<Boolean> idExists(String id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@
*/
package org.hawkular.metrics.core.impl;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.Tenant;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;

import rx.Observable;

/**
Expand Down Expand Up @@ -105,10 +101,6 @@ Observable<ResultSet> insertAvailabilityTag(String tag, String tagValue,

Observable<ResultSet> findAvailabilityData(String tenantId, MetricId id, long startTime, long endTime);

ResultSetFuture updateCounter(Counter counter);

ResultSetFuture updateCounters(Collection<Counter> counters);

ResultSetFuture findDataRetentions(String tenantId, MetricType type);

ResultSetFuture updateRetentionsIndex(String tenantId, MetricType type, Set<Retention> retentions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,12 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

import org.hawkular.metrics.core.api.AggregationTemplate;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
Expand All @@ -54,7 +39,18 @@
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.utils.UUIDs;

import org.hawkular.metrics.core.api.AggregationTemplate;
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import rx.Observable;

/**
Expand Down Expand Up @@ -620,23 +616,6 @@ public Observable<ResultSet> findAvailabilityData(String tenantId, MetricId id,
.getInterval().toString(), DPART, getTimeUUID(startTime), getTimeUUID(endTime)));
}

@Override
public ResultSetFuture updateCounter(Counter counter) {
BoundStatement statement = updateCounter.bind(counter.getValue(), counter.getTenantId(), counter.getGroup(),
counter.getName());
return session.executeAsync(statement);
}

@Override
public ResultSetFuture updateCounters(Collection<Counter> counters) {
BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.COUNTER);
for (Counter counter : counters) {
batchStatement.add(updateCounter.bind(counter.getValue(), counter.getTenantId(), counter.getGroup(),
counter.getName()));
}
return session.executeAsync(batchStatement);
}

@Override
public ResultSetFuture findDataRetentions(String tenantId, MetricType type) {
return session.executeAsync(findDataRetentions.bind(tenantId, type.getCode()));
Expand Down Expand Up @@ -682,16 +661,6 @@ public ResultSetFuture updateRetentionsIndex(Metric metric) {
metric.getId().getInterval().toString(), metric.getId().getName(), metric.getDataRetention()));
}

public ResultSetFuture findCounters(String tenantId, String group) {
BoundStatement statement = findCountersByGroup.bind(tenantId, group);
return session.executeAsync(statement);
}

public ResultSetFuture findCounters(String tenantId, String group, List<String> names) {
BoundStatement statement = findCountersByGroupAndName.bind(tenantId, group, names);
return session.executeAsync(statement);
}

private KeyspaceMetadata getKeyspace() {
return session.getCluster().getMetadata().getKeyspace(session.getLoggedKeyspace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
package org.hawkular.metrics.core.impl;

import static java.util.Comparator.comparingLong;
import static org.hawkular.metrics.core.api.MetricType.COUNTER;
import static org.hawkular.metrics.core.api.MetricType.COUNTER_RATE;
import static org.hawkular.metrics.core.api.MetricType.GAUGE;
import static org.hawkular.metrics.core.impl.Functions.getTTLAvailabilityDataPoint;
import static org.hawkular.metrics.core.impl.Functions.getTTLGaugeDataPoint;
import static org.joda.time.Hours.hours;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -55,7 +54,6 @@
import org.hawkular.metrics.core.api.AvailabilityType;
import org.hawkular.metrics.core.api.BucketedOutput;
import org.hawkular.metrics.core.api.Buckets;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.DataPoint;
import org.hawkular.metrics.core.api.GaugeBucketDataPoint;
import org.hawkular.metrics.core.api.Interval;
Expand Down Expand Up @@ -369,10 +367,12 @@ private List<String> loadTenantIds() {

@Override
public Observable<Void> createMetric(Metric<?> metric) {
Metric<?> denormalizedMetric = new Metric<>(metric.getTenantId(), metric.getType(),
denormalizeId(metric.getType(), metric.getId()), metric.getTags(), metric.getDataRetention());
if (metric.getType() == COUNTER_RATE) {
throw new IllegalArgumentException(metric + " cannot be created. " + COUNTER_RATE + " metrics are " +
"internally generated metrics and cannot be created by clients.");
}

ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(denormalizedMetric);
ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
Observable<ResultSet> indexUpdated = RxUtil.from(future, metricsTasks);
return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
if (!resultSet.wasApplied()) {
Expand All @@ -389,34 +389,18 @@ public Observable<Void> createMetric(Metric<?> metric) {
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
List<Observable<ResultSet>> updates = new ArrayList<>();
updates.add(dataAccess.addTagsAndDataRetention(denormalizedMetric));
updates.add(dataAccess.insertIntoMetricsTagsIndex(denormalizedMetric, denormalizedMetric.getTags()));
updates.add(dataAccess.addTagsAndDataRetention(metric));
updates.add(dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));

if (denormalizedMetric.getDataRetention() != null) {
updates.add(updateRetentionsIndex(denormalizedMetric));
if (metric.getDataRetention() != null) {
updates.add(updateRetentionsIndex(metric));
}

Observable.merge(updates).subscribe(new VoidSubscriber<>(subscriber));
}
}));
}

MetricId denormalizeId(MetricType type, MetricId id) {
if (type == COUNTER) {
return new MetricId(id.getGroup() + "$" + id.getName());
}
return id;
}

MetricId normalizeId(MetricType type, MetricId id) {
if (type == COUNTER) {
String[] parts = id.getName().split("\\$");
return new MetricId(parts[0], parts[1]);
} else {
return id;
}
}

private Observable<ResultSet> updateRetentionsIndex(Metric metric) {
ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric);
Observable<ResultSet> dataRetentionUpdated = RxUtil.from(dataRetentionFuture, metricsTasks);
Expand All @@ -428,7 +412,7 @@ private Observable<ResultSet> updateRetentionsIndex(Metric metric) {

@Override
public Observable<Metric> findMetric(final String tenantId, final MetricType type, final MetricId id) {
return dataAccess.findMetric(tenantId, type, denormalizeId(type, id))
return dataAccess.findMetric(tenantId, type, id)
.flatMap(Observable::from)
.map(row -> new Metric(tenantId, type, id, row.getMap(2, String.class, String.class),
row.getInt(3)));
Expand All @@ -438,8 +422,8 @@ public Observable<Metric> findMetric(final String tenantId, final MetricType typ
public Observable<Metric> findMetrics(String tenantId, MetricType type) {
return dataAccess.findMetricsInMetricsIndex(tenantId, type)
.flatMap(Observable::from)
.map(row -> new Metric(tenantId, type, normalizeId(type, new MetricId(row.getString(0),
Interval.parse(row.getString(1)))), row.getMap(2, String.class, String.class), row.getInt(3)));
.map(row -> new Metric(tenantId, type, new MetricId(row.getString(0), Interval.parse(row.getString(1))),
row.getMap(2, String.class, String.class), row.getInt(3)));
}

@Override
Expand Down Expand Up @@ -524,33 +508,6 @@ public Observable<Void> addAvailabilityData(List<Metric<AvailabilityType>> metri
return results;
}

@Override
public ListenableFuture<Void> updateCounter(Counter counter) {
// return Futures.transform(dataAccess.updateCounter(counter), TO_VOID);
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<Void> updateCounters(Collection<Counter> counters) {
// ResultSetFuture future = dataAccess.updateCounters(counters);
// return Futures.transform(future, TO_VOID);
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<List<Counter>> findCounters(String group) {
// ResultSetFuture future = dataAccess.findCounters(group);
// return Futures.transform(future, mapCounters, metricsTasks);
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<List<Counter>> findCounters(String group, List<String> counterNames) {
// ResultSetFuture future = dataAccess.findCounters(group, counterNames);
// return Futures.transform(future, mapCounters, metricsTasks);
throw new UnsupportedOperationException();
}

@Override
public Observable<DataPoint<Double>> findGaugeData(String tenantId, MetricId id, Long start, Long end) {
// When we implement date partitioning, dpart will have to be determined based on
Expand Down

0 comments on commit 722e050

Please sign in to comment.