-
Notifications
You must be signed in to change notification settings - Fork 26.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive loadbalance #10745
Adaptive loadbalance #10745
Conversation
基准测试 启动参数:-Xms2048m -Xmx2048m -XX:+UseG1GC 之前写的基准测试有问题,拿到的都是同一LoadBalance,以下数据为更新后的。 Benchmark Mode Cnt Score Error Units
LoadBalance.adaptive thrpt 4 7156.037 ± 9787.735 ops/s
LoadBalance.random thrpt 4 6475.586 ± 2325.695 ops/s
LoadBalance.roundrobin thrpt 4 6498.221 ± 1064.650 ops/s
LoadBalance.shortestresponse thrpt 4 6407.356 ± 560.281 ops/s 3服务器,其中一台cpu 100% Benchmark Mode Cnt Score Error Units
LoadBalance.adaptive thrpt 4 15035.626 ± 2484.066 ops/s
LoadBalance.random thrpt 4 10191.162 ± 2346.887 ops/s
LoadBalance.roundrobin thrpt 4 10064.456 ± 4554.784 ops/s
LoadBalance.shortestresponse thrpt 4 10487.329 ± 2498.746 ops/s |
/** | ||
* uses a single worker thread operating off an bounded queue | ||
*/ | ||
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be lazy init or use dubbo shared executor
executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), | ||
new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should not create executor by default when user do not use this
executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), | ||
new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Register to org.apache.dubbo.common.resource.GlobalResourcesRepository#registerDisposable
*/ | ||
public class AdaptiveMetrics { | ||
|
||
private static final ConcurrentMap<String, AdaptiveMetrics> METRICS_STATISTICS = new ConcurrentHashMap<String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not use static field to store status in Dubbo. Use scope bean instead
* @see org.apache.dubbo.rpc.Filter | ||
* @see org.apache.dubbo.rpc.RpcContext | ||
*/ | ||
@Activate(group = CONSUMER, order = -200000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Activate(group = CONSUMER, order = -200000) | |
@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) |
private AdaptiveMetrics adaptiveMetrics; | ||
|
||
@Override | ||
public void setApplicationModel(ApplicationModel scopeModel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace to constructor
public void setApplicationModel(ApplicationModel scopeModel) { | ||
AdaptiveMetrics bean = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); | ||
if (bean == null) { | ||
scopeModel.getBeanFactory().registerBean(new AdaptiveMetrics()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add into ScopeBeanInitializer
private AdaptiveMetrics getAdaptiveMetricsInstance(){ | ||
if (adaptiveMetrics == null) { | ||
adaptiveMetrics = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); | ||
} | ||
return adaptiveMetrics; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to constructor
@Override | ||
public void setApplicationModel(ApplicationModel scopeModel) { | ||
AdaptiveMetrics bean = scopeModel.getBeanFactory().getBean(AdaptiveMetrics.class); | ||
if (bean == null) { | ||
scopeModel.getBeanFactory().registerBean(new AdaptiveMetrics()); | ||
} | ||
this.scopeModel = scopeModel; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to constructor
if (bean == null) { | ||
scopeModel.getBeanFactory().registerBean(new AdaptiveMetrics()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use ScopeBeanInitializer
@Override | ||
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { | ||
Invoker invoker = selectByP2C(invokers,url,invocation); | ||
invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use attribute
would better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to pass parameters to provider
private String buildServiceKey(Invoker<?> invoker,Invocation invocation){ | ||
URL url = invoker.getUrl(); | ||
return url.getAddress() + ":" + invocation.getProtocolServiceKey(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cache this in invocation attribute
URL url = invoker.getUrl(); | ||
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY); | ||
int timeout; | ||
if (countdown == null) { | ||
timeout = (int) RpcUtils.getTimeout(url, invocation.getMethodName(), RpcContext.getClientAttachment(), DEFAULT_TIMEOUT); | ||
} else { | ||
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown; | ||
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS); | ||
} | ||
return timeout; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why read current invocation's timeout?
private <T> Invoker<T> chooseLowLoadInvoker(Invoker<T> invoker1,Invoker<T> invoker2,Invocation invocation){ | ||
int weight1 = getWeight(invoker1, invocation); | ||
int weight2 = getWeight(invoker2, invocation); | ||
int timeout1 = getTimeout(invoker2, invocation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there some spell mistakes?
* @see org.apache.dubbo.rpc.RpcContext | ||
*/ | ||
@Activate(group = CONSUMER, order = -200000, value = {"loadbalance:adaptive"}) | ||
public class AdaptiveLoadBalanceFilter implements ClusterFilter, ClusterFilter.Listener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this filter to implement Filter
would be better. The implementation are related with the actual provider address.
private String buildServiceKey(Invocation invocation){ | ||
StringBuilder sb = new StringBuilder(128); | ||
sb.append(invocation.getInvoker().getUrl().getAddress()).append(":").append(invocation.getProtocolServiceKey()); | ||
return sb.toString(); | ||
//return url.getAddress() + ProtocolUtils.serviceKey(url.getPort(), url.getPath(), url.getVersion(), url.getGroup()); | ||
} | ||
|
||
private String getServiceKey(Invocation invocation){ | ||
|
||
String key = (String) invocation.getAttributes().get(invocation.getInvoker()); | ||
if (StringUtils.isNotEmpty(key)){ | ||
return key; | ||
} | ||
|
||
key = buildServiceKey(invocation); | ||
invocation.getAttributes().put(invocation.getInvoker(),key); | ||
return key; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this key be reused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used 2 times
Codecov Report
@@ Coverage Diff @@
## 3.2 #10745 +/- ##
============================================
- Coverage 64.90% 64.88% -0.02%
Complexity 14 14
============================================
Files 1468 1471 +3
Lines 61031 61245 +214
Branches 8944 8983 +39
============================================
+ Hits 39614 39741 +127
- Misses 17233 17296 +63
- Partials 4184 4208 +24
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
System.out.println(sumInvoker1); | ||
System.out.println(sumInvoker2); | ||
System.out.println(sumInvoker5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Print here is useless
int expectWeightValue = loop / totalWeight; | ||
int maxDeviation = expectWeightValue * 2; | ||
double beta = 0.5; | ||
//这个估算值并不准确 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment in English
System.out.println(sumInvoker1); | ||
System.out.println(sumInvoker2); | ||
System.out.println(sumInvoker3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Print is useless in unit test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Kudos, SonarCloud Quality Gate passed! |
What is the purpose of the change
issue #10571
adaptive loadbalance
Brief changelog
Verifying this change
Checklist