-
Notifications
You must be signed in to change notification settings - Fork 588
/
chunk_balancer.py
290 lines (237 loc) · 12 KB
/
chunk_balancer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import abc
import copy
from typing import Optional, Tuple, Union
import numpy as np
from meep.timing_measurements import MeepTimingMeasurements
import meep as mp
from meep import binary_partition_utils as bpu
class AbstractChunkBalancer(abc.ABC):
    """Abstract chunk balancer for adaptive chunk layouts in Meep simulations.

    This class defines interfaces for a chunk balancer, which adjusts chunk
    layouts to optimal load balancing. It provides two main functionalities:

    1. Generating an initial chunk layout for the first iteration of an
       optimization run, using a strategy other than the default chunk
       partitioning routine in Meep
    2. Adaptively modifying chunk layouts for subsequent iterations in an
       optimization run by incorporating timing data from the previous
       iteration(s) and resizing the chunks accordingly

    Subclasses of this class can be passed as an option into optimization runs.
    """

    def make_initial_chunk_layout(
        self, sim: mp.Simulation
    ) -> Union[mp.BinaryPartition, None]:
        """Generates an initial chunk layout based on expected compute costs.

        Args:
          sim: the meep.Simulation object to generate a chunk layout for

        Returns:
          The chunk layout to be used by the simulation, or None, in which case
          Meep will use its own default logic to compute the chunk layout (this
          is the default behavior and can be overridden in child classes).
        """
        del sim  # unused by the default implementation
        return None

    def adjust_chunk_layout(self, sim: mp.Simulation, **kwargs) -> None:
        """Computes a new chunk layout, applies it to sim, and resets/re-inits sim.

        This method also calls self.should_adjust_chunk_layout(sim). If the
        current chunk layout is sufficiently well-balanced, no changes will be
        made.

        NOTE: This is a state-changing method which may reset the simulation.

        Args:
          sim: the meep.Simulation object with the chunk layout to be adjusted
          **kwargs: extra args to be passed to self.compute_new_chunk_layout

        Raises:
          ValueError: if sim.chunk_layout includes nodes with duplicate proc_ids
          ValueError: if sim.chunk_layout has proc_ids not included in
            sim.structure.get_chunk_owners() (this could occur if number of
            processes exceeds the number of physical cores)
        """
        self._validate_sim(sim)

        if self.should_adjust_chunk_layout(sim):
            old_chunk_layout = sim.chunk_layout
            chunk_volumes = sim.structure.get_chunk_volumes()
            chunk_owners = sim.structure.get_chunk_owners()
            timing_measurements = MeepTimingMeasurements.new_from_simulation(sim, -1)

            new_chunk_layout = self.compute_new_chunk_layout(
                timing_measurements,
                old_chunk_layout,
                chunk_volumes,
                chunk_owners,
                **kwargs,
            )

            # The simulation must be torn down and re-initialized for the new
            # chunk layout to take effect.
            sim.reset_meep()
            sim.chunk_layout = new_chunk_layout
            sim.init_sim()

    @abc.abstractmethod
    def compute_new_chunk_layout(
        self,
        timing_measurements: MeepTimingMeasurements,
        old_partition: mp.BinaryPartition,
        chunk_volumes: Tuple[mp.grid_volume],
        chunk_owners: np.ndarray,
    ) -> mp.BinaryPartition:
        """Rebalances the partition to equalize simulation time for node's children.

        Args:
          timing_measurements: elapsed time by category from previous iteration
          old_partition: the chunk_layout from the previous iteration
          chunk_volumes: obtained from sim.structure.get_chunk_volumes()
          chunk_owners: obtained from sim.structure.get_chunk_owners()

        Returns:
          The chunk layout to be used by the next iteration, with the chunks
          resized appropriately to balance load across MPI processes.
        """

    def should_adjust_chunk_layout(self, sim: mp.Simulation) -> bool:
        """Is the current layout imbalanced enough to justify rebuilding the sim?

        Args:
          sim: the meep.Simulation object with the chunk layout to be adjusted

        Returns:
          A bool specifying whether the current chunk layout is sufficiently
          poorly load-balanced to justify the upfront cost of rebuilding the
          Meep simulation object to redefine the chunks. By default, True is
          returned if this method is not overridden in a subclass.
        """
        del sim  # unused by the default implementation
        return True

    def _validate_sim(self, sim: mp.Simulation):
        """Ensures that chunk layout and chunk volumes are properly formatted.

        Args:
          sim: the meep.Simulation object to validate

        Raises:
          ValueError: if sim.chunk_layout includes nodes with duplicate proc_ids
          ValueError: if sim.chunk_layout has proc_ids not included in
            sim.structure.get_chunk_owners() (this could occur if number of
            processes exceeds the number of physical cores)
        """
        # Check that chunk_layout does not contain duplicate nodes for same process
        if bpu.partition_has_duplicate_proc_ids(sim.chunk_layout):
            raise ValueError("Duplicate proc_ids found in chunk_layout!")

        # Check that proc_ids in chunk_layout are assigned properly to grid owners
        chunk_owners = sim.structure.get_chunk_owners()
        proc_ids = [
            node.proc_id for node in bpu.enumerate_leaf_nodes(sim.chunk_layout)
        ]
        if set(chunk_owners) != set(proc_ids):
            # BUG FIX: the previous message reported only
            # set(proc_ids) - set(chunk_owners), which rendered as an empty set
            # (and was misleading) whenever the mismatch was in the opposite
            # direction. Report both directions of the discrepancy.
            in_layout_not_owners = set(proc_ids) - set(chunk_owners)
            in_owners_not_layout = set(chunk_owners) - set(proc_ids)
            raise ValueError(
                f"Processes {in_layout_not_owners} present in chunk_layout but "
                f"not grid_owners; processes {in_owners_not_layout} present in "
                "grid_owners but not chunk_layout!"
            )
class ChunkBalancer(AbstractChunkBalancer):
    """A chunk balancer for adaptive chunk layouts in Meep simulations.

    This class generates initial chunk layouts using Meep's built-in scheme, and
    rebalances chunks using timing data from the previous iteration in an
    optimization run to resize chunks appropriately.

    The general idea behind this chunk balancer implementation is to compute
    'load' terms defined by the ratio of (simulation time t) / (chunk volume V).
    This implicitly should account for additional background load on the various
    MPI nodes, since if one machine is more busy than another, it will have a
    higher ratio of t/V.

    The split_pos for each node is adjusted to equalize the per-core values of
    left and right simulation times, then the method is recursively called for
    each of the node's children. A sensitivity parameter determines how much the
    split_pos should change from the previous value, with 0.0 being no change
    and 1.0 being instant change to the new computed value.
    """

    def compute_new_chunk_layout(
        self,
        timing_measurements: MeepTimingMeasurements,
        partition: mp.BinaryPartition,
        chunk_volumes: Tuple[mp.grid_volume],
        chunk_owners: np.ndarray,
        new_partition: Optional[mp.BinaryPartition] = None,
        new_partition_root: Optional[mp.BinaryPartition] = None,
        xyz_bounds: Optional[Tuple[float, float, float, float, float, float]] = None,
        sensitivity: float = 0.5,
    ) -> mp.BinaryPartition:
        """Rebalances the partition to equalize simulation time for node's children.

        Args:
          timing_measurements: elapsed time by category from previous iteration
          partition: the chunk_layout from the previous iteration
          chunk_volumes: obtained from sim.structure.get_chunk_volumes()
          chunk_owners: obtained from sim.structure.get_chunk_owners()
          new_partition: reference to the new chunk_layout object
          new_partition_root: reference to the root node of the new chunk_layout
          xyz_bounds: min and max xyz values for the new sub-partition
          sensitivity: adjusts how much the new split_pos should change

        Returns:
          The chunk layout to be used by the next iteration, with the chunks
          resized appropriately to balance sim times across MPI processes.
        """
        # if we're at root, make a copy of the partition to return as new partition
        if new_partition is None:
            new_partition = copy.deepcopy(partition)
            new_partition_root = new_partition

        # if at leaf node, no more rebalancing to do, so return the partition
        if bpu.is_leaf_node(partition):
            return partition

        if xyz_bounds is None:
            xyz_bounds = bpu.get_box_ranges(partition, chunk_volumes, chunk_owners)

        # Retrieve the relevant dimensions d_min, d_max along the split axis
        # NOTE: we call the distances d_min, d_max regardless of split direction
        if partition.split_dir == mp.X:
            d_min, d_max, _, _, _, _ = xyz_bounds
        elif partition.split_dir == mp.Y:
            _, _, d_min, d_max, _, _ = xyz_bounds
        elif partition.split_dir == mp.Z:
            _, _, _, _, d_min, d_max = xyz_bounds
        else:
            raise ValueError("Split direction must be mp.X, mp.Y, or mp.Z!")

        sim_times = list(self._compute_working_times_per_process(timing_measurements))

        # Number of chunks (i.e. cores) assigned to each side of the split.
        n_left = partition.left.numchunks()
        n_right = partition.right.numchunks()

        # Total measured sim time and total volume on each side of the split.
        t_left, t_right = bpu.get_left_right_total_weights(partition, sim_times)
        v_left, v_right = bpu.get_left_right_total_volumes(
            partition, chunk_volumes, chunk_owners
        )

        # Pick new volumes that equalize per-core time, assuming each side's
        # load density t/V stays fixed:
        #   (t_left / v_left) * v_left_new / n_left
        #     == (t_right / v_right) * v_right_new / n_right
        # which yields the ratio v_left_new : v_right_new computed below
        # (up to a common positive factor, which split_frac cancels out).
        v_left_new = v_left * t_right * n_left
        v_right_new = v_right * t_left * n_right

        split_frac = v_left_new / (v_left_new + v_right_new)

        old_split_pos = partition.split_pos
        # Map the volume fraction to a coordinate within the split-axis range,
        # then damp the update: sensitivity=0.0 keeps the old position,
        # 1.0 jumps straight to the newly computed one.
        new_split_pos = (d_max - d_min) * split_frac + d_min
        new_split_pos = sensitivity * new_split_pos + (1 - sensitivity) * old_split_pos

        # Adjust the split pos on new_partition. We can't modify the old partition
        # because it could affect left/right volume ratios for subsequent calls
        new_partition.split_pos = new_split_pos

        x_min, x_max, y_min, y_max, z_min, z_max = xyz_bounds

        # Update the box ranges for the new partition after the adjustment.
        # split_dir was already validated above, so one branch always fires.
        if partition.split_dir == mp.X:
            left_xyz_bounds = (x_min, new_split_pos, y_min, y_max, z_min, z_max)
            right_xyz_bounds = (new_split_pos, x_max, y_min, y_max, z_min, z_max)
        elif partition.split_dir == mp.Y:
            left_xyz_bounds = (x_min, x_max, y_min, new_split_pos, z_min, z_max)
            right_xyz_bounds = (x_min, x_max, new_split_pos, y_max, z_min, z_max)
        elif partition.split_dir == mp.Z:
            left_xyz_bounds = (x_min, x_max, y_min, y_max, z_min, new_split_pos)
            right_xyz_bounds = (x_min, x_max, y_min, y_max, new_split_pos, z_max)

        # Recurse into both children. The children of new_partition are mutated
        # in place, so the recursive return values can be ignored here.
        self.compute_new_chunk_layout(
            timing_measurements,
            partition.left,
            chunk_volumes,
            chunk_owners,
            new_partition=new_partition.left,
            new_partition_root=new_partition_root,
            xyz_bounds=left_xyz_bounds,
            sensitivity=sensitivity,
        )
        self.compute_new_chunk_layout(
            timing_measurements,
            partition.right,
            chunk_volumes,
            chunk_owners,
            new_partition=new_partition.right,
            new_partition_root=new_partition_root,
            xyz_bounds=right_xyz_bounds,
            sensitivity=sensitivity,
        )

        return new_partition

    def _compute_working_times_per_process(
        self, timing_measurements: MeepTimingMeasurements
    ) -> np.ndarray:
        """Computes the time spent by each MPI process actively working."""
        # Timing categories summed as "working" time; any other categories
        # present in the measurements are deliberately excluded.
        time_sinks_to_include = [
            "time_stepping",
            "boundaries_copying",
            "field_output",
            "fourier_transform",
            "mpb",
            "near_to_farfield_transform",
        ]
        # One entry per MPI process; the process count is inferred from the
        # length of the "time_stepping" measurement list.
        num_processes = len(timing_measurements.measurements["time_stepping"])
        working_times = np.zeros(num_processes)
        for category in time_sinks_to_include:
            working_times += np.array(timing_measurements.measurements[category])

        return working_times