Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iss1403: Support @HystrixCommand for rx.Single and rx.Completable sim… #1524

Merged
merged 4 commits into from May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions hystrix-contrib/hystrix-javanica/README.md
Expand Up @@ -125,8 +125,8 @@ To perform "Reactive Execution" you should return an instance of `Observable` in
});
}
```

The return type of command method should be `Observable`.
In addition to `Observable` Javanica supports the following RX types: `Single` and `Completable`.
Hystrix core supports only one RX type which is `Observable`, `HystrixObservableCommand` requires to return `Observable` therefore javanica transforms `Single` or `Completable` to `Observable` using `toObservable()` method for appropriate type and before returning the result to caller it translates `Observable` to either `Single` or `Completable` using `toSingle()` or `toCompletable()` correspondingly.

HystrixObservable interface provides two methods: ```observe()``` - eagerly starts execution of the command the same as ``` HystrixCommand#queue()``` and ```HystrixCommand#execute()```; ```toObservable()``` - lazily starts execution of the command only once the Observable is subscribed to. To control this behaviour and swith between two modes ```@HystrixCommand``` provides specific parameter called ```observableExecutionMode```.
```@HystrixCommand(observableExecutionMode = EAGER)``` indicates that ```observe()``` method should be used to execute observable command
Expand Down
Expand Up @@ -41,7 +41,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -110,8 +112,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP
return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand All @@ -123,7 +125,16 @@ public Observable call(Throwable throwable) {
}
return Observable.error(throwable);
}
});
}), metaHolder);
}

private Object mapObservable(Observable observable, final MetaHolder metaHolder) {
if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toCompletable();
} else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toSingle();
}
return observable;
}

private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) {
Expand Down
Expand Up @@ -15,8 +15,12 @@
*/
package com.netflix.hystrix.contrib.javanica.command;

import com.google.common.collect.ImmutableSet;
import rx.Completable;
import rx.Observable;
import rx.Single;

import java.util.Set;
import java.util.concurrent.Future;

/**
Expand All @@ -39,6 +43,9 @@ public enum ExecutionType {
*/
OBSERVABLE;

// RX types
private static final Set<? extends Class> RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class);

/**
* Gets execution type for specified class type.
* @param type the type
Expand All @@ -47,11 +54,19 @@ public enum ExecutionType {
public static ExecutionType getExecutionType(Class<?> type) {
if (Future.class.isAssignableFrom(type)) {
return ExecutionType.ASYNCHRONOUS;
} else if (Observable.class.isAssignableFrom(type)) {
} else if (isRxType(type)) {
return ExecutionType.OBSERVABLE;
} else {
return ExecutionType.SYNCHRONOUS;
}
}

private static boolean isRxType(Class<?> cl) {
for (Class<?> rxType : RX_TYPES) {
if (rxType.isAssignableFrom(cl)) {
return true;
}
}
return false;
}
}
Expand Up @@ -28,7 +28,9 @@
import com.netflix.hystrix.exception.HystrixBadRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) {
protected Observable construct() {
Observable result;
try {
result = ((Observable) commandActions.getCommandAction().execute(executionType))
Observable observable = toObservable(commandActions.getCommandAction().execute(executionType));
result = observable
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand Down Expand Up @@ -105,6 +108,10 @@ protected Observable resumeWithFallback() {
Object res = commandActions.getFallbackAction().executeWithArgs(executionType, args);
if (res instanceof Observable) {
return (Observable) res;
} else if (res instanceof Single) {
return ((Single) res).toObservable();
} else if (res instanceof Completable) {
return ((Completable) res).toObservable();
} else {
return Observable.just(res);
}
Expand Down Expand Up @@ -157,4 +164,16 @@ boolean isIgnorable(Throwable throwable) {
}
return false;
}

private Observable toObservable(Object obj) {
if (Observable.class.isAssignableFrom(obj.getClass())) {
return (Observable) obj;
} else if (Completable.class.isAssignableFrom(obj.getClass())) {
return ((Completable) obj).toObservable();
} else if (Single.class.isAssignableFrom(obj.getClass())) {
return ((Single) obj).toObservable();
} else {
throw new IllegalStateException("unsupported rx type: " + obj.getClass());
}
}
}
Expand Up @@ -24,6 +24,7 @@
import com.netflix.hystrix.contrib.javanica.exception.FallbackDefinitionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import rx.Completable;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -97,6 +98,13 @@ public void validateReturnType(Method commandMethod) throws FallbackDefinitionEx
if (ExecutionType.OBSERVABLE == ExecutionType.getExecutionType(commandReturnType)) {
if (ExecutionType.OBSERVABLE != getExecutionType()) {
Type commandParametrizedType = commandMethod.getGenericReturnType();

// basically any object can be wrapped into Completable, Completable itself ins't parametrized
if(Completable.class.isAssignableFrom(commandMethod.getReturnType())) {
validateCompletableReturnType(commandMethod, method.getReturnType());
return;
}

if (isReturnTypeParametrized(commandMethod)) {
commandParametrizedType = getFirstParametrizedType(commandMethod);
}
Expand Down Expand Up @@ -142,6 +150,13 @@ private Type getFirstParametrizedType(Method m) {
return null;
}

// everything can be wrapped into completable except 'void'
private void validateCompletableReturnType(Method commandMethod, Class<?> callbackReturnType) {
if (Void.TYPE == callbackReturnType) {
throw new FallbackDefinitionException(createErrorMsg(commandMethod, method, "fallback cannot return 'void' if command return type is " + Completable.class.getSimpleName()));
}
}

private void validateReturnType(Method commandMethod, Method fallbackMethod) {
if (isGenericReturnType(commandMethod)) {
List<Type> commandParametrizedTypes = flattenTypeVariables(commandMethod.getGenericReturnType());
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.helpers.MessageFormatter;
import org.springframework.test.context.junit4.rules.SpringClassRule;
import org.springframework.test.context.junit4.rules.SpringMethodRule;
import rx.Completable;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -64,6 +65,7 @@ public Object[] methodGenericDefinitionFailure() {
new Object[]{MethodGenericDefinitionFailureCase8.class},
new Object[]{MethodGenericDefinitionFailureCase9.class},
new Object[]{MethodGenericDefinitionFailureCase10.class},
new Object[]{MethodGenericDefinitionFailureCase11.class},

};
}
Expand Down Expand Up @@ -247,6 +249,12 @@ public static class MethodGenericDefinitionFailureCase10 {
private GenericEntity<? super Comparable> fallback() { return null; }
}

public static class MethodGenericDefinitionFailureCase11 {
@HystrixCommand(fallbackMethod = "fallback")
public Completable command() { throw new IllegalStateException(); }
private void fallback() { return; }
}

/* ====================================================================== */
/* ===================== GENERIC CLASS DEFINITIONS =====+================ */
/* =========================== SUCCESS CASES ============================ */
Expand Down
Expand Up @@ -24,11 +24,13 @@
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test;
import rx.Completable;
import rx.Observable;
import rx.Observer;
import rx.Single;
import rx.Subscriber;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;
import rx.functions.Func0;

import static com.netflix.hystrix.contrib.javanica.test.common.CommonUtils.getHystrixCommandByKey;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -90,6 +92,82 @@ public void call(User user) {
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetCompletableUser(){
userService.getCompletableUser("1", "name: ");
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUser");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetCompletableUserWithRegularFallback() {
Completable completable = userService.getCompletableUserWithRegularFallback(null, "name: ");
completable.<User>toObservable().subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRegularFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetCompletableUserWithRxFallback() {
Completable completable = userService.getCompletableUserWithRxFallback(null, "name: ");
completable.<User>toObservable().subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRxFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetSingleUser() {
final String id = "1";
Single<User> user = userService.getSingleUser(id, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals(id, user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUser");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetSingleUserWithRegularFallback(){
Single<User> user = userService.getSingleUserWithRegularFallback(null, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRegularFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetSingleUserWithRxFallback(){
Single<User> user = userService.getSingleUserWithRxFallback(null, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRxFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetUserWithRegularFallback() {
Expand Down Expand Up @@ -163,6 +241,59 @@ public Observable<User> getUser(final String id, final String name) {
return createObservable(id, name);
}

@HystrixCommand
public Completable getCompletableUser(final String id, final String name) {
validate(id, name, "getCompletableUser has failed");
return createObservable(id, name).toCompletable();
}

@HystrixCommand(fallbackMethod = "completableUserRegularFallback")
public Completable getCompletableUserWithRegularFallback(final String id, final String name) {
return getCompletableUser(id, name);
}

@HystrixCommand(fallbackMethod = "completableUserRxFallback")
public Completable getCompletableUserWithRxFallback(final String id, final String name) {
return getCompletableUser(id, name);
}

public User completableUserRegularFallback(final String id, final String name) {
return new User("default_id", "default_name");
}

public Completable completableUserRxFallback(final String id, final String name) {
return Completable.fromCallable(new Func0<User>() {
@Override
public User call() {
return new User("default_id", "default_name");
}
});
}

@HystrixCommand
public Single<User> getSingleUser(final String id, final String name) {
validate(id, name, "getSingleUser has failed");
return createObservable(id, name).toSingle();
}

@HystrixCommand(fallbackMethod = "singleUserRegularFallback")
public Single<User> getSingleUserWithRegularFallback(final String id, final String name) {
return getSingleUser(id, name);
}

@HystrixCommand(fallbackMethod = "singleUserRxFallback")
public Single<User> getSingleUserWithRxFallback(final String id, final String name) {
return getSingleUser(id, name);
}

User singleUserRegularFallback(final String id, final String name) {
return new User("default_id", "default_name");
}

Single<User> singleUserRxFallback(final String id, final String name) {
return createObservable("default_id", "default_name").toSingle();
}

@HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY)
public Observable<User> getUserRegularFallback(final String id, final String name) {
validate(id, name, "getUser has failed");
Expand Down