Skip to content

Commit

Permalink
Check if return AsyncRpcResult in Filter (#12467)
Browse files Browse the repository at this point in the history
* Check if return AsyncRpcResult in Filter

* Fix compatible
  • Loading branch information
AlbumenJ committed Jun 6, 2023
1 parent 2227f0b commit c8ef10e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.BaseFilter;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -36,6 +37,7 @@
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.CLUSTER_EXECUTE_FILTER_EXCEPTION;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static org.apache.dubbo.common.extension.ExtensionScope.APPLICATION;

@SPI(value = "default", scope = APPLICATION)
Expand Down Expand Up @@ -294,6 +296,7 @@ public boolean isDestroyed() {

@Experimental("Works for the same purpose as FilterChainNode, replace FilterChainNode with this one when proved stable enough")
class CopyOfFilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T> {
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(CopyOfFilterChainNode.class);
TYPE originalInvoker;
Invoker<T> nextNode;
FILTER filter;
Expand Down Expand Up @@ -329,6 +332,12 @@ public Result invoke(Invocation invocation) throws RpcException {
try {
InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");
asyncResult = filter.invoke(nextNode, invocation);
if (!(asyncResult instanceof AsyncRpcResult)) {
String msg = "The result of filter invocation must be AsyncRpcResult. (If you want to recreate a result, please use AsyncRpcResult.newDefaultAsyncResult.) " +
"Filter class: " + filter.getClass().getName() + ". Result class: " + asyncResult.getClass().getName() + ".";
LOGGER.error(INTERNAL_ERROR, "", "", msg);
throw new RpcException(msg);
}
} catch (Exception e) {
InvocationProfilerUtils.releaseDetailProfiler(invocation);
if (filter instanceof ListenableFilter) {
Expand Down
Expand Up @@ -35,7 +35,7 @@ default org.apache.dubbo.rpc.Result invoke(org.apache.dubbo.rpc.Invoker<?> invok
new Invocation.CompatibleInvocation(invocation));

if (invokeResult instanceof Result.CompatibleResult) {
return invokeResult;
return ((Result.CompatibleResult) invokeResult).getDelegate();
}

AsyncRpcResult asyncRpcResult = AsyncRpcResult.newDefaultAsyncResult(invocation);
Expand Down

0 comments on commit c8ef10e

Please sign in to comment.