-
Notifications
You must be signed in to change notification settings - Fork 139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support deploy multiple shuffle servers in a single node #166
Changes from all commits
efd3e29
abe704f
21b20ea
ca76d85
54760f6
2b74e85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.uniffle.coordinator; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY; | ||
|
||
public abstract class AbstractAssignmentStrategy implements AssignmentStrategy { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why we need this? If u want to deploy multiple shuffle servers on one machine and hope avoid partitions assigned to same host. I think you could implement custom assignment strategy. There is no need to change default strategy. Right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most cases need this logic, and strategys may have many same logics in the future. |
||
protected final CoordinatorConf conf; | ||
private final HostAssignmentStrategy assignmentHostStrategy; | ||
|
||
public AbstractAssignmentStrategy(CoordinatorConf conf) { | ||
this.conf = conf; | ||
assignmentHostStrategy = conf.get(COORDINATOR_ASSGINMENT_HOST_STRATEGY); | ||
} | ||
|
||
protected List<ServerNode> getCandidateNodes(List<ServerNode> allNodes, int expectNum) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a scheduler, we usually provide an option for users to choose whether to allocate on a single node. Because there are enough nodes to choose sometimes. To solve this problems, scheduler usually provide three semantics. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
switch (assignmentHostStrategy) { | ||
case MUST_DIFF: return getCandidateNodesWithDiffHost(allNodes, expectNum); | ||
case PREFER_DIFF: return tryGetCandidateNodesWithDiffHost(allNodes, expectNum); | ||
case NONE: return allNodes.subList(0, expectNum); | ||
default: throw new RuntimeException("Unsupported host assignment strategy:" + assignmentHostStrategy); | ||
} | ||
} | ||
|
||
protected List<ServerNode> tryGetCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) { | ||
List<ServerNode> candidatesNodes = getCandidateNodesWithDiffHost(allNodes, expectNum); | ||
Set<ServerNode> candidatesNodeSet = candidatesNodes.stream().collect(Collectors.toSet()); | ||
if (candidatesNodes.size() < expectNum) { | ||
for (ServerNode node : allNodes) { | ||
if (candidatesNodeSet.contains(node)) { | ||
continue; | ||
} | ||
candidatesNodes.add(node); | ||
if (candidatesNodes.size() >= expectNum) { | ||
break; | ||
} | ||
} | ||
} | ||
return candidatesNodes; | ||
} | ||
|
||
protected List<ServerNode> getCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) { | ||
List<ServerNode> candidatesNodes = new ArrayList<>(); | ||
Set<String> hostIpCandidate = new HashSet<>(); | ||
for (ServerNode node : allNodes) { | ||
if (hostIpCandidate.contains(node.getIp())) { | ||
continue; | ||
} | ||
hostIpCandidate.add(node.getIp()); | ||
candidatesNodes.add(node); | ||
if (candidatesNodes.size() >= expectNum) { | ||
break; | ||
} | ||
} | ||
return candidatesNodes; | ||
} | ||
|
||
|
||
public enum HostAssignmentStrategy { | ||
MUST_DIFF, | ||
PREFER_DIFF, | ||
NONE | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CONF_FILE
should beSHUFFLE_SERVER_CONF_FILE
. I will raise a pr to fix this.