Skip to content

Commit dce697d

Browse files
Googlercopybara-github
authored andcommitted
Shrink worker pool if memory pressure is too high.
`WorkerLifecycleManager` rarely catches workers in the idle state, so I implemented a strategy that shrinks worker pools if memory pressure is still high after evicting idle workers. This strategy marks workers as `doomed`; the next time a doomed worker is returned to the worker pool, it is invalidated and the worker pool is shrunk. To enable this strategy, set the flag `experimental_shrink_worker_pool`. This strategy has been tested and can reduce worker memory usage by a factor of two with almost no wall-time degradation. PiperOrigin-RevId: 522302353 Change-Id: I0c1d09cb3d70c711862dc87ca486bd5c978f0813
1 parent 071bcba commit dce697d

File tree

15 files changed

+469
-54
lines changed

15 files changed

+469
-54
lines changed

src/main/java/com/google/devtools/build/lib/worker/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ java_library(
157157
"//src/main/java/com/google/devtools/build/lib/util:ps_info_collector",
158158
"//third_party:auto_value",
159159
"//third_party:guava",
160-
"//third_party:jsr305",
161160
],
162161
)
163162

@@ -202,6 +201,7 @@ java_library(
202201
":worker",
203202
":worker_key",
204203
"//third_party:apache_commons_pool2",
204+
"//third_party:guava",
205205
],
206206
)
207207

src/main/java/com/google/devtools/build/lib/worker/SimpleWorkerPool.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import com.google.common.base.Throwables;
1717
import java.io.IOException;
18+
import java.util.HashMap;
19+
import java.util.Map;
1820
import java.util.Objects;
1921
import javax.annotation.concurrent.ThreadSafe;
2022
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -29,6 +31,14 @@
2931
@ThreadSafe
3032
final class SimpleWorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> {
3133

34+
/**
35+
* The subtrahend for the maximal total number of objects per key. Unfortunately
36+
* GenericKeyedObjectPoolConfig doesn't support different number of objects per key, so we need to
37+
* use this ad hoc variable to manage that. We need this variable to automatically adjust the pool
38+
* size per worker key.
39+
*/
40+
private Map<WorkerKey, Integer> shrunkBy = new HashMap<>();
41+
3242
public SimpleWorkerPool(WorkerFactory factory, int max) {
3343
super(factory, makeConfig(max));
3444
}
@@ -77,12 +87,38 @@ public Worker borrowObject(WorkerKey key) throws IOException, InterruptedExcepti
7787
public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException {
7888
try {
7989
super.invalidateObject(key, obj);
90+
if (obj.isDoomed()) {
91+
updateShrunkBy(key);
92+
}
8093
} catch (Throwable t) {
8194
Throwables.propagateIfPossible(t, InterruptedException.class);
8295
throw new RuntimeException("unexpected", t);
8396
}
8497
}
8598

99+
@Override
100+
public void returnObject(WorkerKey key, Worker obj) {
101+
super.returnObject(key, obj);
102+
if (obj.isDoomed()) {
103+
updateShrunkBy(key);
104+
}
105+
}
106+
107+
public int getMaxTotalPerKey(WorkerKey key) {
108+
return getMaxTotalPerKey() - shrunkBy.getOrDefault(key, 0);
109+
}
110+
111+
private synchronized void updateShrunkBy(WorkerKey workerKey) {
112+
int currentValue = shrunkBy.getOrDefault(workerKey, 0);
113+
if (getMaxTotalPerKey() - currentValue > 1) {
114+
shrunkBy.put(workerKey, currentValue + 1);
115+
}
116+
}
117+
118+
void clearShrunkBy() {
119+
shrunkBy = new HashMap<>();
120+
}
121+
86122
/**
87123
* Our own configuration class for the {@code SimpleWorkerPool} that correctly implements {@code
88124
* equals()} and {@code hashCode()}.

src/main/java/com/google/devtools/build/lib/worker/Worker.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,24 @@ public abstract class Worker {
3939
protected final int workerId;
4040
/** The path of the log file for this worker. */
4141
protected final Path logFile;
42+
/**
43+
* Indicates that the worker should be destroyed after usage. If the worker is doomed, then after its
44+
* destruction we automatically shrink the pool size for its worker key.
45+
*/
46+
protected boolean doomed;
4247

4348
public Worker(WorkerKey workerKey, int workerId, Path logFile) {
4449
this.workerKey = workerKey;
4550
this.workerId = workerId;
4651
this.logFile = logFile;
52+
this.doomed = false;
4753
}
4854

4955
/**
5056
* Returns a unique id for this worker. This is used to distinguish different worker processes in
5157
* logs and messages.
5258
*/
53-
int getWorkerId() {
59+
public int getWorkerId() {
5460
return this.workerId;
5561
}
5662

@@ -64,6 +70,14 @@ public WorkerKey getWorkerKey() {
6470
return workerKey;
6571
}
6672

73+
public boolean isDoomed() {
74+
return doomed;
75+
}
76+
77+
void setDoomed(boolean doomed) {
78+
this.doomed = doomed;
79+
}
80+
6781
HashCode getWorkerFilesCombinedHash() {
6882
return workerKey.getWorkerFilesCombinedHash();
6983
}

src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ public void destroyObject(WorkerKey key, PooledObject<Worker> p) {
144144
@Override
145145
public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
146146
Worker worker = p.getObject();
147+
if (worker.isDoomed()) {
148+
return false;
149+
}
147150
Optional<Integer> exitValue = worker.getExitValue();
148151
if (exitValue.isPresent()) {
149152
if (worker.diedUnexpectedly()) {

src/main/java/com/google/devtools/build/lib/worker/WorkerLifecycleManager.java

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.google.common.collect.ImmutableList;
1818
import com.google.common.collect.ImmutableSet;
1919
import com.google.common.flogger.GoogleLogger;
20+
import com.google.devtools.build.lib.events.Event;
21+
import com.google.devtools.build.lib.events.Reporter;
2022
import java.time.Duration;
2123
import java.util.Comparator;
2224
import java.util.HashSet;
@@ -41,12 +43,17 @@ final class WorkerLifecycleManager extends Thread {
4143
private boolean isWorking = false;
4244
private final WorkerPool workerPool;
4345
private final WorkerOptions options;
46+
private Reporter reporter;
4447

4548
public WorkerLifecycleManager(WorkerPool workerPool, WorkerOptions options) {
4649
this.workerPool = workerPool;
4750
this.options = options;
4851
}
4952

53+
public void setReporter(Reporter reporter) {
54+
this.reporter = reporter;
55+
}
56+
5057
@Override
5158
public void run() {
5259
if (options.totalWorkerMemoryLimitMb == 0) {
@@ -89,7 +96,6 @@ void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedE
8996

9097
int workerMemeoryUsage =
9198
workerMetrics.stream()
92-
.filter(metric -> metric.getWorkerStat() != null)
9399
.mapToInt(metric -> metric.getWorkerStat().getUsedMemoryInKB() / 1000)
94100
.sum();
95101

@@ -101,18 +107,67 @@ void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedE
101107
collectEvictionCandidates(
102108
workerMetrics, options.totalWorkerMemoryLimitMb, workerMemeoryUsage);
103109

104-
if (candidates.isEmpty()) {
105-
return;
110+
String msg =
111+
String.format("Going to evict %d workers with ids: %s", candidates.size(), candidates);
112+
logger.atInfo().log("%s", msg);
113+
if (reporter != null) {
114+
reporter.handle(Event.info(msg));
106115
}
107116

108-
logger.atInfo().log("Going to evict %d workers with ids: %s", candidates.size(), candidates);
117+
ImmutableSet<Integer> evictedWorkers = evictCandidates(workerPool, candidates);
109118

110-
evictCandidates(workerPool, candidates);
119+
msg =
120+
String.format(
121+
"Total evicted idle workers %d. With ids: %s", evictedWorkers.size(), evictedWorkers);
122+
logger.atInfo().log("%s", msg);
123+
if (reporter != null) {
124+
reporter.handle(Event.info(msg));
125+
}
126+
127+
if (options.shrinkWorkerPool) {
128+
List<WorkerMetric> notEvictedWorkerMetrics =
129+
workerMetrics.stream()
130+
.filter(
131+
metric ->
132+
!evictedWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
133+
.collect(Collectors.toList());
134+
135+
int notEvictedWorkerMemeoryUsage =
136+
notEvictedWorkerMetrics.stream()
137+
.mapToInt(metric -> metric.getWorkerStat().getUsedMemoryInKB() / 1000)
138+
.sum();
139+
140+
if (notEvictedWorkerMemeoryUsage <= options.totalWorkerMemoryLimitMb) {
141+
return;
142+
}
143+
144+
postponeInvalidation(notEvictedWorkerMetrics, notEvictedWorkerMemeoryUsage);
145+
}
111146
}
112147

113-
private void evictCandidates(WorkerPool pool, ImmutableSet<Integer> candidates)
114-
throws InterruptedException {
115-
pool.evictWithPolicy(new CandidateEvictionPolicy(candidates));
148+
private void postponeInvalidation(
149+
List<WorkerMetric> workerMetrics, int notEvictedWorkerMemeoryUsage) {
150+
ImmutableSet<Integer> potentialCandidates =
151+
getCandidates(
152+
workerMetrics, options.totalWorkerMemoryLimitMb, notEvictedWorkerMemeoryUsage);
153+
154+
String msg = String.format("New doomed workers candidates %s", potentialCandidates);
155+
logger.atInfo().log("%s", msg);
156+
if (reporter != null) {
157+
reporter.handle(Event.info(msg));
158+
}
159+
workerPool.setDoomedWorkers(potentialCandidates);
160+
}
161+
162+
/**
163+
* Applies the eviction policy to the candidates. Returns the worker ids of evicted workers. We don't
164+
* guarantee that every candidate is going to be evicted.
165+
*/
166+
private static ImmutableSet<Integer> evictCandidates(
167+
WorkerPool pool, ImmutableSet<Integer> candidates) throws InterruptedException {
168+
CandidateEvictionPolicy policy = new CandidateEvictionPolicy(candidates);
169+
pool.evictWithPolicy(policy);
170+
return policy.getEvictedWorkers();
116171
}
117172

118173
/** Collects worker candidates to evict. Chooses workers with the largest memory consumption. */
@@ -124,27 +179,27 @@ ImmutableSet<Integer> collectEvictionCandidates(
124179

125180
List<WorkerMetric> idleWorkerMetrics =
126181
workerMetrics.stream()
127-
.filter(
128-
metric ->
129-
metric.getWorkerStat() != null
130-
&& idleWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
182+
.filter(metric -> idleWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
131183
.collect(Collectors.toList());
132184

133-
if (idleWorkerMetrics.size() != idleWorkers.size()) {
134-
logger.atInfo().log(
135-
"Difference between idle workers and idle worker metrics is %d",
136-
idleWorkers.size() - idleWorkerMetrics.size());
137-
}
185+
return getCandidates(idleWorkerMetrics, memoryLimitMb, workerMemeoryUsageMb);
186+
}
138187

139-
idleWorkerMetrics.sort(new MemoryComparator());
188+
/**
189+
* Chooses the worker ids of workers with the most usage of memory. Selects workers until total
190+
* memory usage is at most memoryLimitMb.
191+
*/
192+
private static ImmutableSet<Integer> getCandidates(
193+
List<WorkerMetric> workerMetrics, int memoryLimitMb, int usedMemoryMb) {
140194

195+
workerMetrics.sort(new MemoryComparator());
141196
ImmutableSet.Builder<Integer> candidates = ImmutableSet.builder();
142197
int freeMemoryMb = 0;
143-
for (WorkerMetric metric : idleWorkerMetrics) {
198+
for (WorkerMetric metric : workerMetrics) {
144199
candidates.addAll(metric.getWorkerProperties().getWorkerIds());
145200
freeMemoryMb += metric.getWorkerStat().getUsedMemoryInKB() / 1000;
146201

147-
if (workerMemeoryUsageMb - freeMemoryMb <= memoryLimitMb) {
202+
if (usedMemoryMb - freeMemoryMb <= memoryLimitMb) {
148203
break;
149204
}
150205
}
@@ -183,27 +238,33 @@ public Set<Integer> getWorkerIds() {
183238

184239
/** Eviction policy for WorkerPool. Evicts all idle workers that were passed to the constructor. */
185240
private static class CandidateEvictionPolicy implements EvictionPolicy<Worker> {
186-
private final ImmutableSet<Integer> workerIds;
241+
private final ImmutableSet<Integer> workerCandidates;
242+
private final Set<Integer> evictedWorkers;
187243

188-
public CandidateEvictionPolicy(ImmutableSet<Integer> workerIds) {
189-
this.workerIds = workerIds;
244+
public CandidateEvictionPolicy(ImmutableSet<Integer> workerCandidates) {
245+
this.workerCandidates = workerCandidates;
246+
this.evictedWorkers = new HashSet<>();
190247
}
191248

192249
@Override
193250
public boolean evict(EvictionConfig config, PooledObject<Worker> underTest, int idleCount) {
194251
int workerId = underTest.getObject().getWorkerId();
195-
if (workerIds.contains(workerId)) {
252+
if (workerCandidates.contains(workerId)) {
253+
evictedWorkers.add(workerId);
196254
logger.atInfo().log(
197255
"Evicting worker %d with mnemonic %s",
198256
workerId, underTest.getObject().getWorkerKey().getMnemonic());
199257
return true;
200258
}
201-
202259
return false;
203260
}
261+
262+
public ImmutableSet<Integer> getEvictedWorkers() {
263+
return ImmutableSet.copyOf(evictedWorkers);
264+
}
204265
}
205266

206-
/** Compare workers memory in descending order. */
267+
/** Compares worker metrics by memory consumption in descending order. */
207268
private static class MemoryComparator implements Comparator<WorkerMetric> {
208269
@Override
209270
public int compare(WorkerMetric m1, WorkerMetric m2) {

src/main/java/com/google/devtools/build/lib/worker/WorkerMetric.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics;
1919
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics.WorkerStats;
2020
import java.time.Instant;
21-
import javax.annotation.Nullable;
2221

2322
/**
2423
* Contains data about worker statistics during execution. This class contains data for {@link
@@ -29,13 +28,12 @@ public abstract class WorkerMetric {
2928

3029
public abstract WorkerProperties getWorkerProperties();
3130

32-
@Nullable
3331
public abstract WorkerStat getWorkerStat();
3432

3533
public abstract boolean isMeasurable();
3634

3735
public static WorkerMetric create(
38-
WorkerProperties workerProperties, @Nullable WorkerStat workerStat, boolean isMeasurable) {
36+
WorkerProperties workerProperties, WorkerStat workerStat, boolean isMeasurable) {
3937
return new AutoValue_WorkerMetric(workerProperties, workerStat, isMeasurable);
4038
}
4139

src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,18 @@ public void buildStarting(BuildStartingEvent event) {
162162
workerPool = new WorkerPoolImpl(newConfig);
163163
// If workerPool is restarted then we should recreate metrics.
164164
WorkerMetricsCollector.instance().clear();
165+
166+
// Start collecting after a pool is defined
167+
workerLifecycleManager = new WorkerLifecycleManager(workerPool, options);
168+
if (options.workerVerbose) {
169+
workerLifecycleManager.setReporter(env.getReporter());
170+
}
171+
workerLifecycleManager.setDaemon(true);
172+
workerLifecycleManager.start();
165173
}
166174

167-
// Start collecting after a pool is defined
168-
workerLifecycleManager = new WorkerLifecycleManager(workerPool, options);
169-
workerLifecycleManager.setDaemon(true);
170-
workerLifecycleManager.start();
175+
// Clear doomed workers at the beginning of a build.
176+
workerPool.clearDoomedWorkers();
171177
}
172178

173179
@Override

src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,14 @@ public String getTypeDescription() {
221221
effectTags = {OptionEffectTag.EXECUTION},
222222
help = "If enabled, workers are run in a hardened sandbox, if the implementation allows it.")
223223
public boolean sandboxHardening;
224+
225+
@Option(
226+
name = "experimental_shrink_worker_pool",
227+
defaultValue = "false",
228+
documentationCategory = OptionDocumentationCategory.EXECUTION_STRATEGY,
229+
effectTags = {OptionEffectTag.EXECUTION, OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
230+
help =
231+
"If enabled, could shrink worker pool if worker memory pressure is high. This flag works"
232+
+ " only when flag experimental_total_worker_memory_limit_mb is enabled.")
233+
public boolean shrinkWorkerPool;
224234
}

src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414
package com.google.devtools.build.lib.worker;
1515

16+
import com.google.common.collect.ImmutableSet;
1617
import java.io.IOException;
1718
import org.apache.commons.pool2.impl.EvictionPolicy;
1819

@@ -43,5 +44,9 @@ public interface WorkerPool {
4344

4445
void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException;
4546

47+
void setDoomedWorkers(ImmutableSet<Integer> workerIds);
48+
49+
void clearDoomedWorkers();
50+
4651
void close();
4752
}

0 commit comments

Comments
 (0)