Skip to content

Commit

Permalink
Merge pull request #863 from hawkular/HWKMETRICS-721
Browse files Browse the repository at this point in the history
HWKMETRICS-721
  • Loading branch information
John Sanda committed Sep 19, 2017
2 parents 282c203 + 1ef6dd3 commit 014d6b7
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,15 @@ public DeleteTenant(RxSession session, MetricsService metricsService) {
public Completable call(JobDetails details) {
String tenantId = details.getParameters().get("tenantId");


// The concat operator is used instead of merge to ensure things execute in order. The deleteMetricData
// method queries the metrics index, so we want to update the index only after we have finished deleting
// data.
return Completable.fromObservable(
deleteMetricData(tenantId)
.concatWith(deleteTenant(tenantId))
.concatWith(deleteRetentions(tenantId))
.concatWith(deleteMetricsIndex(tenantId))
.concatWith(deleteTags(tenantId))
)
return deleteMetricData(tenantId)
.concatWith(deleteTenant(tenantId))
.concatWith(deleteRetentions(tenantId))
.concatWith(deleteMetricsIndex(tenantId))
.concatWith(deleteTags(tenantId))
.toCompletable()
.doOnCompleted(() -> logger.infof("Finished deleting " + tenantId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ public interface DataAccess {

<T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags);
<T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags);

<T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics);

Expand Down Expand Up @@ -114,6 +112,10 @@ <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricType<T> t

<T> ResultSetFuture updateRetentionsIndex(Metric<T> metric);

<T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags);

Observable<Row> findMetricsByTagName(String tenantId, String tag);

Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -61,7 +62,6 @@

import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.AggregateMetadata;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.CodecRegistry;
Expand Down Expand Up @@ -777,41 +777,17 @@ public Observable<Row> getTagNamesWithType() {
@Override
public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(insertMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to insert metric tags for metric id " + metricId));
BoundStatement stmt = addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName());
return rxSession.execute(stmt);
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteTagsFromMetricsIndex.bind(tags.keySet(), metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric tags for metric id " + metricId));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteMetricFromMetricsIndex.bind(id.getTenantId(), id.getType().getCode(), id.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(id.getTenantId(), key, value,
id.getType().getCode(), id.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric and tags for metric id " + id));
BoundStatement stmt = deleteTagsFromMetricsIndex.bind(tags, metricId.getTenantId(),
metricId.getType().getCode(), metricId.getName());
return rxSession.execute(stmt);
}

@Override
Expand Down Expand Up @@ -877,17 +853,14 @@ private Observable<PreparedStatement> getPrepForAllTempTables(TempStatement ts)
return Observable.from(prepMap.entrySet())
.map(Map.Entry::getValue)
.map(pMap -> pMap.get(getMapKey(MetricType.UNDEFINED, ts)));

}

@Override
public Observable<Row> findAllMetricIdentifiersInData() {
return getPrepForAllTempTables(TempStatement.LIST_ALL_METRICS_FROM_TABLE)
.map(PreparedStatement::bind)
.flatMap(b -> rxSession.executeAndFetch(b))
.concatWith(
rxSession.executeAndFetch(findAllMetricsInData.bind())
.concatWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind())));
.flatMap(b -> rxSession.executeAndFetch(b.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInData.bind()))
.mergeWith(rxSession.executeAndFetch(findAllMetricsInDataCompressed.bind()));
}

/*
Expand All @@ -910,15 +883,15 @@ private Observable.Transformer<BoundStatement, Integer> applyMicroBatching() {
.flatMap(g -> g.compose(new BoundBatchStatementTransformer()))
.flatMap(batch -> rxSession
.execute(batch)
.compose(applyWriteRetryPolicy("Failed to insert batch of data points"))
.compose(applyInsertRetryPolicy())
.map(resultSet -> batch.size())
);
}

/*
* Apply our current retry policy to the insert behavior
*/
private <T> Observable.Transformer<T, T> applyWriteRetryPolicy(String msg) {
private <T> Observable.Transformer<T, T> applyInsertRetryPolicy() {
return tObservable -> tObservable
.retryWhen(errors -> {
Observable<Integer> range = Observable.range(1, 2);
Expand All @@ -931,8 +904,7 @@ private <T> Observable.Transformer<T, T> applyWriteRetryPolicy(String msg) {
})
.flatMap(retryCount -> {
long delay = (long) Math.min(Math.pow(2, retryCount) * 1000, 3000);
log.debug(msg);
log.debugf("Retrying batch insert in %d ms", delay);
log.debug("Retrying batch insert in " + delay + " ms");
return Observable.timer(delay, TimeUnit.MILLISECONDS);
});
});
Expand Down Expand Up @@ -1278,6 +1250,26 @@ public <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricTy
.flatMap(rxSession::execute);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
return tagsUpdates(tags, (name, value) -> insertMetricsTagsIndex.bind(metricId.getTenantId(), name, value,
metricId.getType().getCode(), metricId.getName()));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags) {
return tagsUpdates(tags, (name, value) -> deleteMetricsTagsIndex.bind(id.getTenantId(), name, value,
id.getType().getCode(), id.getName()));
}

private Observable<ResultSet> tagsUpdates(Map<String, String> tags,
BiFunction<String, String, BoundStatement> bindVars) {
return Observable.from(tags.entrySet())
.map(entry -> bindVars.apply(entry.getKey(), entry.getValue()))
.flatMap(rxSession::execute);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return rxSession.executeAndFetch(findMetricsByTagName.bind(tenantId, tag));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
List<Observable<ResultSet>> updates = new ArrayList<>();
updates.add(dataAccess.addTags(metric, metric.getTags()));
updates.add(dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));

if (metric.getDataRetention() != null) {
updates.add(updateRetentionsIndex(metric));
Expand Down Expand Up @@ -615,20 +615,22 @@ public Observable<Void> addTags(Metric<?> metric, Map<String, String> tags) {

this.updateMetricExpiration(metric.getMetricId());

return dataAccess.addTags(metric, tags).map(l -> null);
return dataAccess.insertIntoMetricsTagsIndex(metric, tags).concatWith(dataAccess.addTags(metric, tags))
.toList().map(l -> null);
}

@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
return getMetricTags(metric.getMetricId())
.map(loadedTags -> {
if (tags != null) {
loadedTags.keySet().retainAll(tags);
}
loadedTags.keySet().retainAll(tags);
return loadedTags;
})
.flatMap(tagsToDelete -> dataAccess.deleteTags(metric, tagsToDelete))
.map(r -> null);
.flatMap(tagsToDelete -> {
return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
dataAccess.deleteFromMetricsTagsIndex(metric.getMetricId(), tagsToDelete)).toList()
.map(r -> null);
});
}

@Override
Expand Down Expand Up @@ -1089,21 +1091,20 @@ private <T> T time(Timer timer, Callable<T> callable) {
}

@Override
@SuppressWarnings("unchecked")
public <T> Observable<Void> deleteMetric(MetricId<T> id) {
//NOTE: compressed data is not deleted due to the using TWCS compaction strategy
// for the compressed data table.

return getMetricTags(id)
.flatMap(tags -> dataAccess.deleteFromMetricsIndexAndTags(id, tags))
.concatWith(dataAccess.deleteMetricData(id))
.concatWith(dataAccess.deleteMetricFromRetentionIndex(id))
.concatWith(dataAccess.deleteFromMetricExpirationIndex(id))
.doOnError(Throwable::printStackTrace)
// .concatMap(r -> dataAccess.deleteMetricData(id))
// .concatMap(r -> dataAccess.deleteMetricFromRetentionIndex(id))
// .concatMap(r -> dataAccess.deleteFromMetricExpirationIndex(id)))
Observable<Void> result = dataAccess.getMetricTags(id)
.map(row -> row.getMap(0, String.class, String.class))
.defaultIfEmpty(new HashMap<>())
.flatMap(map -> dataAccess.deleteFromMetricsTagsIndex(id, map))
.map(r -> null);
result = result.mergeWith(dataAccess.deleteMetricFromMetricsIndex(id).map(r -> null))
.mergeWith(dataAccess.deleteMetricData(id).map(r -> null))
.mergeWith(dataAccess.deleteMetricFromRetentionIndex(id).map(r -> null))
.mergeWith(dataAccess.deleteFromMetricExpirationIndex(id).map(r -> null));

return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,10 @@ public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> t
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
return delegate.deleteTags(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
return delegate.deleteFromMetricsIndexAndTags(id, tags);
}

@Override
public <T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics) {
return delegate.updateMetricsIndex(metrics);
Expand Down Expand Up @@ -208,6 +203,16 @@ public <T> ResultSetFuture updateRetentionsIndex(Metric<T> metric) {
return delegate.updateRetentionsIndex(metric);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
return delegate.insertIntoMetricsTagsIndex(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(MetricId<T> id, Map<String, String> tags) {
return delegate.deleteFromMetricsTagsIndex(id, tags);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return delegate.findMetricsByTagName(tenantId, tag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private <T, V> void createAndDeleteMetrics(MetricType<T> mType, T[] dataPointVal
Map<String, String> actualTags = metricsService.getMetricTags(mId).toBlocking().lastOrDefault(null);
assertEquals(actualTags, m.getTags());

doAction(() -> metricsService.deleteMetric(mId));
metricsService.deleteMetric(mId).toBlocking().lastOrDefault(null);
deletedMetrics.add(m);

for (Metric<T> checkMetric : mList) {
Expand Down

0 comments on commit 014d6b7

Please sign in to comment.