Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the bug of broadcast cluster invoke #7174

Merged
merged 10 commits into from
Feb 22, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -26,15 +27,18 @@
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.text.Format;
import java.util.List;

/**
* BroadcastClusterInvoker
*
*/
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
private static final int MIN_BROADCAST_FAIL_PERCENT = 0;

public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
Expand All @@ -47,21 +51,67 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
URL url = getUrl();
// The value range of broadcast.fail.threshold must be 0~100.
// 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be thrown.
// see https://github.com/apache/dubbo/pull/7174
int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);

if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
"The current setting is %s, which is reset to 100.", broadcastFailPercent));
broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
}

int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
int failIndex = 0;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
if (null != result && result.hasException()) {
Throwable resultException = result.getException();
if (null != resultException) {
exception = getRpcException(result.getException());
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
AlbumenJ marked this conversation as resolved.
Show resolved Hide resolved
} else {
failIndex++;
}
}
}
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
exception = getRpcException(e);
logger.warn(exception.getMessage(), exception);
if (failIndex == failThresholdIndex) {
break;
} else {
failIndex++;
}
}
}

if (exception != null) {
if (failIndex == failThresholdIndex) {
logger.debug(
String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
} else {
logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
failIndex));
}
throw exception;
}

return result;
}

private RpcException getRpcException(Throwable throwable) {
RpcException rpcException = null;
if (throwable instanceof RpcException) {
rpcException = (RpcException) throwable;
} else {
rpcException = new RpcException(throwable.getMessage(), throwable);
}
return rpcException;
}
}