Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #604]Improve the rebalance algorithm #605

Merged
merged 32 commits into from Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b81d393
modify:optimize flow control in downstreaming msg
lrhkobe May 15, 2021
3f2f8b5
modify:optimize stategy of selecting session in downstream msg
lrhkobe May 17, 2021
97c6a3f
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 26, 2021
fda8068
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 28, 2021
fc3686b
modify:optimize msg downstream,msg store in session
lrhkobe May 28, 2021
cb3e551
modify:fix bug:not a @Sharable handler
lrhkobe May 28, 2021
2568539
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe May 31, 2021
4218b4f
modify:downstream broadcast msg asynchronously
lrhkobe Jun 1, 2021
8c69ea5
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 16, 2021
23a7175
modify:remove unneccessary interface in eventmesh-connector-api
lrhkobe Jun 17, 2021
a478849
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 21, 2021
9c7fcbc
modify:fix conflict
lrhkobe Jun 21, 2021
4ee468b
modify:add license in EventMeshAction
lrhkobe Jun 21, 2021
8936bd2
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jun 24, 2021
fea8a57
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 23, 2021
368c478
modify:fix ack problem
lrhkobe Jul 23, 2021
20efba7
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 29, 2021
35a73b3
modify:fix exception handle when exception occured in EventMeshTcpMes…
lrhkobe Jul 29, 2021
7594503
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Jul 29, 2021
5e3f88f
modify:fix log print
lrhkobe Aug 2, 2021
e94d7c0
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 3, 2021
2bea098
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 5, 2021
f749313
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 10, 2021
6aab0e9
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 13, 2021
ec8427f
modify: fix issue#496,ClassCastException
lrhkobe Aug 13, 2021
4ebfc71
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 13, 2021
c656328
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Aug 16, 2021
6bfa40b
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Nov 22, 2021
add0a57
modify: improve rebalance algorithm
lrhkobe Nov 22, 2021
1e772e9
modify: fix checkstyle fail for line length
lrhkobe Nov 23, 2021
aba9136
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Dec 14, 2021
2455bb9
Merge branch 'develop' of github.com:apache/incubator-eventmesh into …
lrhkobe Dec 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -97,59 +97,108 @@ private Map<String, String> queryLocalEventMeshMap(String cluster){
}

private void doRebalanceByGroup(String cluster, String group, String purpose, Map<String, String> eventMeshMap) throws Exception{
logger.info("doRebalanceByGroup start, cluster:{}, group:{}, purpose:{}", cluster, group, purpose);

//query distribute data of loacl idc
Map<String, Integer> clientDistributionMap = queryLocalEventMeshDistributeData(cluster, group, purpose, eventMeshMap);
if(clientDistributionMap == null || clientDistributionMap.size() == 0){
return;
}

doRebalanceRedirect(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName, group, purpose, eventMeshMap, clientDistributionMap);
logger.info("doRebalanceByGroup end, cluster:{}, group:{}, purpose:{}", cluster, group, purpose);

}

private void doRebalanceRedirect(String currEventMeshName, String group, String purpose, Map<String, String> eventMeshMap, Map<String, Integer> clientDistributionMap)throws Exception{
if(clientDistributionMap == null || clientDistributionMap.size() == 0){
return;
}

//caculate client num need to redirect in currEventMesh
int judge = caculateRedirectNum(currEventMeshName, group, purpose, clientDistributionMap);

if(judge > 0) {

//select redirect target eventmesh lisg
List<String> eventMeshRecommendResult = selectRedirectEventMesh(group, eventMeshMap, clientDistributionMap, judge, currEventMeshName);
if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){
logger.warn("doRebalance failed,recommendEventMeshNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge);
return;
}

//do redirect
doRedirect(group, purpose, judge, eventMeshRecommendResult);
}else{
logger.info("rebalance condition not satisfy,group:{}, purpose:{},judge:{}", group, purpose, judge);
}
}

private void doRedirect(String group, String purpose, int judge, List<String> eventMeshRecommendResult) throws Exception{
logger.info("doRebalance redirect start---------------------group:{},judge:{}", group, judge);
Set<Session> sessionSet = null;
if(EventMeshConstants.PURPOSE_SUB.equals(purpose)) {
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions();
}else if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions();
}else{
logger.warn("doRebalance failed,param is illegal, group:{}, purpose:{}",group, purpose);
return;
}
List<Session> sessionList = new ArrayList<>(sessionSet);
Collections.shuffle(new ArrayList<>(sessionList));

for(int i= 0; i<judge; i++){
//String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,newProxyIp,Integer.valueOf(newProxyPort),sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
try {
Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
} catch (InterruptedException e) {
logger.warn("Thread.sleep occur InterruptedException", e);
}
}
logger.info("doRebalance redirect end---------------------group:{}", group);
}

private List<String> selectRedirectEventMesh(String group, Map<String, String> eventMeshMap, Map<String, Integer> clientDistributionMap, int judge, String evenMeshName)throws Exception{
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
return eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge, evenMeshName);
}

public int caculateRedirectNum(String eventMeshName, String group, String purpose, Map<String, Integer> clientDistributionMap) throws Exception{
int sum = 0;
for(Integer item : clientDistributionMap.values()){
sum += item.intValue();
}
int currentNum = 0;
if(clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName) != null){
currentNum = clientDistributionMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName);
if(clientDistributionMap.get(eventMeshName) != null){
currentNum = clientDistributionMap.get(eventMeshName);
}
int avgNum = sum / clientDistributionMap.size();
int judge = avgNum >= 2 ? avgNum/2 : 1;

if(currentNum - avgNum > judge) {
Set<Session> sessionSet = null;
if(EventMeshConstants.PURPOSE_PUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupProducerSessions();
}else if(EventMeshConstants.PURPOSE_SUB.equals(purpose)){
sessionSet = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(group).getGroupConsumerSessions();
}else{
logger.warn("doRebalance failed,purpose is not support,purpose:{}", purpose);
return;
int modNum = sum % clientDistributionMap.size();

List<String> eventMeshList = new ArrayList<>(clientDistributionMap.keySet());
Collections.sort(eventMeshList);
int index = -1;
for(int i=0; i < Math.min(modNum, eventMeshList.size()); i++){
if(StringUtils.equals(eventMeshName, eventMeshList.get(i))){
index = i;
break;
}

List<Session> sessionList = new ArrayList<>(sessionSet);
Collections.shuffle(new ArrayList<>(sessionList));
EventMeshRecommendStrategy eventMeshRecommendStrategy = new EventMeshRecommendImpl(eventMeshTCPServer);
List<String> eventMeshRecommendResult = eventMeshRecommendStrategy.calculateRedirectRecommendEventMesh(eventMeshMap, clientDistributionMap, group, judge);
if(eventMeshRecommendResult == null || eventMeshRecommendResult.size() != judge){
logger.warn("doRebalance failed,recommendProxyNum is not consistent,recommendResult:{},judge:{}", eventMeshRecommendResult, judge);
return;
}
logger.info("doRebalance redirect start---------------------group:{},purpose:{},judge:{}", group, purpose, judge);
for(int i= 0; i<judge; i++){
//String redirectSessionAddr = ProxyTcp2Client.redirectClientForRebalance(sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
String newProxyIp = eventMeshRecommendResult.get(i).split(":")[0];
String newProxyPort = eventMeshRecommendResult.get(i).split(":")[1];
String redirectSessionAddr = EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer,newProxyIp,Integer.valueOf(newProxyPort),sessionList.get(i), eventMeshTCPServer.getClientSessionGroupMapping());
logger.info("doRebalance,redirect sessionAddr:{}", redirectSessionAddr);
try {
Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
} catch (InterruptedException e) {
logger.warn("Thread.sleep occur InterruptedException", e);
}
}
logger.info("doRebalance redirect end---------------------group:{}, purpose:{}", group, purpose);
}else{
logger.info("rebalance condition not satisfy,group:{},sum:{},currentNum:{},avgNum:{},judge:{}", group, sum, currentNum, avgNum, judge);
}
int rebalanceResult = 0;
if(avgNum == 0){
rebalanceResult = 1;
}else {
rebalanceResult = (modNum != 0 && index < modNum && index >= 0) ? avgNum + 1 : avgNum;
}
logger.info("rebalance caculateRedirectNum,group:{}, purpose:{},sum:{},avgNum:{}," +
"modNum:{}, index:{}, currentNum:{}, rebalanceResult:{}", group, purpose, sum,
avgNum, modNum, index, currentNum, rebalanceResult);
return currentNum - rebalanceResult;
}

private Map<String, Integer> queryLocalEventMeshDistributeData(String cluster, String group, String purpose, Map<String, String> eventMeshMap){
Expand Down Expand Up @@ -197,12 +246,4 @@ private Map<String, Integer> queryLocalEventMeshDistributeData(String cluster, S

return localEventMeshDistributeData;
}


private class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
@Override
public int compare(Map.Entry<String, Integer> x, Map.Entry<String, Integer> y) {
return x.getValue().intValue() - y.getValue().intValue();
}
}
}
Expand Up @@ -90,8 +90,11 @@ public String calculateRecommendEventMesh(String group, String purpose) throws E
}

@Override
public List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendProxyNum) throws Exception {
logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum);
public List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendProxyNum, String eventMeshName) throws Exception {
if(recommendProxyNum < 1){
return null;
}
logger.info("eventMeshMap:{},clientDistributionMap:{},group:{},recommendNum:{},currEventMeshName:{}", eventMeshMap,clientDistributeMap,group,recommendProxyNum, eventMeshName);
List<String> recommendProxyList = null;

//find eventmesh with least client
Expand All @@ -106,10 +109,10 @@ public List<String> calculateRedirectRecommendEventMesh(Map<String, String> even
recommendProxyList = new ArrayList<>(recommendProxyNum);
while(recommendProxyList.size() < recommendProxyNum){
Map.Entry<String, Integer> minProxyItem = list.get(0);
int currProxyNum = clientDistributeMap.get(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName);
int currProxyNum = clientDistributeMap.get(eventMeshName);
recommendProxyList.add(eventMeshMap.get(minProxyItem.getKey()));
clientDistributeMap.put(minProxyItem.getKey(),minProxyItem.getValue() + 1);
clientDistributeMap.put(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName,currProxyNum - 1);
clientDistributeMap.put(eventMeshName,currProxyNum - 1);
Collections.sort(list, vc);
logger.info("clientDistributionMap after sort:{}", list);
}
Expand Down
Expand Up @@ -22,5 +22,5 @@
public interface EventMeshRecommendStrategy {
String calculateRecommendEventMesh(String group, String purpose) throws Exception;

List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendNum) throws Exception;
List<String> calculateRedirectRecommendEventMesh(Map<String, String> eventMeshMap, Map<String, Integer> clientDistributeMap, String group, int recommendNum, String eventMeshName) throws Exception;
}
Expand Up @@ -23,6 +23,10 @@
public class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
@Override
public int compare(Map.Entry<String, Integer> x, Map.Entry<String, Integer> y) {
return x.getValue().intValue() - y.getValue().intValue();
if(x.getValue().intValue() != y.getValue().intValue()){
return x.getValue().intValue() - y.getValue().intValue();
}else {
return x.getKey().compareTo(y.getKey());
}
}
}