Skip to content

Commit aa176c4

Browse files
author
zhuqi
committed
YARN-10738: When multi thread scheduling with multi node, we should shuffle to prevent hot accessing nodes.
1 parent f0241ec commit aa176c4

File tree

2 files changed

+149
-1
lines changed

2 files changed

+149
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
20+
21+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
22+
23+
import java.util.Comparator;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
import java.util.Iterator;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.List;
30+
import java.util.ArrayList;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentSkipListSet;
33+
34+
/**
35+
* <p>
36+
* This class has the following functionality:
37+
*
38+
* <p>
39+
* ResourceUsageWithPartialShuffleMultiNodeLookupPolicy
40+
* holds sorted nodes list based on the
41+
* resource usage of nodes at given time.
42+
* Also inorder to prevent hot accessing node with multi-thread scheduling,
43+
* we add the partial shuffle with SHUFFLE_INTERVAL default is 10.
44+
* Se details YARN-10738.
45+
* </p>
46+
*/
47+
public class ResourceUsageWithPartialShuffleMultiNodeLookupPolicy
48+
<N extends SchedulerNode> implements MultiNodeLookupPolicy<N> {
49+
50+
private Map<String, Set<N>> nodesPerPartition = new ConcurrentHashMap<>();
51+
private Comparator<N> comparator;
52+
// Shuffle interval(the shuffle size of every shuffle).
53+
private static final int SHUFFLE_INTERVAL = 10;
54+
55+
public ResourceUsageWithPartialShuffleMultiNodeLookupPolicy() {
56+
this.comparator = new Comparator<N>() {
57+
@Override
58+
public int compare(N o1, N o2) {
59+
int allocatedDiff = o1.getAllocatedResource()
60+
.compareTo(o2.getAllocatedResource());
61+
if (allocatedDiff == 0) {
62+
return o1.getNodeID().compareTo(o2.getNodeID());
63+
}
64+
return allocatedDiff;
65+
}
66+
};
67+
}
68+
69+
@Override
70+
public Iterator<N> getPreferredNodeIterator(Collection<N> nodes,
71+
String partition) {
72+
Iterator<N> beforePartialShuffle =
73+
getNodesPerPartition(partition).iterator();
74+
int counter = 0;
75+
List<N> list = new ArrayList<N>();
76+
while(beforePartialShuffle.hasNext()) {
77+
list.add(beforePartialShuffle.next());
78+
// Every shuffle interval(the shuffle size of every shuffle),
79+
// we should shuffle to prevent
80+
// hot accessing node when multi scheduling.
81+
// It's very important for big clusters.
82+
if (counter > 0 && counter % SHUFFLE_INTERVAL == 0) {
83+
Collections.
84+
shuffle(list.subList(counter -10, counter));
85+
}
86+
++counter;
87+
}
88+
return list.iterator();
89+
}
90+
91+
@Override
92+
public void addAndRefreshNodesSet(Collection<N> nodes,
93+
String partition) {
94+
Set<N> nodeList = new ConcurrentSkipListSet<N>(comparator);
95+
nodeList.addAll(nodes);
96+
nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList));
97+
}
98+
99+
@Override
100+
public Set<N> getNodesPerPartition(String partition) {
101+
return nodesPerPartition.getOrDefault(partition, Collections.emptySet());
102+
}
103+
104+
public Map<String, Set<N>> getNodesPerPartition() {
105+
return nodesPerPartition;
106+
}
107+
108+
public Comparator<N> getComparator() {
109+
return comparator;
110+
}
111+
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
6060
private CapacitySchedulerConfiguration conf;
6161
private static final String POLICY_CLASS_NAME =
6262
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
63-
63+
private static final String POLICY_CLASS_NAME_WITH_SHUFFLE =
64+
"org.apache.hadoop.yarn.server.resourcemanager." +
65+
"scheduler.placement." +
66+
"ResourceUsageWithPartialShuffleMultiNodeLookupPolicy";
6467
@Before
6568
public void setUp() {
6669
CapacitySchedulerConfiguration config =
@@ -105,6 +108,40 @@ public void testMultiNodeSorterForScheduling() throws Exception {
105108
rm.stop();
106109
}
107110

111+
@Test
112+
public void testResourceUsageWithPartialShuffleMultiNodeLookupPolicy()
113+
throws Exception {
114+
String policyName =
115+
CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
116+
+ ".resource-based" + ".class";
117+
conf.set(policyName, POLICY_CLASS_NAME_WITH_SHUFFLE);
118+
MockRM rm = new MockRM(conf);
119+
rm.start();
120+
for (int i = 0; i < 1000; ++i) {
121+
rm.registerNode("127.0.0.1:" + i, 10 * GB);
122+
}
123+
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
124+
waitforNMRegistered(scheduler, 1000, 5);
125+
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
126+
.getMultiNodeSortingManager();
127+
MultiNodeSorter<SchedulerNode> sorter = mns
128+
.getMultiNodePolicy(POLICY_CLASS_NAME_WITH_SHUFFLE);
129+
sorter.reSortClusterNodes();
130+
Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
131+
.getNodesPerPartition("");
132+
Assert.assertEquals(1000, nodes.size());
133+
134+
Iterator<SchedulerNode> list = sorter.getMultiNodeLookupPolicy().
135+
getPreferredNodeIterator(null, "");
136+
int count = 0;
137+
while (list.hasNext()) {
138+
list.next();
139+
++count;
140+
}
141+
Assert.assertEquals(1000, count);
142+
rm.stop();
143+
}
144+
108145
@Test
109146
public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception {
110147
MockRM rm = new MockRM(conf);

0 commit comments

Comments
 (0)