Skip to content

Commit

Permalink
[HWMMETRICS-74] initial commit for java driver wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
John Sanda committed May 26, 2015
1 parent 9a54ea2 commit 455ec75
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 59 deletions.
6 changes: 6 additions & 0 deletions core/metrics-core-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hawkular-rx-java-driver</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import java.util.Map;
import java.util.Set;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Availability;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricData;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.Tenant;
import rx.Observable;

/**
* @author John Sanda
Expand Down Expand Up @@ -66,12 +68,12 @@ ResultSetFuture updateTagsInMetricsIndex(Metric<?> metric, Map<String, String> a

ResultSetFuture insertData(Gauge metric, int ttl);

ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime);
// ResultSetFuture findData(QueryParams queryParams);
Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime);

ResultSetFuture findData(Gauge metric, long startTime, long endTime, Order order);

ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime, boolean includeWriteTime);
Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
boolean includeWriteTime);

ResultSetFuture findData(Gauge metric, long timestamp, boolean includeWriteTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,22 @@
import java.util.Set;
import java.util.function.BiFunction;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.utils.UUIDs;
import org.hawkular.metrics.core.api.AggregationTemplate;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Availability;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.GaugeData;
Expand All @@ -41,19 +54,9 @@
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TimeUUIDUtils;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TupleType;
import com.datastax.driver.core.TupleValue;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.utils.UUIDs;
import org.hawkular.rx.cassandra.driver.RxSession;
import org.hawkular.rx.cassandra.driver.RxSessionImpl;
import rx.Observable;

/**
*
Expand All @@ -63,6 +66,8 @@ public class DataAccessImpl implements DataAccess {

private Session session;

private RxSession rxSession;

private PreparedStatement insertTenant;

private PreparedStatement findAllTenantIds;
Expand Down Expand Up @@ -141,6 +146,7 @@ public class DataAccessImpl implements DataAccess {

public DataAccessImpl(Session session) {
this.session = session;
rxSession = new RxSessionImpl(session);
initPreparedStatements();
}

Expand Down Expand Up @@ -452,7 +458,7 @@ public ResultSetFuture insertData(Gauge metric, int ttl) {
}

@Override
public ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime) {
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime) {
return findData(tenantId, id, startTime, endTime, false);
}

Expand All @@ -470,16 +476,16 @@ public ResultSetFuture findData(Gauge metric, long startTime, long endTime, Orde
}

@Override
public ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime,
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
boolean includeWriteTime) {
if (includeWriteTime) {
return session.executeAsync(findGaugeDataWithWriteTimeByDateRangeExclusive.bind(tenantId,
return rxSession.execute(findGaugeDataWithWriteTimeByDateRangeExclusive.bind(tenantId,
MetricType.GAUGE.getCode(), id.getName(), id.getInterval().toString(), Metric.DPART,
TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
} else {
return session.executeAsync(findGaugeDataByDateRangeExclusive.bind(tenantId,
MetricType.GAUGE.getCode(), id.getName(), id.getInterval().toString(), Metric.DPART,
TimeUUIDUtils.getTimeUUID(startTime), TimeUUIDUtils.getTimeUUID(endTime)));
return rxSession.execute(findGaugeDataByDateRangeExclusive.bind(tenantId, MetricType.GAUGE.getCode(),
id.getName(), id.getInterval().toString(), Metric.DPART, TimeUUIDUtils.getTimeUUID(startTime),
TimeUUIDUtils.getTimeUUID(endTime)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static GaugeData getGaugeData(Row row) {
StreamSupport.stream(resultSet.spliterator(), false).map(Functions::getGaugeDataAndWriteTime)
.collect(toList());

private static GaugeData getGaugeDataAndWriteTime(Row row) {
public static GaugeData getGaugeDataAndWriteTime(Row row) {
return new GaugeData(
row.getUUID(GAUGE_COLS.TIME.ordinal()),
row.getDouble(GAUGE_COLS.VALUE.ordinal()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.hawkular.metrics.core.api.RetentionSettings;
import org.hawkular.metrics.core.api.Tenant;
import org.hawkular.metrics.core.api.TenantAlreadyExistsException;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;
import org.joda.time.Hours;
import org.slf4j.Logger;
Expand Down Expand Up @@ -519,8 +521,7 @@ public Observable<GaugeData> findGaugeData(String tenantId, MetricId id, Long st
// When we implement date partitioning, dpart will have to be determined based on
// the start and end params. And it is possible the the date range spans multiple
// date partitions.
ResultSetFuture future = dataAccess.findData(tenantId, id, start, end);
return RxUtil.from(future, metricsTasks).flatMap(Observable::from).map(Functions::getGaugeData);
return dataAccess.findData(tenantId, id, start, end).flatMap(Observable::from).map(Functions::getGaugeData);
}

@Override
Expand All @@ -530,10 +531,22 @@ public ListenableFuture<BucketedOutput<GaugeBucketDataPoint>> findGaugeStats(
// When we implement date partitioning, dpart will have to be determined based on
// the start and end params. And it is possible the the date range spans multiple
// date partitions.
ResultSetFuture queryFuture = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end);
ListenableFuture<List<GaugeData>> raw = Futures.transform(queryFuture, Functions.MAP_GAUGE_DATA,
metricsTasks);
return Futures.transform(raw, new GaugeBucketedOutputMapper(metric.getTenantId(), metric.getId(), buckets));

//
//
// ResultSetFuture queryFuture = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end);
// ListenableFuture<List<GaugeData>> raw = Futures.transform(queryFuture, Functions.MAP_GAUGE_DATA,
// metricsTasks);
// return Futures.transform(raw, new GaugeBucketedOutputMapper(metric.getTenantId(), metric.getId(), buckets));

List<GaugeData> data = ImmutableList.copyOf(
dataAccess.findData(metric.getTenantId(), metric.getId(), start, end)
.flatMap(Observable::from)
.map(Functions::getGaugeData)
.toBlocking()
.toIterable());
GaugeBucketedOutputMapper mapper = new GaugeBucketedOutputMapper(metric.getTenantId(), metric.getId(), buckets);
return Futures.immediateFuture(mapper.apply(data));
}

@Override
Expand Down Expand Up @@ -603,12 +616,27 @@ public Boolean apply(ResultSet resultSet) {
// metrics since they could efficiently be inserted in a single batch statement.
public ListenableFuture<List<GaugeData>> tagGaugeData(Gauge metric, final Map<String, String> tags,
long start, long end) {
ResultSetFuture queryFuture = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, true);
ListenableFuture<List<GaugeData>> dataFuture = Futures.transform(queryFuture,
Functions.MAP_GAUGE_DATA_WITH_WRITE_TIME, metricsTasks);
// ResultSetFuture queryFuture = dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, true);
// ListenableFuture<List<GaugeData>> dataFuture = Futures.transform(queryFuture,
// Functions.MAP_GAUGE_DATA_WITH_WRITE_TIME, metricsTasks);
// int ttl = getTTL(metric);
// ListenableFuture<List<GaugeData>> updatedDataFuture = Futures.transform(dataFuture, new ComputeTTL<>(ttl));
// return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric));

List<GaugeData> data = ImmutableList.copyOf(
dataAccess.findData(metric.getTenantId(), metric.getId(), start, end, true)
.flatMap(Observable::from)
.map(Functions::getGaugeDataAndWriteTime)
.toBlocking()
.toIterable()
);
int ttl = getTTL(metric);
ListenableFuture<List<GaugeData>> updatedDataFuture = Futures.transform(dataFuture, new ComputeTTL<>(ttl));
return Futures.transform(updatedDataFuture, new TagAsyncFunction(tags, metric));
List<GaugeData> updatedData = new ComputeTTL<GaugeData>(ttl).apply(data);
try {
return new TagAsyncFunction<GaugeData>(tags, metric).apply(updatedData);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -48,7 +47,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import rx.Observable;
import rx.schedulers.Schedulers;

/**
* @author John Sanda
Expand Down Expand Up @@ -132,22 +130,14 @@ public void insertAndFindGaugeRawData() throws Exception {

getUninterruptibly(dataAccess.insertData(metric, MetricsServiceCassandra.DEFAULT_TTL));

ResultSetFuture queryFuture = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());

Observable<ResultSet> observable = RxUtil.from(queryFuture, Schedulers.io());
Observable<Row> rowsObservable = observable.flatMap(Observable::from);
Observable<GaugeData> gaugeDataObservable = rowsObservable.map(Functions::getGaugeData);

// List<GaugeData> actual = ImmutableList.copyOf(gaugeDataObservable.toBlocking().toIterable());
List<GaugeData> actual = ImmutableList.copyOf(observable
.flatMap(Observable::from)
.map(Functions::getGaugeData)
.toBlocking()
.toIterable());

// ListenableFuture<List<GaugeData>> dataFuture = Futures.transform(queryFuture, Functions.MAP_GAUGE_DATA);
// List<GaugeData> actual = getUninterruptibly(dataFuture);
List<GaugeData> expected = asList(
new GaugeData(start.plusMinutes(2).getMillis(), 1.234),
new GaugeData(start.plusMinutes(1).getMillis(), 1.234),
Expand All @@ -174,10 +164,14 @@ public void addMetadataToGaugeRawData() throws Exception {
metric.addData(new GaugeData(end.getMillis(), 1.234));
getUninterruptibly(dataAccess.insertData(metric, MetricsServiceCassandra.DEFAULT_TTL));

ResultSetFuture queryFuture = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
Observable<ResultSet> observable = dataAccess.findData("tenant-1", new MetricId("metric-1"), start.getMillis(),
end.getMillis());
ListenableFuture<List<GaugeData>> dataFuture = Futures.transform(queryFuture, Functions.MAP_GAUGE_DATA);
List<GaugeData> actual = getUninterruptibly(dataFuture);
List<GaugeData> actual = ImmutableList.copyOf(observable
.flatMap(Observable::from)
.map(Functions::getGaugeData)
.toBlocking()
.toIterable());

List<GaugeData> expected = asList(
new GaugeData(start.plusMinutes(4).getMillis(), 1.234),
new GaugeData(start.plusMinutes(2).getMillis(), 1.234),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import java.util.Map;
import java.util.Set;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Availability;
import org.hawkular.metrics.core.api.AvailabilityData;
import org.hawkular.metrics.core.api.Counter;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Interval;
import org.hawkular.metrics.core.api.Metric;
import org.hawkular.metrics.core.api.MetricData;
import org.hawkular.metrics.core.api.MetricId;
import org.hawkular.metrics.core.api.MetricType;
import org.hawkular.metrics.core.api.GaugeData;
import org.hawkular.metrics.core.api.Gauge;
import org.hawkular.metrics.core.api.Retention;
import org.hawkular.metrics.core.api.Tenant;
import rx.Observable;

/**
* @author John Sanda
Expand Down Expand Up @@ -113,7 +115,7 @@ public ResultSetFuture insertData(Gauge metric, int ttl) {
}

@Override
public ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime) {
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime) {
return delegate.findData(tenantId, id, startTime, endTime);
}

Expand All @@ -123,7 +125,7 @@ public ResultSetFuture findData(Gauge metric, long startTime, long endTime, Orde
}

@Override
public ResultSetFuture findData(String tenantId, MetricId id, long startTime, long endTime,
public Observable<ResultSet> findData(String tenantId, MetricId id, long startTime, long endTime,
boolean includeWriteTime) {
return delegate.findData(tenantId, id, startTime, endTime, includeWriteTime);
}
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@
</developers>

<modules>
<module>rx-java-driver</module>
<module>schema-manager</module>
<module>task-queue</module>
<module>core/metrics-core-api</module>
<module>core/metrics-core-impl</module>
<module>task-queue</module>
<module>embedded-cassandra/embedded-cassandra-service</module>
<module>embedded-cassandra/embedded-cassandra-ear</module>
<module>api/metrics-api-jaxrs</module>
Expand Down

0 comments on commit 455ec75

Please sign in to comment.