Skip to content
Permalink
Browse files
Injvm protocol support async invoke (#7555)
* Update InjvmInvoker.java

InjvmInvoker改为CompletableFuture模式

* 支持提供端配置async

* exception时返回exception

* Update InjvmInvoker.java

executor == null后自我保护

* add UT

* Update DemoServiceImpl.java

* Update InjvmProtocolTest.java

* Update InjvmInvoker.java

* Update InjvmInvoker.java
  • Loading branch information
zhangyz-hd committed May 11, 2021
1 parent 31db4ac commit 87d4fa9db126c4c91f01aeaa224ea86b65f8a87f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 5 deletions.
@@ -17,17 +17,27 @@
package org.apache.dubbo.rpc.protocol.injvm;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;

/**
* InjvmInvoker
@@ -38,6 +48,8 @@

private final Map<String, Exporter<?>> exporterMap;

private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
@@ -67,6 +79,33 @@ public Result doInvoke(Invocation invocation) throws Throwable {
if (serverHasToken) {
invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
}
return exporter.getInvoker().invoke(invocation);

if (isAsync(exporter.getInvoker().getUrl(), getUrl())) {
((RpcInvocation) invocation).setInvokeMode(InvokeMode.ASYNC);
// use consumer executor
ExecutorService executor = executorRepository.createExecutorIfAbsent(getUrl());
CompletableFuture<AppResponse> appResponseFuture = CompletableFuture.supplyAsync(() -> {
Result result = exporter.getInvoker().invoke(invocation);
if (result.hasException()) {
return new AppResponse(result.getException());
} else {
return new AppResponse(result.getValue());
}
}, executor);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, invocation);
result.setExecutor(executor);
return result;
} else {
return exporter.getInvoker().invoke(invocation);
}
}

private boolean isAsync(URL remoteUrl, URL localUrl) {
if (localUrl.hasParameter(ASYNC_KEY)) {
return localUrl.getParameter(ASYNC_KEY, false);
}
return remoteUrl.getParameter(ASYNC_KEY, false);
}
}
@@ -39,6 +39,7 @@ public interface DemoService {

Type enumlength(Type... types);


String getRemoteApplicationName();

String getAsyncResult();
}
@@ -70,9 +70,19 @@ public int stringLength(String str) {
return str.length();
}


@Override
public String getRemoteApplicationName() {
return RpcContext.getContext().getRemoteApplicationName();
}

@Override
public String getAsyncResult() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("getAsyncResult() Interrupted");
}
return "DONE";
}

}
@@ -34,13 +34,15 @@
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
import static org.apache.dubbo.rpc.Constants.SCOPE_LOCAL;
import static org.apache.dubbo.rpc.Constants.SCOPE_REMOTE;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
@@ -136,4 +138,17 @@ public void testRemoteApplicationName() throws Exception {
assertEquals(service.getRemoteApplicationName(), "consumer");
}

@Test
public void testLocalProtocolAsync() throws Exception {
DemoService service = new DemoServiceImpl();
URL url = URL.valueOf("injvm://127.0.0.1/TestService")
.addParameter(ASYNC_KEY, true)
.addParameter(INTERFACE_KEY, DemoService.class.getName()).addParameter("application", "consumer");
Invoker<?> invoker = proxy.getInvoker(service, DemoService.class, url);
assertTrue(invoker.isAvailable());
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
service = proxy.getProxy(protocol.refer(DemoService.class, url));
assertNull(service.getAsyncResult());
}
}

0 comments on commit 87d4fa9

Please sign in to comment.