Skip to content

Commit

Permalink
Merge pull request #464 from tsegismont/jira/HWKMETRICS-356
Browse files Browse the repository at this point in the history
HWKMETRICS-356 Use RxJavaGuava instead of RxUtil
  • Loading branch information
jsanda committed Feb 29, 2016
2 parents 8141820 + 87ad583 commit 96e6d3c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.hawkular.metrics.model.exception.TenantAlreadyExistsException;
import org.hawkular.metrics.schema.SchemaManager;
import org.hawkular.metrics.tasks.api.TaskScheduler;
import org.hawkular.rx.cassandra.driver.RxUtil;
import org.joda.time.Duration;

import com.codahale.metrics.Meter;
Expand All @@ -81,6 +80,7 @@
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func5;
import rx.observable.ListenableFutureObservable;
import rx.subjects.PublishSubject;

/**
Expand Down Expand Up @@ -393,7 +393,7 @@ public Observable<Void> createMetric(Metric<?> metric) {
}

ResultSetFuture future = dataAccess.insertMetricInMetricsIndex(metric);
Observable<ResultSet> indexUpdated = RxUtil.from(future, metricsTasks);
Observable<ResultSet> indexUpdated = ListenableFutureObservable.from(future, metricsTasks);
return Observable.create(subscriber -> indexUpdated.subscribe(resultSet -> {
if (!resultSet.wasApplied()) {
subscriber.onError(new MetricAlreadyExistsException(metric));
Expand Down Expand Up @@ -423,7 +423,7 @@ public Observable<Void> createMetric(Metric<?> metric) {

private Observable<ResultSet> updateRetentionsIndex(Metric<?> metric) {
ResultSetFuture dataRetentionFuture = dataAccess.updateRetentionsIndex(metric);
Observable<ResultSet> dataRetentionUpdated = RxUtil.from(dataRetentionFuture, metricsTasks);
Observable<ResultSet> dataRetentionUpdated = ListenableFutureObservable.from(dataRetentionFuture, metricsTasks);
// TODO Shouldn't we only update dataRetentions map when the retentions index update succeeds?
dataRetentions.put(new DataRetentionKey(metric), metric.getDataRetention());

Expand Down
6 changes: 6 additions & 0 deletions core/rx-java-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,11 @@
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-guava</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -27,6 +27,7 @@

import rx.Observable;
import rx.Scheduler;
import rx.observable.ListenableFutureObservable;
import rx.schedulers.Schedulers;

/**
Expand Down Expand Up @@ -54,61 +55,61 @@ public RxSession init() {
@Override
public Observable<ResultSet> execute(String query) {
ResultSetFuture future = session.executeAsync(query);
return RxUtil.from(future, Schedulers.computation());
return ListenableFutureObservable.from(future, Schedulers.computation());
}

@Override
public Observable<ResultSet> execute(String query, Scheduler scheduler) {
ResultSetFuture future = session.executeAsync(query);
return RxUtil.from(future, scheduler);
return ListenableFutureObservable.from(future, scheduler);
}

@Override
public Observable<ResultSet> execute(String query, Object... values) {
ResultSetFuture future = session.executeAsync(query, values);
return RxUtil.from(future, Schedulers.computation());
return ListenableFutureObservable.from(future, Schedulers.computation());
}

@Override
public Observable<ResultSet> execute(String query, Scheduler scheduler, Object... values) {
ResultSetFuture future = session.executeAsync(query, values, scheduler);
return RxUtil.from(future, scheduler);
return ListenableFutureObservable.from(future, scheduler);
}

@Override
public Observable<ResultSet> execute(Statement statement) {
ResultSetFuture future = session.executeAsync(statement);
return RxUtil.from(future, Schedulers.computation());
return ListenableFutureObservable.from(future, Schedulers.computation());
}

@Override
public Observable<ResultSet> execute(Statement statement, Scheduler scheduler) {
ResultSetFuture future = session.executeAsync(statement);
return RxUtil.from(future, scheduler);
return ListenableFutureObservable.from(future, scheduler);
}

@Override
public Observable<PreparedStatement> prepare(String query) {
ListenableFuture<PreparedStatement> future = session.prepareAsync(query);
return RxUtil.from(future, Schedulers.computation());
return ListenableFutureObservable.from(future, Schedulers.computation());
}

@Override
public Observable<PreparedStatement> prepare(String query, Scheduler scheduler) {
ListenableFuture<PreparedStatement> future = session.prepareAsync(query);
return RxUtil.from(future, scheduler);
return ListenableFutureObservable.from(future, scheduler);
}

@Override
public Observable<PreparedStatement> prepare(RegularStatement statement) {
ListenableFuture<PreparedStatement> future = session.prepareAsync(statement);
return RxUtil.from(future, Schedulers.computation());
return ListenableFutureObservable.from(future, Schedulers.computation());
}

@Override
public Observable<PreparedStatement> prepare(RegularStatement statement, Scheduler scheduler) {
ListenableFuture<PreparedStatement> future = session.prepareAsync(statement);
return RxUtil.from(future, scheduler);
return ListenableFutureObservable.from(future, scheduler);
}

@Override
Expand Down

This file was deleted.

6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<version.org.testng>6.8.8</version.org.testng>
<version.io.reactivex.rxjava>1.0.13</version.io.reactivex.rxjava>
<version.io.reactivex.rxjava-math>1.0.0</version.io.reactivex.rxjava-math>
<version.io.reactivex.rxjava-guava>1.0.3</version.io.reactivex.rxjava-guava>
<version.io.swagger>1.5.3</version.io.swagger>
<version.org.codehaus.mojo.findbugs-maven-plugin>3.0.0</version.org.codehaus.mojo.findbugs-maven-plugin>

Expand Down Expand Up @@ -203,6 +204,11 @@
<artifactId>rxjava-math</artifactId>
<version>${version.io.reactivex.rxjava-math}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-guava</artifactId>
<version>${version.io.reactivex.rxjava-guava}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down

0 comments on commit 96e6d3c

Please sign in to comment.