Skip to content

Commit

Permalink
Revert "[HWKMETRICS-660] use logged batches for adding tags"
Browse files Browse the repository at this point in the history
This reverts commit 4dbd5a3.

# Conflicts:
#	core/metrics-core-service/src/main/java/org/hawkular/metrics/core/service/DataAccessImpl.java
#	core/metrics-core-service/src/main/java/org/hawkular/metrics/core/
  • Loading branch information
Stefan Negrea committed Sep 12, 2017
1 parent 282c203 commit fa2e8e0
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public interface DataAccess {

<T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags);
<T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags);

<T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics);

Expand Down Expand Up @@ -114,6 +112,10 @@ <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricType<T> t

<T> ResultSetFuture updateRetentionsIndex(Metric<T> metric);

<T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags);

Observable<Row> findMetricsByTagName(String tenantId, String tag);

Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -61,7 +62,6 @@

import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.AggregateMetadata;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
Expand Down Expand Up @@ -777,41 +777,17 @@ public Observable<Row> getTagNamesWithType() {
@Override
public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(insertMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to insert metric tags for metric id " + metricId));
BoundStatement stmt = addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName());
return rxSession.execute(stmt);
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteTagsFromMetricsIndex.bind(tags.keySet(), metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric tags for metric id " + metricId));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteMetricFromMetricsIndex.bind(id.getTenantId(), id.getType().getCode(), id.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(id.getTenantId(), key, value,
id.getType().getCode(), id.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric and tags for metric id " + id));
BoundStatement stmt = deleteTagsFromMetricsIndex.bind(tags, metricId.getTenantId(),
metricId.getType().getCode(), metricId.getName());
return rxSession.execute(stmt);
}

@Override
Expand Down Expand Up @@ -910,15 +886,15 @@ private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
.flatMap(g -> g.compose(new BoundBatchStatementTransformer()))
.flatMap(batch -> rxSession
.execute(batch)
.compose(applyWriteRetryPolicy("Failed to insert batch of data points"))
.compose(applyInsertRetryPolicy())
.map(resultSet -> batch.size())
);
}

/*
* Apply our current retry policy to the insert behavior
*/
private <T> Observable.Transformer<T, T> applyWriteRetryPolicy(String msg) {
private <T> Observable.Transformer<T, T> applyInsertRetryPolicy() {
return tObservable -> tObservable
.retryWhen(errors -> {
Observable<Integer> range = Observable.range(1, 2);
Expand All @@ -931,8 +907,7 @@ private <T> Observable.Transformer<T, T> applyWriteRetryPolicy(String msg) {
})
.flatMap(retryCount -> {
long delay = (long) Math.min(Math.pow(2, retryCount) * 1000, 3000);
log.debug(msg);
log.debugf("Retrying batch insert in %d ms", delay);
log.debug("Retrying batch insert in " + delay + " ms");
return Observable.timer(delay, TimeUnit.MILLISECONDS);
});
});
Expand Down Expand Up @@ -1278,6 +1253,26 @@ public <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricTy
.flatMap(rxSession::execute);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
return tagsUpdates(tags, (name, value) -> insertMetricsTagsIndex.bind(metricId.getTenantId(), name, value,
metricId.getType().getCode(), metricId.getName()));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags) {
return tagsUpdates(tags, (name, value) -> deleteMetricsTagsIndex.bind(id.getTenantId(), name, value,
id.getType().getCode(), id.getName()));
}

private Observable<ResultSet> tagsUpdates(Map<String, String> tags,
BiFunction<String, String, BoundStatement> bindVars) {
return Observable.from(tags.entrySet())
.map(entry -> bindVars.apply(entry.getKey(), entry.getValue()))
.flatMap(rxSession::execute);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return rxSession.executeAndFetch(findMetricsByTagName.bind(tenantId, tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
List<Observable<ResultSet>> updates = new ArrayList<>();
updates.add(dataAccess.addTags(metric, metric.getTags()));
updates.add(dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));

if (metric.getDataRetention() != null) {
updates.add(updateRetentionsIndex(metric));
Expand Down Expand Up @@ -615,20 +615,22 @@ public Observable<Void> addTags(Metric<?> metric, Map<String, String> tags) {

this.updateMetricExpiration(metric.getMetricId());

return dataAccess.addTags(metric, tags).map(l -> null);
return dataAccess.addTags(metric, tags).mergeWith(dataAccess.insertIntoMetricsTagsIndex(metric, tags))
.toList().map(l -> null);
}

@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
return getMetricTags(metric.getMetricId())
.map(loadedTags -> {
if (tags != null) {
loadedTags.keySet().retainAll(tags);
}
loadedTags.keySet().retainAll(tags);
return loadedTags;
})
.flatMap(tagsToDelete -> dataAccess.deleteTags(metric, tagsToDelete))
.map(r -> null);
.flatMap(tagsToDelete -> {
return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
.map(r -> null);
});
}

@Override
Expand Down Expand Up @@ -1089,21 +1091,23 @@ private <T> T time(Timer timer, Callable<T> callable) {
}

@Override
@SuppressWarnings("unchecked")
public <T> Observable<Void> deleteMetric(MetricId<T> id) {
Observable<Void> result = dataAccess.getMetricTags(id)
.map(row -> row.getMap(0, String.class, String.class))
.defaultIfEmpty(new HashMap<>())
.flatMap(map -> dataAccess.deleteFromMetricsTagsIndex(id, map))
.toList()
.flatMap(r -> dataAccess.deleteMetricFromMetricsIndex(id))
.flatMap(r -> null);

//NOTE: compressed data is not deleted due to the using TWCS compaction strategy
// for the compressed data table.
result.mergeWith(dataAccess.deleteMetricData(id).flatMap(r -> null));

return getMetricTags(id)
.flatMap(tags -> dataAccess.deleteFromMetricsIndexAndTags(id, tags))
.concatWith(dataAccess.deleteMetricData(id))
.concatWith(dataAccess.deleteMetricFromRetentionIndex(id))
.concatWith(dataAccess.deleteFromMetricExpirationIndex(id))
.doOnError(Throwable::printStackTrace)
// .concatMap(r -> dataAccess.deleteMetricData(id))
// .concatMap(r -> dataAccess.deleteMetricFromRetentionIndex(id))
// .concatMap(r -> dataAccess.deleteFromMetricExpirationIndex(id)))
.map(r -> null);
result.mergeWith(dataAccess.deleteMetricFromRetentionIndex(id).flatMap(r -> null));
result.mergeWith(dataAccess.deleteFromMetricExpirationIndex(id).flatMap(r -> null));

return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,10 @@ public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> t
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
return delegate.deleteTags(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
return delegate.deleteFromMetricsIndexAndTags(id, tags);
}

@Override
public <T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics) {
return delegate.updateMetricsIndex(metrics);
Expand Down Expand Up @@ -208,6 +203,16 @@ public <T> ResultSetFuture updateRetentionsIndex(Metric<T> metric) {
return delegate.updateRetentionsIndex(metric);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
return delegate.insertIntoMetricsTagsIndex(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags) {
return delegate.deleteFromMetricsTagsIndex(id, tags);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return delegate.findMetricsByTagName(tenantId, tag);
Expand Down

0 comments on commit fa2e8e0

Please sign in to comment.