Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Javanica: basic Observable Collapser support #1320

Merged
merged 4 commits into from Sep 7, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.javanica.aop.aspectj;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.netflix.hystrix.HystrixInvokable;
import com.netflix.hystrix.contrib.javanica.annotation.DefaultProperties;
Expand Down Expand Up @@ -168,33 +167,38 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec
}

Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);
if (batchCommandMethod == null || !batchCommandMethod.getReturnType().equals(List.class)) {

if (batchCommandMethod == null)
throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());

Class<?> batchReturnType = batchCommandMethod.getReturnType();
Class<?> collapserReturnType = collapserMethod.getReturnType();
boolean observable = collapserReturnType.equals(Observable.class);

if (!batchReturnType.equals(List.class))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this check is redundant because
getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class) already checks return type, if batch method return type isn't List then getDeclaredMethod returns null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's legacy code. anyway it's better to remove it

throw new IllegalStateException("required batch method for collapser is absent: "
+ "(java.util.List) " + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List)");
}

if (!collapserMethod.getParameterTypes()[0]
.equals(getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected"
.equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
+ obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
}

Class<?> collapserMethodReturnType;
if (Future.class.isAssignableFrom(collapserMethod.getReturnType())) {
collapserMethodReturnType = getGenericParameter(collapserMethod.getGenericReturnType());
} else {
collapserMethodReturnType = collapserMethod.getReturnType();
}
final Class<?> collapserMethodReturnType = getFirstGenericParameter(
collapserMethod.getGenericReturnType(),
Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);

Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
if (!collapserMethodReturnType
.equals(getGenericParameter(batchCommandMethod.getGenericReturnType()))) {
.equals(batchCommandActualReturnType)) {
throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
"(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
getGenericParameter(batchCommandMethod.getGenericReturnType()));
batchCommandActualReturnType);
}

HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
Expand All @@ -212,11 +216,12 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec

builder.hystrixCollapser(hystrixCollapser);
builder.defaultCollapserKey(collapserMethod.getName());
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserMethod.getReturnType()));
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));

builder.defaultCommandKey(batchCommandMethod.getName());
builder.hystrixCommand(hystrixCommand);
builder.executionType(ExecutionType.getExecutionType(batchCommandMethod.getReturnType()));
builder.executionType(ExecutionType.getExecutionType(batchReturnType));
builder.observable(observable);
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
if (fallbackMethod.isPresent()) {
fallbackMethod.validateReturnType(batchCommandMethod);
Expand Down Expand Up @@ -260,14 +265,26 @@ private static Method getAjcMethodFromTarget(JoinPoint joinPoint) {
}


private static Class<?> getGenericParameter(Type type) {
Type tType = ((ParameterizedType) type).getActualTypeArguments()[0];
String className = tType.toString().split(" ")[1];
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw Throwables.propagate(e);
private static Class<?> getFirstGenericParameter(Type type) {
return getFirstGenericParameter(type, 1);
}

private static Class<?> getFirstGenericParameter(final Type type, final int nestedDepth) {
int cDepth = 0;
Type tType = type;

for (int cDept = 0; cDept < nestedDepth; cDept++) {
if (!(tType instanceof ParameterizedType))
throw new IllegalStateException(String.format("Sub type at nesting level %d of %s is expected to be generic", cDepth, type));
tType = ((ParameterizedType) tType).getActualTypeArguments()[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be: tType = ((ParameterizedType) tType).getActualTypeArguments()[cDept]
BTW: In a separate branch I'm working on strategic solution to compare generic types. Will refactor this code to use the common feature.

}

if (tType instanceof ParameterizedType)
return (Class<?>) ((ParameterizedType) tType).getRawType();
else if (tType instanceof Class)
return (Class<?>) tType;

throw new UnsupportedOperationException("Unsupported type " + tType);
}

private static MetaHolder.Builder setDefaultProperties(MetaHolder.Builder builder, Class<?> declaringClass, final ProceedingJoinPoint joinPoint) {
Expand Down
Expand Up @@ -16,7 +16,6 @@
package com.netflix.hystrix.contrib.javanica.command;


import com.google.common.base.Throwables;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.cache.CacheInvocationContext;
import com.netflix.hystrix.contrib.javanica.cache.HystrixCacheKeyGenerator;
Expand Down Expand Up @@ -140,8 +139,8 @@ boolean isIgnorable(Throwable throwable) {
* @param action the action
* @return result of command action execution
*/
Object process(Action action) throws Exception {
Object result;
<ReturnType> ReturnType process(Action<ReturnType> action) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need generic type here ?

ReturnType result;
try {
result = action.execute();
flushCache();
Expand Down Expand Up @@ -188,14 +187,14 @@ protected void flushCache() {
/**
* Common action.
*/
abstract class Action {
abstract class Action<ReturnType> {
/**
* Each implementation of this method should wrap any exceptions in CommandActionExecutionException.
*
* @return execution result
* @throws CommandActionExecutionException
*/
abstract Object execute() throws CommandActionExecutionException;
abstract ReturnType execute() throws CommandActionExecutionException;
}


Expand Down
Expand Up @@ -52,8 +52,8 @@ public HystrixCommandBuilder(Builder builder) {
this.executionType = builder.executionType;
}

public static Builder builder() {
return new Builder();
public static <ResponseType> Builder builder() {
return new Builder<ResponseType>();
}

public GenericSetterBuilder getSetterBuilder() {
Expand Down Expand Up @@ -85,12 +85,12 @@ public ExecutionType getExecutionType() {
}


public static class Builder {
public static class Builder<ResponseType> {
private GenericSetterBuilder setterBuilder;
private CommandActions commandActions;
private CacheInvocationContext<CacheResult> cacheResultInvocationContext;
private CacheInvocationContext<CacheRemove> cacheRemoveInvocationContext;
private Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests = Collections.emptyList();
private Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests = Collections.emptyList();
private List<Class<? extends Throwable>> ignoreExceptions = Collections.emptyList();
private ExecutionType executionType = ExecutionType.SYNCHRONOUS;

Expand Down Expand Up @@ -144,7 +144,7 @@ public Builder cacheRemoveInvocationContext(CacheInvocationContext<CacheRemove>
* @param pCollapsedRequests the collapsed requests
* @return this {@link HystrixCommandBuilder.Builder}
*/
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<Object, Object>> pCollapsedRequests) {
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> pCollapsedRequests) {
this.collapsedRequests = pCollapsedRequests;
return this;
}
Expand Down
Expand Up @@ -51,7 +51,7 @@ public HystrixCommandBuilder create(MetaHolder metaHolder) {
return create(metaHolder, Collections.<HystrixCollapser.CollapsedRequest<Object, Object>>emptyList());
}

public HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests) {
public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
validateMetaHolder(metaHolder);

return HystrixCommandBuilder.builder()
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package com.netflix.hystrix.contrib.javanica.command;

import com.netflix.hystrix.HystrixExecutable;
import com.netflix.hystrix.HystrixInvokable;
import com.netflix.hystrix.contrib.javanica.collapser.CommandCollapser;

Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix.contrib.javanica.test.common.collapser;

import com.google.common.collect.Sets;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.HystrixRequestLog;
Expand All @@ -25,9 +26,13 @@
import com.netflix.hystrix.contrib.javanica.test.common.domain.User;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -77,6 +82,33 @@ public void testGetUserById() throws ExecutionException, InterruptedException {
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testReactive() throws Exception {

final Observable<User> u1 = userService.getUserByIdReactive("1");
final Observable<User> u2 = userService.getUserByIdReactive("2");
final Observable<User> u3 = userService.getUserByIdReactive("3");
final Observable<User> u4 = userService.getUserByIdReactive("4");
final Observable<User> u5 = userService.getUserByIdReactive("5");

final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();

Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
for (User cUser : users) {
assertEquals(expectedIds.remove(cUser.getId()), true);
}
assertEquals(expectedIds.isEmpty(), true);
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
.getAllExecutedCommands().iterator().next();
// assert the command is the one we're expecting
assertEquals("getUserByIds", command.getCommandKey().name());
// confirm that it was a COLLAPSED command execution
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
// and that it was successful
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetUserByIdWithFallback() throws ExecutionException, InterruptedException {
Future<User> f1 = userService.getUserByIdWithFallback("1");
Expand Down Expand Up @@ -158,6 +190,7 @@ public void testGetUserByIdWrongCollapserNoArgs() {

public static class UserService {

public static final Logger log = LoggerFactory.getLogger(UserService.class);
public static final User DEFAULT_USER = new User("def", "def");


Expand All @@ -173,6 +206,11 @@ public Future<User> getUserByIdWithFallback(String id) {
return null;
}

@HystrixCollapser(batchMethod = "getUserByIds",
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
public Observable<User> getUserByIdReactive(String id) {
return null;
}

@HystrixCollapser(batchMethod = "getUserByIdsThrowsException",
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
Expand Down Expand Up @@ -226,14 +264,14 @@ public List<User> getUserByIds(List<String> ids) {
for (String id : ids) {
users.add(new User(id, "name: " + id));
}
log.debug("executing on thread id: {}", Thread.currentThread().getId());
return users;
}

@HystrixCommand(fallbackMethod = "getUserByIdsFallback",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000")// for debug
})

public List<User> getUserByIdsWithFallback(List<String> ids) {
throw new RuntimeException("not found");
}
Expand Down
Expand Up @@ -19,8 +19,9 @@ log4j.rootLogger = ERROR, CONSOLE

# Define the console appender
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.File=${log}/log.out

# Define the layout for console appender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.conversionPattern=%m%n
log4j.appender.CONSOLE.layout.conversionPattern=%m%n

log4j.logger.com.netflix.hystrix.contrib.javanica=DEBUG