/
default_allocator.py
138 lines (111 loc) · 4.97 KB
/
default_allocator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Define the DefaultAllocator class."""
from __future__ import division, print_function
import warnings
import numpy as np
from six.moves import range
from openmdao.proc_allocators.proc_allocator import ProcAllocator, ProcAllocationError
from openmdao.utils.mpi import MPI
class DefaultAllocator(ProcAllocator):
    """
    Processor allocator used when no other allocation strategy is specified.

    Processes are handed out to subsystems according to each subsystem's
    requested minimum, requested maximum and relative weight.
    """

    def _divide_procs(self, proc_info, comm):
        """
        Split the processes of a communicator among the subsystems.

        Parameters
        ----------
        proc_info : list of (min_procs, max_procs, weight)
            Information used to determine MPI process allocation to subsystems.
        comm : MPI.Comm or <FakeComm>
            Communicator of the owning System.

        Returns
        -------
        isubs : [int, ...]
            Indices of the owned local subsystems.
        sub_comm : MPI.Comm or <FakeComm>
            Communicator to pass to the subsystems.
        sub_proc_range : (int, int)
            The range of processors that the subcomm owns, among those of comm.

        Raises
        ------
        ProcAllocationError
            If the summed max procs is smaller than the size of comm, or if the
            summed min procs exceeds the size of comm while at least one
            subsystem requires more than one process.
        """
        rank = comm.rank
        size = comm.size
        nsubs = len(proc_info)

        min_procs, max_procs, proc_weights = self._split_proc_info(proc_info, comm)
        min_total = np.sum(min_procs)
        max_total = np.sum(max_procs)

        # The subsystems, taken together, cannot use every process in comm.
        if max_total < size:
            raise ProcAllocationError("too many MPI procs allocated. Comm is size %d but "
                                      "can only use %d." % (size, max_total))

        # More subsystem minimums than processes: only workable when every
        # subsystem can run on a single process.
        oversubscribed = min_total > size
        if oversubscribed:
            needs_several = min_procs > 1
            if np.any(needs_several):
                raise ProcAllocationError("can't meet min_procs required because the sum of the "
                                          "min procs required exceeds the procs allocated and the "
                                          "min procs required is > 1",
                                          np.arange(nsubs)[needs_several])

        # Normalize the weights (in place) so that they sum to one.
        proc_weights /= np.sum(proc_weights)

        if oversubscribed:
            # Several subsystems must share a process.  Walk the subsystems from
            # heaviest to lightest and put each on the least loaded process(es).
            owned = [[] for _ in range(size)]
            load = np.zeros(size)

            for isub in np.argsort(proc_weights)[::-1]:
                least_loaded = np.argsort(load)
                for slot in range(min_procs[isub]):
                    target = least_loaded[slot]
                    owned[target].append(isub)
                    load[target] += proc_weights[isub]

            # Every process is split off under its own rank as the color.
            sub_comm = comm.Split(rank)
            return owned[rank], sub_comm, [rank, rank + sub_comm.size]

        # Every subsystem starts out with the minimum it asked for.
        num_procs = min_procs.copy()

        if min_total < size:
            # Ideal (fractional) share of the processes for each subsystem.
            weighted = proc_weights * size

            # Whole processes each subsystem would receive beyond its minimum.
            above_min = weighted.astype(int) - min_procs
            above_min[above_min < 0] = 0

            # Only hand those out if doing so does not exceed the comm size.
            if np.sum(above_min) + min_total <= size:
                num_procs += above_min

            # No subsystem may end up with more than its stated maximum.
            over_max = (max_procs - num_procs) < 0
            num_procs[over_max] = max_procs[over_max]

            assigned = np.sum(num_procs)
            extras = size - assigned

            if extras > 0:
                # Distribute the leftover processes one at a time, each time
                # choosing the subsystem whose increment brings the resulting
                # distribution closest to the desired weights.
                identity = np.eye(weighted.size)
                weighted[:] = proc_weights

                for _ in range(extras):
                    maxed_out = max_procs <= num_procs

                    # Renormalize the weights over subsystems that can still grow.
                    weighted[maxed_out] = 0.0
                    weighted *= (1. / np.sum(weighted))

                    assigned += 1

                    # Row k is the distribution obtained by giving the next
                    # process to subsystem k, minus the desired weights.
                    mismatch = (identity + num_procs) * (1. / assigned) - weighted

                    # Rows of maxed out subsystems must never have the smallest norm.
                    mismatch[maxed_out] = 1e99

                    # Columns of maxed out subsystems do not contribute to the norm.
                    mismatch[:, maxed_out] = 0.0

                    best = np.argmin(np.linalg.norm(mismatch, axis=1))
                    num_procs[best] += 1

        # Color each process with the index of the subsystem it belongs to;
        # subsystems occupy consecutive blocks of ranks.
        color = np.zeros(size, int)
        lower = 0
        for isub in range(nsubs):
            upper = lower + num_procs[isub]
            color[lower:upper] = isub
            lower = upper

        my_sub = color[rank]
        sub_comm = comm.Split(my_sub)

        # The lowest rank carrying our color marks the start of our range.
        first = int(np.argmax(color == my_sub))
        return [my_sub], sub_comm, [first, first + sub_comm.size]