-
Notifications
You must be signed in to change notification settings - Fork 26.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive loadbalance #10745
Adaptive loadbalance #10745
Changes from 9 commits
6e84c8c
90da4de
f7b1150
897bf0b
c355e39
affa7eb
b27cf64
3effb4e
9a4e917
9bd85ee
8c43e89
c4a8fd1
4705938
6431161
bb558b3
a6fcdcd
260b449
5ea3d84
5dd82ea
2d46bb8
8547ad8
bc0c753
b0ffe9c
a65dbde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.dubbo.rpc.cluster.filter.support; | ||
|
||
import org.apache.dubbo.common.extension.Activate; | ||
import org.apache.dubbo.common.profiler.Profiler; | ||
import org.apache.dubbo.common.profiler.ProfilerEntry; | ||
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory; | ||
import org.apache.dubbo.common.utils.StringUtils; | ||
import org.apache.dubbo.rpc.AdaptiveMetrics; | ||
import org.apache.dubbo.rpc.Constants; | ||
import org.apache.dubbo.rpc.Invocation; | ||
import org.apache.dubbo.rpc.Invoker; | ||
import org.apache.dubbo.rpc.Result; | ||
import org.apache.dubbo.rpc.RpcException; | ||
import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; | ||
import org.apache.dubbo.rpc.cluster.loadbalance.AdaptiveLoadBalance; | ||
import org.apache.dubbo.rpc.model.ApplicationModel; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; | ||
import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY; | ||
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; | ||
|
||
/** | ||
* if the load balance is adaptive ,set attachment to get the metrics of the server | ||
* @see org.apache.dubbo.rpc.Filter | ||
* @see org.apache.dubbo.rpc.RpcContext | ||
*/ | ||
@Activate(group = CONSUMER, order = -200000) | ||
public class AdaptiveLoadBalanceFilter implements ClusterFilter, ClusterFilter.Listener { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this filter to implement |
||
|
||
/** | ||
* uses a single worker thread operating off an bounded queue | ||
*/ | ||
private ThreadPoolExecutor executor; | ||
|
||
public AdaptiveLoadBalanceFilter(ApplicationModel applicationModel) { | ||
executor = new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), | ||
new NamedInternalThreadFactory("Dubbo-framework-loadbalance-adaptive", true), new ThreadPoolExecutor.DiscardOldestPolicy()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should not create executor by default when user do not use this |
||
} | ||
|
||
@Override | ||
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { | ||
return invoker.invoke(invocation); | ||
} | ||
|
||
private String buildServiceKey(Invocation invocation){ | ||
return invocation.getInvoker().getUrl().getAddress() + ":" + invocation.getProtocolServiceKey(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. replace to StringBuilder and set initial size to improve performance |
||
//return url.getAddress() + ProtocolUtils.serviceKey(url.getPort(), url.getPath(), url.getVersion(), url.getGroup()); | ||
} | ||
|
||
@Override | ||
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { | ||
|
||
try { | ||
if (StringUtils.isNotEmpty(invoker.getUrl().getParameter(LOADBALANCE_KEY)) | ||
&& AdaptiveLoadBalance.NAME.equals(invoker.getUrl().getParameter(LOADBALANCE_KEY))) { | ||
AdaptiveMetrics.addConsumerSuccess(buildServiceKey(invocation)); | ||
} | ||
String attachment = appResponse.getAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); | ||
if (StringUtils.isNotEmpty(attachment)) { | ||
String[] parties = COMMA_SPLIT_PATTERN.split(attachment); | ||
if (parties.length == 0) { | ||
return; | ||
} | ||
Map<String, String> metricsMap = new HashMap<>(); | ||
for (String party : parties) { | ||
String[] groups = party.split(":"); | ||
if (groups.length != 2) { | ||
continue; | ||
} | ||
metricsMap.put(groups[0], groups[1]); | ||
} | ||
ProfilerEntry profilerEntry = (ProfilerEntry) invocation.getAttributes().get(Profiler.PROFILER_KEY); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. handle NPE if |
||
long rt = (System.nanoTime() - profilerEntry.getStartTime()) / 1000_000L; | ||
metricsMap.put("rt", String.valueOf(rt)); | ||
|
||
executor.execute(() -> { | ||
AdaptiveMetrics.setProviderMetrics(buildServiceKey(invocation), metricsMap); | ||
}); | ||
} | ||
} | ||
finally { | ||
appResponse.getAttachments().remove(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { | ||
executor.execute(() -> { | ||
AdaptiveMetrics.addErrorReq(buildServiceKey(invocation)); | ||
}); | ||
} | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.dubbo.rpc.cluster.loadbalance; | ||
|
||
import org.apache.dubbo.common.URL; | ||
import org.apache.dubbo.rpc.AdaptiveMetrics; | ||
import org.apache.dubbo.rpc.Constants; | ||
import org.apache.dubbo.rpc.Invocation; | ||
import org.apache.dubbo.rpc.Invoker; | ||
import org.apache.dubbo.rpc.RpcContext; | ||
import org.apache.dubbo.rpc.TimeoutCountDown; | ||
import org.apache.dubbo.rpc.model.ApplicationModel; | ||
import org.apache.dubbo.rpc.model.ScopeModelAware; | ||
import org.apache.dubbo.rpc.support.RpcUtils; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.ThreadLocalRandom; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY; | ||
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; | ||
|
||
/** | ||
* AdaptiveLoadBalance | ||
* </p> | ||
*/ | ||
public class AdaptiveLoadBalance extends AbstractLoadBalance implements ScopeModelAware { | ||
|
||
public static final String NAME = "adaptive"; | ||
|
||
//default key | ||
private String attachmentKey = "mem,load"; | ||
|
||
private final int default_timeout = 30_000; | ||
|
||
@Override | ||
public void setApplicationModel(ApplicationModel applicationModel) { | ||
} | ||
|
||
@Override | ||
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { | ||
Invoker invoker = selectByP2C(invokers,url,invocation); | ||
invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to pass parameters to provider |
||
AdaptiveMetrics.addConsumerReq(buildServiceKey(invoker,invocation)); | ||
AdaptiveMetrics.setPickTime(buildServiceKey(invoker,invocation),System.currentTimeMillis()); | ||
|
||
return invoker; | ||
} | ||
|
||
private <T> Invoker<T> selectByP2C(List<Invoker<T>> invokers, URL url, Invocation invocation){ | ||
int length = invokers.size(); | ||
if(length == 1) { | ||
return invokers.get(0); | ||
} | ||
|
||
if(length == 2) { | ||
return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation); | ||
} | ||
|
||
int pos1 = ThreadLocalRandom.current().nextInt(length); | ||
int pos2 = ThreadLocalRandom.current().nextInt(length); | ||
while(pos1 == pos2) { | ||
pos2 = ThreadLocalRandom.current().nextInt(length); | ||
} | ||
|
||
return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation); | ||
} | ||
|
||
private String buildServiceKey(Invoker<?> invoker,Invocation invocation){ | ||
URL url = invoker.getUrl(); | ||
return url.getAddress() + ":" + invocation.getProtocolServiceKey(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cache this in invocation attribute |
||
|
||
private int getTimeout(Invoker<?> invoker, Invocation invocation) { | ||
URL url = invoker.getUrl(); | ||
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY); | ||
int timeout; | ||
if (countdown == null) { | ||
timeout = (int) RpcUtils.getTimeout(url, invocation.getMethodName(), RpcContext.getClientAttachment(), DEFAULT_TIMEOUT); | ||
} else { | ||
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown; | ||
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS); | ||
} | ||
return timeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why read current invocation's timeout? |
||
} | ||
|
||
private <T> Invoker<T> chooseLowLoadInvoker(Invoker<T> invoker1,Invoker<T> invoker2,Invocation invocation){ | ||
int weight1 = getWeight(invoker1, invocation); | ||
int weight2 = getWeight(invoker2, invocation); | ||
int timeout1 = getTimeout(invoker2, invocation); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there some spell mistakes? |
||
int timeout2 = getTimeout(invoker2, invocation); | ||
long load1 = Double.doubleToLongBits(AdaptiveMetrics.getLoad(buildServiceKey(invoker1,invocation),weight1,timeout1 )); | ||
long load2 = Double.doubleToLongBits(AdaptiveMetrics.getLoad(buildServiceKey(invoker2,invocation),weight2,timeout2 )); | ||
|
||
if (load1 == load2) { | ||
// The sum of weights | ||
int totalWeight = weight1 + weight2; | ||
if (totalWeight > 0) { | ||
int offset = ThreadLocalRandom.current().nextInt(totalWeight); | ||
if (offset < weight1) { | ||
return invoker1; | ||
} | ||
return invoker2; | ||
} | ||
return ThreadLocalRandom.current().nextInt(2) == 0 ? invoker1 : invoker2; | ||
} | ||
return load1 > load2 ? invoker2 : invoker1; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
consumercontext=org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter | ||
router-snapshot=org.apache.dubbo.rpc.cluster.router.RouterSnapshotFilter | ||
adaptiveLoadBalance=org.apache.dubbo.rpc.cluster.filter.support.AdaptiveLoadBalanceFilter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.