Skip to content

Commit

Permalink
Add --experimental_worker_metrics_poll_interval flag.
Browse files Browse the repository at this point in the history
Partly so people can tune it to their liking, partly to be able to test. Also adds a flag for the example worker to spend actual time "working", so we can do a measurement.

PiperOrigin-RevId: 527594383
Change-Id: Id7496004d50230cb44741d4a00954eaa9ccf560c
  • Loading branch information
larsrc-google authored and Copybara-Service committed Apr 27, 2023
1 parent 77cd84a commit a95847c
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 11 deletions.
Expand Up @@ -49,7 +49,7 @@ public static PsInfoCollector instance() {
return instance;
}

private PsSnapshot currenPsSnapshot;
private PsSnapshot currentPsSnapshot;

// prevent construction
private PsInfoCollector() {}
Expand All @@ -60,8 +60,8 @@ private PsInfoCollector() {}
*/
public synchronized ResourceSnapshot collectResourceUsage(ImmutableSet<Long> processIds) {
Instant now = clock.now();
if (currenPsSnapshot == null
|| Duration.between(currenPsSnapshot.getCollectionTime(), now)
if (currentPsSnapshot == null
|| Duration.between(currentPsSnapshot.getCollectionTime(), now)
.compareTo(MIN_COLLECTION_INTERVAL)
> 0) {

Expand All @@ -70,15 +70,15 @@ public synchronized ResourceSnapshot collectResourceUsage(ImmutableSet<Long> pro

ImmutableMap.Builder<Long, Integer> pidToMemoryInKb = ImmutableMap.builder();
for (Long pid : processIds) {
PsInfo psInfo = currenPsSnapshot.getPidToPsInfo().get(pid);
PsInfo psInfo = currentPsSnapshot.getPidToPsInfo().get(pid);
if (psInfo == null) {
continue;
}
pidToMemoryInKb.put(pid, collectMemoryUsageOfDescendants(psInfo, currenPsSnapshot));
pidToMemoryInKb.put(pid, collectMemoryUsageOfDescendants(psInfo, currentPsSnapshot));
}

return ResourceSnapshot.create(
pidToMemoryInKb.buildOrThrow(), currenPsSnapshot.getCollectionTime());
pidToMemoryInKb.buildOrThrow(), currentPsSnapshot.getCollectionTime());
}

/** Updates current snapshot of all processes state, using ps command. */
Expand All @@ -90,7 +90,7 @@ private void updatePsSnapshot() {
pidToPsInfo.values().stream()
.collect(toImmutableSetMultimap(PsInfo::getParentPid, Function.identity()));

currenPsSnapshot = PsSnapshot.create(pidToPsInfo, pidToChildrenPsInfo, clock.now());
currentPsSnapshot = PsSnapshot.create(pidToPsInfo, pidToChildrenPsInfo, clock.now());
}

/** Collects memory usage for every process. */
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
Expand All @@ -35,7 +34,6 @@
*/
final class WorkerLifecycleManager extends Thread {

private static final Duration SLEEP_INTERVAL = Duration.ofSeconds(5);
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private boolean isWorking = false;
Expand Down Expand Up @@ -64,14 +62,13 @@ public void run() {
// This loop works until method stopProcessing() called by WorkerModule.
while (isWorking) {
try {
Thread.sleep(SLEEP_INTERVAL.toMillis());
Thread.sleep(options.workerMetricsPollInterval.toMillis());
} catch (InterruptedException e) {
break;
}

ImmutableList<WorkerMetric> workerMetrics =
WorkerMetricsCollector.instance().collectMetrics();

try {
evictWorkers(workerMetrics);
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -18,12 +18,14 @@
import com.google.devtools.build.lib.util.ResourceConverter;
import com.google.devtools.common.options.Converter;
import com.google.devtools.common.options.Converters;
import com.google.devtools.common.options.Converters.DurationConverter;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionDocumentationCategory;
import com.google.devtools.common.options.OptionEffectTag;
import com.google.devtools.common.options.Options;
import com.google.devtools.common.options.OptionsBase;
import com.google.devtools.common.options.OptionsParsingException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -212,4 +214,15 @@ public String getTypeDescription() {
"If enabled, could shrink worker pool if worker memory pressure is high. This flag works"
+ " only when flag experimental_total_worker_memory_limit_mb is enabled.")
public boolean shrinkWorkerPool;

// Poll interval for worker metrics. WorkerLifecycleManager.run() sleeps for this long between
// rounds of collecting worker metrics and calling evictWorkers(). It takes the place of the
// previously hard-coded 5-second SLEEP_INTERVAL, which is why the default is "5s".
@Option(
name = "experimental_worker_metrics_poll_interval",
converter = DurationConverter.class,
defaultValue = "5s",
documentationCategory = OptionDocumentationCategory.EXECUTION_STRATEGY,
effectTags = {OptionEffectTag.EXECUTION, OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
help =
"The interval between collecting worker metrics and possibly attempting evictions. "
+ "Cannot effectively be less than 1s for performance reasons.")
public Duration workerMetricsPollInterval;
}
Expand Up @@ -280,6 +280,15 @@ private static void parseOptionsAndLog(List<String> args) throws Exception {
}
}

// Simulate real work: pause for the duration given by --work_time before continuing.
if (options.workTime != null) {
try {
Thread.sleep(options.workTime.toMillis());
} catch (InterruptedException e) {
// An interrupt only cuts the pause short: it is reported on stderr and processing continues.
// NOTE(review): the thread's interrupt status is not restored here - confirm this is intended.
System.err.printf(
"Interrupted while pretending to work for %d millis%n", options.workTime.toMillis());
}
}

String outputStr = Joiner.on('\n').join(outputs);
if (options.outputFile.isEmpty()) {
System.out.println(outputStr);
Expand Down
Expand Up @@ -14,11 +14,13 @@
package com.google.devtools.build.lib.worker;

import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.common.options.Converters;
import com.google.devtools.common.options.EnumConverter;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionDocumentationCategory;
import com.google.devtools.common.options.OptionEffectTag;
import com.google.devtools.common.options.OptionsBase;
import java.time.Duration;

/**
* Options for the example worker itself.
Expand Down Expand Up @@ -89,6 +91,17 @@ public static class ExampleWorkOptions extends OptionsBase {
help = "Prints a list of all environment variables."
)
public boolean printEnv;

// Artificial per-request delay for the example worker, so that a build using it takes a
// measurable amount of time. Parsed by Converters.DurationConverter; the default is "0"
// (presumably a zero-length pause - confirm against the converter).
@Option(
name = "work_time",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.NO_OP},
converter = Converters.DurationConverter.class,
defaultValue = "0",
help =
"When the worker receives a work request, it will sleep for this long before "
+ "responding.")
public Duration workTime;
}

@Option(
Expand Down
30 changes: 30 additions & 0 deletions src/test/shell/integration/bazel_worker_test.sh
Expand Up @@ -728,4 +728,34 @@ EOF
expect_log "^---8<---8<--- End of log ---8<---8<---"
}

# Checks that worker metrics are collected repeatedly while a build is running.
#
# Appends ten "work" targets to BUILD, each passing --work_time=1s to the example
# worker, builds one of them with a 400ms metrics poll interval, and then checks:
#   * the build output reports that a worker was created,
#   * the build event text file mentions mnemonic "Work" and a worker_memory_in_kb value,
#   * the profile has at least two lines containing "Workers memory usage".
function test_worker_metrics_collection() {
  prepare_example_worker
  # The heredoc delimiter is intentionally unquoted so ${WORKER_PROTOCOL} is expanded.
  cat >>BUILD <<EOF
[work(
name = "hello_world_%s" % idx,
worker = ":worker",
worker_args = ["--worker_protocol=${WORKER_PROTOCOL}"],
args = ["--write_uuid", "--write_counter", "--work_time=1s"],
) for idx in range(10)]
EOF

  # The poll interval (400ms) is shorter than the simulated work time (1s), so more
  # than one metrics collection should happen during the action.
  bazel build \
    --build_event_text_file="${TEST_log}".build.json \
    --profile="${TEST_log}".profile \
    --experimental_worker_metrics_poll_interval=400ms \
    --experimental_collect_worker_data_in_profiler \
    :hello_world_1 &> "$TEST_log" \
    || fail "build failed"
  expect_log "Created new ${WORKER_TYPE_LOG_STRING} Work worker (id [0-9]\+)"

  # Now see that we have metrics in the build event log. The file is moved over
  # ${TEST_log} because expect_log is taken to inspect that file.
  mv "${TEST_log}".build.json "${TEST_log}"
  expect_log "mnemonic: \"Work\""
  expect_log "worker_memory_in_kb: [0-9][0-9]*"

  # And see that we collected metrics several times.
  mv "${TEST_log}".profile "${TEST_log}"
  # Declaration and assignment stay on one line on purpose: `grep -c` exits non-zero
  # when the count is 0, and `local` masks that status, so the explicit check below
  # is what reports the failure. The path is quoted to avoid word splitting/globbing.
  local metric_events=$(grep -sc -- "Workers memory usage" "$TEST_log")
  (( metric_events >= 2 )) || fail "Expected at least 2 worker metric collections"
}


run_suite "Worker integration tests"

0 comments on commit a95847c

Please sign in to comment.