Skip to content

Commit

Permalink
Merge pull request #151 from benjchristensen/version_1_3-rxjava
Browse files Browse the repository at this point in the history
Version 1.3 - RxJava Observable Integration
  • Loading branch information
benjchristensen committed Jul 26, 2013
2 parents 783867c + 2ae4493 commit 4feab8a
Show file tree
Hide file tree
Showing 18 changed files with 2,643 additions and 1,686 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.2.19-SNAPSHOT
version=1.3.0-SNAPSHOT
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
distributionUrl=http\://services.gradle.org/distributions/gradle-1.3-bin.zip
1 change: 1 addition & 0 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.9.0'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit-dep:4.10'
Expand Down
1,078 changes: 211 additions & 867 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java

Large diffs are not rendered by default.

2,008 changes: 1,255 additions & 753 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import java.util.concurrent.Future;

import rx.Observable;
import rx.concurrency.Schedulers;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;

Expand Down Expand Up @@ -56,4 +60,35 @@ public interface HystrixExecutable<R> {
*/
public Future<R> queue();

/**
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
* <p>
* This eagerly starts execution of the command the same as {@link #queue()} and {@link #execute()}.
* A lazy {@link Observable} can be obtained from {@link HystrixCommand#toObservable()} or {@link HystrixCollapser#toObservable()}.
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* Use {@link HystrixCommand#toObservable(rx.Scheduler)} or {@link HystrixCollapser#toObservable(rx.Scheduler)} to schedule the callback differently.
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
*
* @return {@code Observable<R>} that executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> observe();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
import static org.junit.Assert.*;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
Expand Down Expand Up @@ -54,15 +56,15 @@ public class HystrixRequestCache {
* <p>
* Key => CommandPrefix + CacheKey : Future<?> from queue()
*/
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Future<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Future<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, Future<?>>>() {
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Observable<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, Observable<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, Observable<?>>>() {

@Override
public ConcurrentHashMap<ValueCacheKey, Future<?>> initialValue() {
return new ConcurrentHashMap<ValueCacheKey, Future<?>>();
public ConcurrentHashMap<ValueCacheKey, Observable<?>> initialValue() {
return new ConcurrentHashMap<ValueCacheKey, Observable<?>>();
}

@Override
public void shutdown(ConcurrentHashMap<ValueCacheKey, Future<?>> value) {
public void shutdown(ConcurrentHashMap<ValueCacheKey, Observable<?>> value) {
// nothing to shutdown
};

Expand Down Expand Up @@ -104,11 +106,11 @@ private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixCon
*/
// suppressing warnings because we are using a raw Future since it's in a heterogeneous ConcurrentHashMap cache
@SuppressWarnings({ "unchecked" })
public <T> Future<T> get(String cacheKey) {
/* package */<T> Observable<T> get(String cacheKey) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
/* look for the stored value */
return (Future<T>) requestVariableForCache.get(concurrencyStrategy).get(key);
return (Observable<T>) requestVariableForCache.get(concurrencyStrategy).get(key);
}
return null;
}
Expand All @@ -127,11 +129,11 @@ public <T> Future<T> get(String cacheKey) {
*/
// suppressing warnings because we are using a raw Future since it's in a heterogeneous ConcurrentHashMap cache
@SuppressWarnings({ "unchecked" })
public <T> Future<T> putIfAbsent(String cacheKey, Future<T> f) {
/* package */<T> Observable<T> putIfAbsent(String cacheKey, Observable<T> f) {
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
/* look for the stored value */
Future<T> alreadySet = (Future<T>) requestVariableForCache.get(concurrencyStrategy).putIfAbsent(key, f);
Observable<T> alreadySet = (Observable<T>) requestVariableForCache.get(concurrencyStrategy).putIfAbsent(key, f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
Expand Down Expand Up @@ -282,17 +284,17 @@ public void testCache() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
HystrixRequestCache cache1 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command1"), strategy);
cache1.putIfAbsent("valueA", new TestFuture("a1"));
cache1.putIfAbsent("valueA", new TestFuture("a2"));
cache1.putIfAbsent("valueB", new TestFuture("b1"));
cache1.putIfAbsent("valueA", new TestObservable("a1"));
cache1.putIfAbsent("valueA", new TestObservable("a2"));
cache1.putIfAbsent("valueB", new TestObservable("b1"));

HystrixRequestCache cache2 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command2"), strategy);
cache2.putIfAbsent("valueA", new TestFuture("a3"));
cache2.putIfAbsent("valueA", new TestObservable("a3"));

assertEquals("a1", cache1.get("valueA").get());
assertEquals("b1", cache1.get("valueB").get());
assertEquals("a1", cache1.get("valueA").toBlockingObservable().last());
assertEquals("b1", cache1.get("valueB").toBlockingObservable().last());

assertEquals("a3", cache2.get("valueA").get());
assertEquals("a3", cache2.get("valueA").toBlockingObservable().last());
assertNull(cache2.get("valueB"));
} catch (Exception e) {
fail("Exception: " + e.getMessage());
Expand All @@ -318,8 +320,8 @@ public void testClearCache() {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
HystrixRequestCache cache1 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command1"), strategy);
cache1.putIfAbsent("valueA", new TestFuture("a1"));
assertEquals("a1", cache1.get("valueA").get());
cache1.putIfAbsent("valueA", new TestObservable("a1"));
assertEquals("a1", cache1.get("valueA").toBlockingObservable().last());
cache1.clear("valueA");
assertNull(cache1.get("valueA"));
} catch (Exception e) {
Expand All @@ -330,39 +332,19 @@ public void testClearCache() {
}
}

private static class TestFuture implements Future<String> {

private final String value;

public TestFuture(String value) {
this.value = value;
}
private static class TestObservable extends Observable<String> {
public TestObservable(final String value) {
super(new Func1<Observer<String>, Subscription>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public Subscription call(Observer<String> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}

@Override
public boolean isCancelled() {
return false;
});
}

@Override
public boolean isDone() {
return false;
}

@Override
public String get() throws InterruptedException, ExecutionException {
return value;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return value;
}

}

}
Expand Down
Loading

0 comments on commit 4feab8a

Please sign in to comment.