Skip to content

Commit

Permalink
Revert "[HWKMETRICS-660] use logged batches for adding tags"
Browse files Browse the repository at this point in the history
This reverts commit 8530dd6.
  • Loading branch information
Stefan Negrea committed Sep 25, 2017
1 parent 2d37bcb commit a0b5252
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,6 +18,7 @@
package org.hawkular.metrics.core.service;

import java.util.Map;
import java.util.Set;

import org.hawkular.metrics.core.service.compress.CompressedPointContainer;
import org.hawkular.metrics.model.AvailabilityType;
Expand Down Expand Up @@ -58,9 +59,7 @@ public interface DataAccess {

<T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags);
<T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags);

<T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics);

Expand Down Expand Up @@ -110,6 +109,10 @@ <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricType<T> t

<T> ResultSetFuture updateRetentionsIndex(Metric<T> metric);

<T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags);

<T> Observable<ResultSet> deleteFromMetricsTagsIndex(Metric<T> metric, Map<String, String> tags);

Observable<Row> findMetricsByTagName(String tenantId, String tag);

Observable<Row> findMetricsByTagNameValue(String tenantId, String tag, String tvalue);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -28,12 +28,11 @@
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.hawkular.metrics.core.service.compress.CompressedPointContainer;
import org.hawkular.metrics.core.service.log.CoreLogger;
import org.hawkular.metrics.core.service.log.CoreLogging;
import org.hawkular.metrics.core.service.transformers.BatchStatementTransformer;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.DataPoint;
Expand All @@ -45,27 +44,22 @@
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.utils.UUIDs;

import rx.Observable;
import rx.exceptions.Exceptions;

/**
*
* @author John Sanda
*/
public class DataAccessImpl implements DataAccess {

private static final CoreLogger log = CoreLogging.getCoreLogger(DataAccessImpl.class);

public static final long DPART = 0;
private Session session;

Expand Down Expand Up @@ -205,8 +199,6 @@ public class DataAccessImpl implements DataAccess {

private PreparedStatement findMetricsByTagNameValue;

private PreparedStatement deleteMetricFromMetricsIndex;

public DataAccessImpl(Session session) {
this.session = session;
rxSession = new RxSessionImpl(session);
Expand Down Expand Up @@ -534,10 +526,6 @@ protected void initPreparedStatements() {
"SELECT tenant_id, type, metric " +
"FROM metrics_tags_idx " +
"WHERE tenant_id = ? AND tname = ? AND tvalue = ?");

deleteMetricFromMetricsIndex = session.prepare(
"DELETE FROM metrics_idx " +
"WHERE tenant_id = ? AND type = ? AND metric = ?");
}

@Override
Expand Down Expand Up @@ -611,41 +599,17 @@ public Observable<Row> getTagNamesWithType() {
@Override
public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(insertMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to insert metric tags for metric id " + metricId));
BoundStatement stmt = addTagsToMetricsIndex.bind(tags, metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName());
return rxSession.execute(stmt);
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
MetricId<T> metricId = metric.getMetricId();
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteTagsFromMetricsIndex.bind(tags.keySet(), metricId.getTenantId(), metricId.getType().getCode(),
metricId.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(metricId.getTenantId(), key, value,
metricId.getType().getCode(), metricId.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric tags for metric id " + metricId));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);

batch.add(deleteMetricFromMetricsIndex.bind(id.getTenantId(), id.getType().getCode(), id.getName()));
tags.forEach((key, value) -> batch.add(deleteMetricsTagsIndex.bind(id.getTenantId(), key, value,
id.getType().getCode(), id.getName())));

return rxSession.execute(batch)
.compose(applyWriteRetryPolicy("Failed to delete metric and tags for metric id " + id));
BoundStatement stmt = deleteTagsFromMetricsIndex.bind(tags, metricId.getTenantId(),
metricId.getType().getCode(), metricId.getName());
return rxSession.execute(stmt);
}

@Override
Expand Down Expand Up @@ -988,6 +952,27 @@ public <T> Observable<ResultSet> updateRetentionsIndex(String tenantId, MetricTy
.flatMap(rxSession::execute);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
return tagsUpdates(tags, (name, value) -> insertMetricsTagsIndex.bind(metricId.getTenantId(), name, value,
metricId.getType().getCode(), metricId.getName()));
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
MetricId<T> metricId = metric.getMetricId();
return tagsUpdates(tags, (name, value) -> deleteMetricsTagsIndex.bind(metricId.getTenantId(), name, value,
metricId.getType().getCode(), metricId.getName()));
}

private Observable<ResultSet> tagsUpdates(Map<String, String> tags,
BiFunction<String, String, BoundStatement> bindVars) {
return Observable.from(tags.entrySet())
.map(entry -> bindVars.apply(entry.getKey(), entry.getValue()))
.flatMap(rxSession::execute);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return rxSession.executeAndFetch(findMetricsByTagName.bind(tenantId, tag));
Expand Down Expand Up @@ -1055,27 +1040,4 @@ public <T> Observable<ResultSet> deleteAndInsertCompressedGauge(MetricId<T> id,
public Observable<Row> findAllMetricsFromTagsIndex() {
return rxSession.executeAndFetch(findAllMetricsFromTagsIndex.bind());
}

/*
* Apply our current retry policy to the insert behavior
*/
private <T> Observable.Transformer<T, T> applyWriteRetryPolicy(String msg) {
return tObservable -> tObservable
.retryWhen(errors -> {
Observable<Integer> range = Observable.range(1, 2);
return errors
.zipWith(range, (t, i) -> {
if (t instanceof DriverException) {
return i;
}
throw Exceptions.propagate(t);
})
.flatMap(retryCount -> {
long delay = (long) Math.min(Math.pow(2, retryCount) * 1000, 3000);
log.debug(msg);
log.debugf("Retrying batch insert in %d ms", delay);
return Observable.timer(delay, TimeUnit.MILLISECONDS);
});
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -487,7 +487,7 @@ public Observable<Void> createMetric(Metric<?> metric, boolean overwrite) {
// eventually want to implement more fine-grained error handling where we can
// notify the subscriber of what exactly fails.
List<Observable<ResultSet>> updates = new ArrayList<>();
updates.add(dataAccess.addTags(metric, metric.getTags()));
updates.add(dataAccess.insertIntoMetricsTagsIndex(metric, metric.getTags()));

if (metric.getDataRetention() != null) {
updates.add(updateRetentionsIndex(metric));
Expand Down Expand Up @@ -588,20 +588,21 @@ public Observable<Void> addTags(Metric<?> metric, Map<String, String> tags) {
return Observable.error(e);
}

return dataAccess.addTags(metric, tags).map(l -> null);
return dataAccess.addTags(metric, tags).mergeWith(dataAccess.insertIntoMetricsTagsIndex(metric, tags))
.toList().map(l -> null);
}

@Override
public Observable<Void> deleteTags(Metric<?> metric, Set<String> tags) {
return getMetricTags(metric.getMetricId())
.map(loadedTags -> {
if (tags != null) {
loadedTags.keySet().retainAll(tags);
}
loadedTags.keySet().retainAll(tags);
return loadedTags;
})
.flatMap(tagsToDelete -> dataAccess.deleteTags(metric, tagsToDelete))
.map(r -> null);
.flatMap(tagsToDelete -> {
return dataAccess.deleteTags(metric, tagsToDelete.keySet()).mergeWith(
dataAccess.deleteFromMetricsTagsIndex(metric, tagsToDelete)).toList().map(r -> null);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,6 +18,7 @@
package org.hawkular.metrics.core.service;

import java.util.Map;
import java.util.Set;

import org.hawkular.metrics.core.service.compress.CompressedPointContainer;
import org.hawkular.metrics.model.AvailabilityType;
Expand Down Expand Up @@ -94,15 +95,10 @@ public <T> Observable<ResultSet> addTags(Metric<T> metric, Map<String, String> t
}

@Override
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Map<String, String> tags) {
public <T> Observable<ResultSet> deleteTags(Metric<T> metric, Set<String> tags) {
return delegate.deleteTags(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsIndexAndTags(MetricId<T> id, Map<String, String> tags) {
return delegate.deleteFromMetricsIndexAndTags(id, tags);
}

@Override
public <T> Observable<Integer> updateMetricsIndex(Observable<Metric<T>> metrics) {
return delegate.updateMetricsIndex(metrics);
Expand Down Expand Up @@ -208,6 +204,16 @@ public <T> ResultSetFuture updateRetentionsIndex(Metric<T> metric) {
return delegate.updateRetentionsIndex(metric);
}

@Override
public <T> Observable<ResultSet> insertIntoMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
return delegate.insertIntoMetricsTagsIndex(metric, tags);
}

@Override
public <T> Observable<ResultSet> deleteFromMetricsTagsIndex(Metric<T> metric, Map<String, String> tags) {
return delegate.deleteFromMetricsTagsIndex(metric, tags);
}

@Override
public Observable<Row> findMetricsByTagName(String tenantId, String tag) {
return delegate.findMetricsByTagName(tenantId, tag);
Expand Down

0 comments on commit a0b5252

Please sign in to comment.