Skip to content

Commit

Permalink
fix the bug of broadcast cluster invoke (#7174)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoheng1 committed Feb 22, 2021
1 parent 9bb5d9e commit c24ccf0
Showing 1 changed file with 56 additions and 6 deletions.
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -26,15 +27,18 @@
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.text.Format;
import java.util.List;

/**
* BroadcastClusterInvoker
*
*/
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
private static final int MIN_BROADCAST_FAIL_PERCENT = 0;

public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
Expand All @@ -47,21 +51,67 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
URL url = getUrl();
// The value range of broadcast.fail.threshold must be 0~100.
// 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be thrown.
// see https://github.com/apache/dubbo/pull/7174
int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);

if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
"The current setting is %s, which is reset to 100.", broadcastFailPercent));
broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
}

int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
int failIndex = 0;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
if (null != result && result.hasException()) {
Throwable resultException = result.getException();
if (null != resultException) {
exception = getRpcException(result.getException());
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
} else {
failIndex++;
}
}
}
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
exception = getRpcException(e);
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
} else {
failIndex++;
}
}
}

if (exception != null) {
if (failIndex == failThresholdIndex) {
logger.debug(
String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
} else {
logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
failIndex));
}
throw exception;
}

return result;
}

private RpcException getRpcException(Throwable throwable) {
RpcException rpcException = null;
if (throwable instanceof RpcException) {
rpcException = (RpcException) throwable;
} else {
rpcException = new RpcException(throwable.getMessage(), throwable);
}
return rpcException;
}
}

0 comments on commit c24ccf0

Please sign in to comment.