Support deploy multiple shuffle servers in a single node #166

Merged · 6 commits · Sep 27, 2022
4 changes: 3 additions & 1 deletion bin/start-shuffle-server.sh
@@ -48,7 +48,9 @@ MAIN_CLASS="org.apache.uniffle.server.ShuffleServer"
HADOOP_DEPENDENCY="$("$HADOOP_HOME/bin/hadoop" classpath --glob)"

echo "Check process existence"
is_jvm_process_running "$JPS" $MAIN_CLASS
+RPC_PORT=`grep '^rss.rpc.server.port' $CONF_FILE |awk '{print $2}'`
Contributor commented:

CONF_FILE should be SHUFFLE_SERVER_CONF_FILE. I will raise a PR to fix this.

+is_port_in_use $RPC_PORT


CLASSPATH=""

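With this guard in place, each shuffle server instance on a node reads its RPC port from its own configuration file and refuses to start when that port is already bound, which is what allows several instances to share one machine. A minimal sketch of two per-instance conf files (file names and port values here are made up; only the rss.rpc.server.port key comes from the diff above):

# rss-server-1.conf
rss.rpc.server.port 19996

# rss-server-2.conf
rss.rpc.server.port 19997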
8 changes: 8 additions & 0 deletions bin/utils.sh
@@ -133,6 +133,14 @@ function is_jvm_process_running {

}

+function is_port_in_use {
+  local port=$1
+  local tmp=$(lsof -i:$port | grep LISTEN)
+  if [[ "$tmp" != "" ]]; then
+    echo "port[$port] is already in use"
+    exit 1
+  fi
+}
#---
# load_rss_env: Export RSS environment variables
#---
org/apache/uniffle/coordinator/AbstractAssignmentStrategy.java (new file)
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSGINMENT_HOST_STRATEGY;

public abstract class AbstractAssignmentStrategy implements AssignmentStrategy {
zuston (Member) commented on Aug 19, 2022:

Why do we need this? If you want to deploy multiple shuffle servers on one machine and hope to avoid partitions being assigned to the same host, I think you could implement a custom assignment strategy. There is no need to change the default strategy.

Right?

Member Author replied:

Most cases need this logic, and the strategies may share much of the same logic in the future.

  protected final CoordinatorConf conf;
  private final HostAssignmentStrategy assignmentHostStrategy;

  public AbstractAssignmentStrategy(CoordinatorConf conf) {
    this.conf = conf;
    assignmentHostStrategy = conf.get(COORDINATOR_ASSGINMENT_HOST_STRATEGY);
  }

  protected List<ServerNode> getCandidateNodes(List<ServerNode> allNodes, int expectNum) {
Contributor commented:
For a scheduler, we usually provide an option that lets users choose whether to allocate on a single node, because sometimes there are enough nodes to choose from. To solve this problem, schedulers usually provide three semantics.
First, don't consider this factor: when we assign the servers, we don't consider whether partitions land on the same node.
Second, prefer considering this factor: when we assign the servers, we try our best to avoid putting partitions on the same node, but if we don't have enough servers, we may assign the same node to multiple partitions.
Third, must consider this factor: we must avoid partitions on the same node; if we don't have enough servers, we return the servers we do have directly.
You don't need to implement every semantic, but we hope the other semantics can be implemented easily in the future when users need them. So should we add some config options or an abstraction?

Member Author replied:
Done

    switch (assignmentHostStrategy) {
      case MUST_DIFF: return getCandidateNodesWithDiffHost(allNodes, expectNum);
      case PREFER_DIFF: return tryGetCandidateNodesWithDiffHost(allNodes, expectNum);
      case NONE: return allNodes.subList(0, expectNum);
      default: throw new RuntimeException("Unsupported host assignment strategy:" + assignmentHostStrategy);
    }
  }

  protected List<ServerNode> tryGetCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) {
    List<ServerNode> candidatesNodes = getCandidateNodesWithDiffHost(allNodes, expectNum);
    Set<ServerNode> candidatesNodeSet = candidatesNodes.stream().collect(Collectors.toSet());
    if (candidatesNodes.size() < expectNum) {
      for (ServerNode node : allNodes) {
        if (candidatesNodeSet.contains(node)) {
          continue;
        }
        candidatesNodes.add(node);
        if (candidatesNodes.size() >= expectNum) {
          break;
        }
      }
    }
    return candidatesNodes;
  }

  protected List<ServerNode> getCandidateNodesWithDiffHost(List<ServerNode> allNodes, int expectNum) {
    List<ServerNode> candidatesNodes = new ArrayList<>();
    Set<String> hostIpCandidate = new HashSet<>();
    for (ServerNode node : allNodes) {
      if (hostIpCandidate.contains(node.getIp())) {
        continue;
      }
      hostIpCandidate.add(node.getIp());
      candidatesNodes.add(node);
      if (candidatesNodes.size() >= expectNum) {
        break;
      }
    }
    return candidatesNodes;
  }


  public enum HostAssignmentStrategy {
    MUST_DIFF,
    PREFER_DIFF,
    NONE
  }
}
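To make the strategies concrete: with two servers on one host and one on another, MUST_DIFF-style selection returns fewer servers than requested, while PREFER_DIFF tops up with the duplicate host. A standalone sketch of the selection logic above (not part of the PR; Node is a stand-in for ServerNode, and class and method names here are illustrative):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HostStrategySketch {

  static final class Node {
    final String id;
    final String ip;
    Node(String id, String ip) { this.id = id; this.ip = ip; }
    @Override public String toString() { return id + "@" + ip; }
  }

  // MUST_DIFF flavour: at most one node per host, up to expectNum
  // (mirrors getCandidateNodesWithDiffHost above)
  static List<Node> diffHost(List<Node> all, int expectNum) {
    List<Node> picked = new ArrayList<>();
    Set<String> seenHosts = new HashSet<>();
    for (Node n : all) {
      if (seenHosts.add(n.ip)) {
        picked.add(n);
        if (picked.size() >= expectNum) {
          break;
        }
      }
    }
    return picked;
  }

  // PREFER_DIFF flavour: distinct hosts first, then top up with the
  // remaining nodes (mirrors tryGetCandidateNodesWithDiffHost above)
  static List<Node> preferDiffHost(List<Node> all, int expectNum) {
    List<Node> picked = diffHost(all, expectNum);
    Set<Node> chosen = new HashSet<>(picked);
    for (Node n : all) {
      if (picked.size() >= expectNum) {
        break;
      }
      if (chosen.add(n)) {
        picked.add(n);
      }
    }
    return picked;
  }

  public static void main(String[] args) {
    List<Node> all = new ArrayList<>();
    all.add(new Node("s1", "10.0.0.1"));
    all.add(new Node("s2", "10.0.0.1")); // second server on the same host
    all.add(new Node("s3", "10.0.0.2"));
    System.out.println(diffHost(all, 3));       // [s1@10.0.0.1, s3@10.0.0.2] -- only 2 distinct hosts
    System.out.println(preferDiffHost(all, 3)); // [s1@10.0.0.1, s3@10.0.0.2, s2@10.0.0.1]
  }
}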
org/apache/uniffle/coordinator/AssignmentStrategyFactory.java
@@ -30,9 +30,9 @@ public AssignmentStrategyFactory(CoordinatorConf conf, ClusterManager clusterManager) {
  public AssignmentStrategy getAssignmentStrategy() {
    StrategyName strategy = conf.get(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY);
    if (StrategyName.BASIC == strategy) {
-      return new BasicAssignmentStrategy(clusterManager);
+      return new BasicAssignmentStrategy(clusterManager, conf);
    } else if (StrategyName.PARTITION_BALANCE == strategy) {
-      return new PartitionBalanceAssignmentStrategy(clusterManager);
+      return new PartitionBalanceAssignmentStrategy(clusterManager, conf);
    } else {
      throw new UnsupportedOperationException("Unsupported assignment strategy.");
    }
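For context, a short usage sketch of the factory (variable names are illustrative; the constructor and getAssignmentStrategy() signatures come from the diff above):

// built once by the coordinator from its configuration
AssignmentStrategyFactory factory = new AssignmentStrategyFactory(conf, clusterManager);
// returns BasicAssignmentStrategy or PartitionBalanceAssignmentStrategy,
// depending on CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY
AssignmentStrategy strategy = factory.getAssignmentStrategy();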
org/apache/uniffle/coordinator/BasicAssignmentStrategy.java
@@ -29,13 +29,14 @@

import org.apache.uniffle.common.PartitionRange;

-public class BasicAssignmentStrategy implements AssignmentStrategy {
+public class BasicAssignmentStrategy extends AbstractAssignmentStrategy {

  private static final Logger LOG = LoggerFactory.getLogger(BasicAssignmentStrategy.class);

  private ClusterManager clusterManager;

-  public BasicAssignmentStrategy(ClusterManager clusterManager) {
+  public BasicAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) {
+    super(conf);
    this.clusterManager = clusterManager;
  }

@@ -81,6 +82,7 @@ private List<ServerNode> getRequiredServers(Set<String> requiredTags, int expectedNum) {
      LOG.warn("Can't get expected servers [" + expectedNum + "] and found only [" + servers.size() + "]");
      return servers;
    }
-    return servers.subList(0, expectedNum);
+
+    return getCandidateNodes(servers, expectedNum);
  }
}
org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -153,6 +153,12 @@ public class CoordinatorConf extends RssBaseConf {
      .intType()
      .defaultValue(3)
      .withDescription("The number of times to read and write HDFS files");
+  public static final ConfigOption<AbstractAssignmentStrategy.HostAssignmentStrategy>
+      COORDINATOR_ASSGINMENT_HOST_STRATEGY =
+      ConfigOptions.key("rss.coordinator.assignment.host.strategy")
+          .enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class)
+          .defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF)
+          .withDescription("Strategy for selecting shuffle servers");

public CoordinatorConf() {
}
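The new option can then be set in the coordinator's configuration file, for example to hard-require distinct hosts per partition (the key and enum values come from the diff above; the file name is illustrative):

# coordinator.conf
rss.coordinator.assignment.host.strategy MUST_DIFF

Valid values are MUST_DIFF, PREFER_DIFF (the default), and NONE.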
org/apache/uniffle/coordinator/PartitionBalanceAssignmentStrategy.java
@@ -50,14 +50,15 @@
* ....
**/

-public class PartitionBalanceAssignmentStrategy implements AssignmentStrategy {
+public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrategy {

  private static final Logger LOG = LoggerFactory.getLogger(PartitionBalanceAssignmentStrategy.class);

  private ClusterManager clusterManager;
  private Map<ServerNode, PartitionAssignmentInfo> serverToPartitions = Maps.newConcurrentMap();

-  public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager) {
+  public PartitionBalanceAssignmentStrategy(ClusterManager clusterManager, CoordinatorConf conf) {
+    super(conf);
    this.clusterManager = clusterManager;
  }

@@ -119,7 +120,7 @@ public int compare(ServerNode o1, ServerNode o2) {
        expectNum = nodes.size();
      }

-      List<ServerNode> candidatesNodes = nodes.subList(0, expectNum);
+      List<ServerNode> candidatesNodes = getCandidateNodes(nodes, expectNum);
      int idx = 0;
      List<PartitionRange> ranges = CoordinatorUtils.generateRanges(totalPartitionNum, 1);
      for (PartitionRange range : ranges) {
org/apache/uniffle/coordinator/BasicAssignmentStrategyTest.java
@@ -51,7 +51,7 @@ public void setUp() throws Exception {
    CoordinatorConf ssc = new CoordinatorConf();
    ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
    clusterManager = new SimpleClusterManager(ssc, new Configuration());
-    strategy = new BasicAssignmentStrategy(clusterManager);
+    strategy = new BasicAssignmentStrategy(clusterManager, ssc);
  }

@AfterEach
Expand All @@ -63,7 +63,7 @@ public void tearDown() throws IOException {
  @Test
  public void testAssign() {
    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0,
+      clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0,
          20 - i, 0, tags, true));
    }

Expand All @@ -90,7 +90,7 @@ public void testAssign() {
  @Test
  public void testRandomAssign() {
    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode(String.valueOf(i), "", 0, 0, 0,
+      clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0,
          0, 0, tags, true));
    }
    PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1);
@@ -156,7 +156,7 @@ public void testAssignmentShuffleNodesNum() {
    Set<String> serverTags = Sets.newHashSet("tag-1");

    for (int i = 0; i < 20; ++i) {
-      clusterManager.add(new ServerNode("t1-" + i, "", 0, 0, 0,
+      clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0,
          20 - i, 0, serverTags, true));
    }
