diff --git a/.bazelci/run_server_test.sh b/.bazelci/run_server_test.sh index 21b6d70389..54e4658b16 100755 --- a/.bazelci/run_server_test.sh +++ b/.bazelci/run_server_test.sh @@ -8,11 +8,11 @@ bazel build //src/main/java/build/buildfarm:buildfarm-shard-worker bazel build //src/main/java/build/buildfarm:buildfarm-server # Start a single worker -bazel run //src/main/java/build/buildfarm:buildfarm-shard-worker $(pwd)/examples/config.minimal.yml > server.log 2>&1 & +bazel run //src/main/java/build/buildfarm:buildfarm-shard-worker $(pwd)/examples/config.minimal.yml > worker.log 2>&1 & echo "Started buildfarm-shard-worker..." # Start a single server -bazel run //src/main/java/build/buildfarm:buildfarm-server $(pwd)/examples/config.minimal.yml > worker.log 2>&1 & +bazel run //src/main/java/build/buildfarm:buildfarm-server $(pwd)/examples/config.minimal.yml > server.log 2>&1 & echo "Started buildfarm-server..." echo "Wait for startup to finish..." diff --git a/.bazelrc b/.bazelrc index adbb3c68fd..c3caf4d8fb 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,3 +1,9 @@ +build --java_language_version=17 +build --java_runtime_version=remotejdk_17 + +build --tool_java_language_version=17 +build --tool_java_runtime_version=remotejdk_17 + common --enable_platform_specific_config build:fuse --define=fuse=true diff --git a/.bazelversion b/.bazelversion index 6abaeb2f90..19b860c187 100644 --- a/.bazelversion +++ b/.bazelversion @@ -1 +1 @@ -6.2.0 +6.4.0 diff --git a/BUILD b/BUILD index f552ea9d7b..eb773e1a16 100644 --- a/BUILD +++ b/BUILD @@ -120,7 +120,7 @@ sh_binary( java_image( name = "buildfarm-server", args = ["/app/build_buildfarm/examples/config.minimal.yml"], - base = "@amazon_corretto_java_image_base//image", + base = "@ubuntu-mantic//image", classpath_resources = [ "//src/main/java/build/buildfarm:configs", ], @@ -148,14 +148,14 @@ oss_audit( # Download cgroup-tools so that the worker is able to restrict actions via control groups. download_pkgs( name = "worker_pkgs", - image_tar = "@ubuntu-jammy//image", + image_tar = "@ubuntu-mantic//image", packages = ["cgroup-tools"], tags = ["container"], ) install_pkgs( name = "worker_pkgs_image", - image_tar = "@ubuntu-jammy//image", + image_tar = "@ubuntu-mantic//image", installables_tar = ":worker_pkgs.tar", installation_cleanup_commands = "rm -rf /var/lib/apt/lists/*", output_image_name = "worker_pkgs_image", diff --git a/README.md b/README.md index 675fe94dbd..cd9caf8ee3 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,8 @@ All commandline options override corresponding config settings. Run via -``` -docker run -d --rm --name buildfarm-redis -p 6379:6379 redis:5.0.9 +```shell +$ docker run -d --rm --name buildfarm-redis -p 6379:6379 redis:5.0.9 redis-cli config set stop-writes-on-bgsave-error no ``` @@ -28,8 +28,8 @@ redis-cli config set stop-writes-on-bgsave-error no Run via -``` -bazelisk run //src/main/java/build/buildfarm:buildfarm-server -- +```shell +$ bazelisk run //src/main/java/build/buildfarm:buildfarm-server -- Ex: bazelisk run //src/main/java/build/buildfarm:buildfarm-server -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml ``` @@ -40,8 +40,8 @@ Ex: bazelisk run //src/main/java/build/buildfarm:buildfarm-server -- --jvm_flag= Run via -``` -bazelisk run //src/main/java/build/buildfarm:buildfarm-shard-worker -- +```shell +$ bazelisk run //src/main/java/build/buildfarm:buildfarm-shard-worker -- Ex: bazelisk run //src/main/java/build/buildfarm:buildfarm-shard-worker -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml @@ -53,9 +53,9 @@ Ex: bazelisk run //src/main/java/build/buildfarm:buildfarm-shard-worker -- --jvm To use the example configured buildfarm with bazel (version 1.0 or higher), you can configure your `.bazelrc` as follows: -``` +```shell $ cat .bazelrc -build --remote_executor=grpc://localhost:8980 +$ build --remote_executor=grpc://localhost:8980 ``` Then run your build as you would normally do. @@ -67,20 +67,20 @@ Buildfarm uses [Java's Logging framework](https://docs.oracle.com/javase/10/core You can use typical Java logging configuration to filter these results and observe the flow of executions through your running services. An example `logging.properties` file has been provided at [examples/logging.properties](examples/logging.properties) for use as follows: -``` -bazel run //src/main/java/build/buildfarm:buildfarm-server -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml +```shell +$ bazel run //src/main/java/build/buildfarm:buildfarm-server -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml ``` and -``` -bazel run //src/main/java/build/buildfarm:buildfarm-shard-worker -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml +``` shell +$ bazel run //src/main/java/build/buildfarm:buildfarm-shard-worker -- --jvm_flag=-Djava.util.logging.config.file=$PWD/examples/logging.properties $PWD/examples/config.minimal.yml ``` To attach a remote debugger, run the executable with the `--debug=` flag. For example: -``` -bazel run //src/main/java/build/buildfarm:buildfarm-server -- --debug=5005 $PWD/examples/config.minimal.yml +```shell +$ bazel run //src/main/java/build/buildfarm:buildfarm-server -- --debug=5005 $PWD/examples/config.minimal.yml ``` diff --git a/_site/docs/configuration/configuration.md b/_site/docs/configuration/configuration.md index ae913adf88..466375373f 100644 --- a/_site/docs/configuration/configuration.md +++ b/_site/docs/configuration/configuration.md @@ -7,7 +7,7 @@ has_children: true Minimal required: -``` +```yaml backplane: redisUri: "redis://localhost:6379" queues: @@ -28,17 +28,18 @@ For an example configuration containing all of the configuration values, see `ex ### Common -| Configuration | Accepted and _Default_ Values | Command Line Argument | Description | -|----------------------|-------------------------------|-----------------------|---------------------------------------------------| -| digestFunction | _SHA256_, SHA1 | | Digest function for this implementation | -| defaultActionTimeout | Integer, _600_ | | Default timeout value for an action (seconds) | -| maximumActionTimeout | Integer, _3600_ | | Maximum allowed action timeout (seconds) | -| maxEntrySizeBytes | Long, _2147483648_ | | Maximum size of a single blob accepted (bytes) | -| prometheusPort | Integer, _9090_ | --prometheus_port | Listening port of the Prometheus metrics endpoint | +| Configuration | Accepted and _Default_ Values | Command Line Argument | Description | +|------------------------------|-------------------------------|-----------------------|--------------------------------------------------------------| +| digestFunction | _SHA256_, SHA1 | | Digest function for this implementation | +| defaultActionTimeout | Integer, _600_ | | Default timeout value for an action (seconds) | +| maximumActionTimeout | Integer, _3600_ | | Maximum allowed action timeout (seconds) | +| maxEntrySizeBytes | Long, _2147483648_ | | Maximum size of a single blob accepted (bytes) | +| prometheusPort | Integer, _9090_ | --prometheus_port | Listening port of the Prometheus metrics endpoint | +| allowSymlinkTargetAbsolute | boolean, _false_ | | Permit inputs to contain symlinks with absolute path targets | Example: -``` +```yaml digestFunction: SHA1 defaultActionTimeout: 1800 maximumActionTimeout: 1800 @@ -79,7 +80,7 @@ worker: Example: -``` +```yaml server: instanceType: SHARD name: shard @@ -96,7 +97,7 @@ server: Example: -``` +```yaml server: grpcMetrics: enabled: false @@ -114,7 +115,7 @@ server: Example: -``` +```yaml server: caches: directoryCacheMaxEntries: 10000 @@ -132,7 +133,7 @@ server: Example: -``` +```yaml server: admin: deploymentEnvironment: AWS @@ -151,14 +152,14 @@ server: Example: -``` +```yaml server: metrics: publisher: log logLevel: INFO ``` -``` +```yaml server: metrics: publisher: aws @@ -207,7 +208,7 @@ server: Example: -``` +```yaml backplane: type: SHARD redisUri: "redis://localhost:6379" @@ -224,7 +225,7 @@ backplane: Example: -``` +```yaml backplane: type: SHARD redisUri: "redis://localhost:6379" @@ -261,8 +262,9 @@ backplane: | errorOperationRemainingResources | boolean, _false_ | | | | realInputDirectories | List of Strings, _external_ | | A list of paths that will not be subject to the effects of linkInputDirectories setting, may also be used to provide writable directories as input roots for actions which expect to be able to write to an input location and will fail if they cannot | | gracefulShutdownSeconds | Integer, 0 | | Time in seconds to allow for operations in flight to finish when shutdown signal is received | +| createSymlinkOutputs | boolean, _false_ | | Creates SymlinkNodes for symbolic links discovered in output paths for actions. No verification of the symlink target path occurs. Buildstream, for example, requires this. | -``` +```yaml worker: port: 8981 publicName: "localhost:8981" @@ -279,7 +281,7 @@ worker: Example: -``` +```yaml worker: capabilities: cas: true @@ -296,7 +298,7 @@ worker: Example: -``` +```yaml worker: sandboxSettings: alwaysUse: true @@ -313,7 +315,7 @@ worker: Example: -``` +```yaml worker: dequeueMatchSettings: acceptEverything: true @@ -333,7 +335,7 @@ worker: Example: -``` +```yaml worker: storages: - type: FILESYSTEM @@ -361,7 +363,7 @@ worker: Example: -``` +```yaml worker: executionPolicies: - name: test diff --git a/_site/docs/execution/execution_policies.md b/_site/docs/execution/execution_policies.md index 19698a9925..f0eaf295d9 100644 --- a/_site/docs/execution/execution_policies.md +++ b/_site/docs/execution/execution_policies.md @@ -17,7 +17,7 @@ This policy type specifies that a worker should prepend a single path, and a num This example will use the buildfarm-provided executable `as-nobody`, which will upon execution demote itself to a `nobody` effective process owner uid, and perform an `execvp(2)` with the remaining provided program arguments, which will subsequently execute as a user that no longer matches the worker process. -``` +```yaml # default wrapper policy application worker: executionPolicies: @@ -50,7 +50,8 @@ These wrappers are used for detecting actions that rely on time. Below is a dem This addresses two problems in regards to an action's dependence on time. The 1st problem is when an action takes longer than it should because it's sleeping unnecessarily. The 2nd problem is when an action relies on time which causes it to eventually be broken on master despite the code not changing. Both problems are expressed below as unit tests. We demonstrate a time-spoofing mechanism (the re-writing of syscalls) which allows us to detect these problems generically over any action. The objective is to analyze builds for performance inefficiency and discover future instabilities before they occur. ### Issue 1 (slow test) -``` + +```bash #!/bin/bash set -euo pipefail @@ -58,16 +59,19 @@ echo -n "testing... " sleep 10; echo "done" ``` + The test takes 10 seconds to run on average. -``` -bazel test --runs_per_test=10 --config=remote //cloud/buildfarm:sleep_test + +```shell +$ bazel test --runs_per_test=10 --config=remote //cloud/buildfarm:sleep_test //cloud/buildfarm:sleep_test PASSED in 10.2s Stats over 10 runs: max = 10.2s, min = 10.1s, avg = 10.2s, dev = 0.0s ``` We can check for performance improvements by using the `skip-sleep` option. -``` -bazel test --runs_per_test=10 --config=remote --remote_default_exec_properties='skip-sleep=true' //cloud/buildfarm:sleep_test + +```shell +$ bazel test --runs_per_test=10 --config=remote --remote_default_exec_properties='skip-sleep=true' //cloud/buildfarm:sleep_test //cloud/buildfarm:sleep_test PASSED in 1.0s Stats over 10 runs: max = 1.0s, min = 0.9s, avg = 1.0s, dev = 0.0s ``` @@ -75,7 +79,8 @@ bazel test --runs_per_test=10 --config=remote --remote_default_exec_properties=' Now the test is 10x faster. If skipping sleep makes an action perform significantly faster without affecting its success rate, that would warrant further investigation into the action's implementation. ### Issue 2 (future failing test) -``` + +```bash #!/bin/bash set -euo pipefail @@ -89,12 +94,15 @@ echo "Times change." date exit -1; ``` + The test passes today, but will it pass tomorrow? Will it pass a year from now? We can find out by using the `time-shift` option. -``` -bazel test --test_output=streamed --remote_default_exec_properties='time-shift=31556952' --config=remote //cloud/buildfarm:future_fail + +```shell +$ bazel test --test_output=streamed --remote_default_exec_properties='time-shift=31556952' --config=remote //cloud/buildfarm:future_fail INFO: Found 1 test target... Times change. Mon Sep 25 18:31:09 UTC 2023 //cloud/buildfarm:future_fail FAILED in 18.0s ``` + Time is shifted to the year 2023 and the test now fails. We can fix the problem before others see it. diff --git a/_site/docs/execution/execution_properties.md b/_site/docs/execution/execution_properties.md index 85579ed099..7966c70dd2 100644 --- a/_site/docs/execution/execution_properties.md +++ b/_site/docs/execution/execution_properties.md @@ -76,37 +76,42 @@ Despite being given 1 core, they see all of the cpus and decide to spawn that ma **Standard Example:** This test will succeed when env var TESTVAR is foobar, and fail otherwise. -``` + +```shell #!/bin/bash [ "$TESTVAR" = "foobar" ] ``` -``` -./bazel test \ + +```shell +$ ./bazel test \ --remote_executor=grpc://127.0.0.1:8980 --noremote_accept_cached --nocache_test_results \ //env_test:main FAIL ``` -``` -./bazel test --remote_default_exec_properties='env-vars={"TESTVAR": "foobar"}' \ +```shell +$ ./bazel test --remote_default_exec_properties='env-vars={"TESTVAR": "foobar"}' \ --remote_executor=grpc://127.0.0.1:8980 --noremote_accept_cached --nocache_test_results \ //env_test:main PASS ``` + **Template Example:** If you give a range of cores, buildfarm has the authority to decide how many your operation actually claims. You can let buildfarm resolve this value for you (via [mustache](https://mustache.github.io/)). -``` +```bash #!/bin/bash [ "$MKL_NUM_THREADS" = "1" ] ``` -``` -./bazel test \ + +```shell +$ ./bazel test \ --remote_executor=grpc://127.0.0.1:8980 --noremote_accept_cached --nocache_test_results \ //env_test:main FAIL ``` -``` -./bazel test \ + +```shell +$ ./bazel test \ --remote_default_exec_properties='env-vars="MKL_NUM_THREADS": "{{limits.cpu.claimed}}"' \ --remote_executor=grpc://127.0.0.1:8980 --noremote_accept_cached --nocache_test_results \ //env_test:main diff --git a/_site/docs/quick_start.md b/_site/docs/quick_start.md index 7af957ee63..8a9a9234db 100644 --- a/_site/docs/quick_start.md +++ b/_site/docs/quick_start.md @@ -25,7 +25,8 @@ Let's start with a bazel workspace with a single file to compile into an executa Create a new directory for our workspace and add the following files: `main.cc`: -``` + +```c #include int main( int argc, char *argv[] ) @@ -35,7 +36,8 @@ int main( int argc, char *argv[] ) ``` `BUILD`: -``` + +```starlark cc_binary( name = "main", srcs = ["main.cc"], @@ -118,15 +120,18 @@ That `2 remote` indicates that your compile and link ran remotely. Congratulatio ## Container Quick Start To bring up a minimal buildfarm cluster, you can run: + +```shell +$ ./examples/bf-run start ``` -./examples/bf-run start -``` + This will start all of the necessary containers at the latest version. Once the containers are up, you can build with `bazel run --remote_executor=grpc://localhost:8980 :main`. To stop the containers, run: -``` -./examples/bf-run stop + +```shell +$ ./examples/bf-run stop ``` ## Next Steps @@ -137,8 +142,8 @@ We've started our worker on the same host as our server, and also the same host You can now easily launch a new Buildfarm cluster locally or in AWS using an open sourced [Buildfarm Manager](https://github.com/80degreeswest/bfmgr). -``` -wget https://github.com/80degreeswest/bfmgr/releases/download/1.0.7/bfmgr-1.0.7.jar -java -jar bfmgr-1.0.7.jar -Navigate to http://localhost +```shell +$ wget https://github.com/80degreeswest/bfmgr/releases/download/1.0.7/bfmgr-1.0.7.jar +$ java -jar bfmgr-1.0.7.jar +$ open http://localhost ``` diff --git a/admin/main/pom.xml b/admin/main/pom.xml index 006217ccb4..8e128ee64c 100644 --- a/admin/main/pom.xml +++ b/admin/main/pom.xml @@ -94,7 +94,7 @@ org.json json - 20230227 + 20231013 org.projectlombok diff --git a/admin/main/src/main/resources/proto/buildfarm.proto b/admin/main/src/main/resources/proto/buildfarm.proto index 8a000132a3..d391847cb9 100644 --- a/admin/main/src/main/resources/proto/buildfarm.proto +++ b/admin/main/src/main/resources/proto/buildfarm.proto @@ -520,7 +520,7 @@ message RedisShardBackplaneConfig { int32 max_attempts = 33; } -message ShardInstanceConfig { +message ServerInstanceConfig { bool run_dispatched_monitor = 1; int32 dispatched_monitor_interval_seconds = 2; @@ -544,7 +544,7 @@ message ShardInstanceConfig { google.protobuf.Duration grpc_timeout = 8; } -message ShardWorkerInstanceConfig { +message WorkerInstanceConfig { // whether to stream stdout from processes bool stream_stdout = 6; @@ -568,7 +568,7 @@ message ShardWorkerInstanceConfig { } message ShardWorkerConfig { - ShardWorkerInstanceConfig shard_worker_instance_config = 1; + WorkerInstanceConfig shard_worker_instance_config = 1; int32 port = 2; @@ -836,7 +836,7 @@ message InstanceConfig { oneof type { MemoryInstanceConfig memory_instance_config = 3; - ShardInstanceConfig shard_instance_config = 4; + ServerInstanceConfig shard_instance_config = 4; } } diff --git a/defs.bzl b/defs.bzl index 0510eb1cd9..4b0a5e49bf 100644 --- a/defs.bzl +++ b/defs.bzl @@ -8,7 +8,7 @@ load( "@io_bazel_rules_docker//repositories:repositories.bzl", container_repositories = "repositories", ) -load("@io_grpc_grpc_java//:repositories.bzl", "grpc_java_repositories") +load("@io_grpc_grpc_java//:repositories.bzl", "IO_GRPC_GRPC_JAVA_OVERRIDE_TARGETS", "grpc_java_repositories") load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") load("@com_grail_bazel_toolchain//toolchain:rules.bzl", "llvm_toolchain") load("@io_bazel_rules_k8s//k8s:k8s.bzl", "k8s_repositories") @@ -80,8 +80,8 @@ def buildfarm_init(name = "buildfarm"): "com.google.guava:guava:32.1.1-jre", "com.google.j2objc:j2objc-annotations:1.1", "com.google.jimfs:jimfs:1.1", - "com.google.protobuf:protobuf-java-util:3.10.0", - "com.google.protobuf:protobuf-java:3.10.0", + "com.google.protobuf:protobuf-java-util:3.19.1", + "com.google.protobuf:protobuf-java:3.19.1", "com.google.truth:truth:0.44", "org.slf4j:slf4j-simple:1.7.35", "com.googlecode.json-simple:json-simple:1.1.1", @@ -115,6 +115,7 @@ def buildfarm_init(name = "buildfarm"): "org.projectlombok:lombok:1.18.24", ], generate_compat_repositories = True, + override_targets = IO_GRPC_GRPC_JAVA_OVERRIDE_TARGETS, repositories = [ "https://repo1.maven.org/maven2", "https://mirrors.ibiblio.org/pub/mirrors/maven2", diff --git a/deps.bzl b/deps.bzl index f802fb27e7..55b12ab136 100644 --- a/deps.bzl +++ b/deps.bzl @@ -36,9 +36,9 @@ def archive_dependencies(third_party): # Needed for "well-known protos" and @com_google_protobuf//:protoc. { "name": "com_google_protobuf", - "sha256": "dd513a79c7d7e45cbaeaf7655289f78fd6b806e52dbbd7018ef4e3cf5cff697a", - "strip_prefix": "protobuf-3.15.8", - "urls": ["https://github.com/protocolbuffers/protobuf/archive/v3.15.8.zip"], + "sha256": "25f1292d4ea6666f460a2a30038eef121e6c3937ae0f61d610611dfb14b0bd32", + "strip_prefix": "protobuf-3.19.1", + "urls": ["https://github.com/protocolbuffers/protobuf/archive/v3.19.1.zip"], }, { "name": "com_github_bazelbuild_buildtools", @@ -151,9 +151,11 @@ def archive_dependencies(third_party): }, { "name": "rules_oss_audit", - "sha256": "02962810bcf82d0c66f929ccc163423f53773b8b154574ca956345523243e70d", - "strip_prefix": "rules_oss_audit-1b2690cefd5a960c181e0d89bf3c076294a0e6f4", - "url": "https://github.com/vmware/rules_oss_audit/archive/1b2690cefd5a960c181e0d89bf3c076294a0e6f4.zip", + "sha256": "8ee8376b05b5ddd2287b070e9a88ec85ef907d47f44e321ce5d4bc2b192eed4e", + "strip_prefix": "rules_oss_audit-167dab5b16abdb5996438f22364de544ff24693f", + "url": "https://github.com/vmware/rules_oss_audit/archive/167dab5b16abdb5996438f22364de544ff24693f.zip", + "patch_args": ["-p1"], + "patches": ["%s:rules_oss_audit_pyyaml.patch" % third_party], }, ] diff --git a/examples/config.yml b/examples/config.yml index bce19a3d60..008bf2e1ca 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -3,6 +3,7 @@ defaultActionTimeout: 600 maximumActionTimeout: 3600 maxEntrySizeBytes: 2147483648 # 2 * 1024 * 1024 * 1024 prometheusPort: 9090 +allowSymlinkTargetAbsolute: false server: instanceType: SHARD name: shard @@ -130,6 +131,7 @@ worker: alwaysUse: false selectForBlockNetwork: false selectForTmpFs: false + createSymlinkOutputs: false executionPolicies: - name: test executionWrapper: diff --git a/images.bzl b/images.bzl index 9ab0a8b0b7..939a752ed5 100644 --- a/images.bzl +++ b/images.bzl @@ -27,25 +27,9 @@ def buildfarm_images(): ) container_pull( - name = "ubuntu-bionic", - digest = "sha256:4bc527c7a288da405f2041928c63d0a6479a120ad63461c2f124c944def54be2", + name = "ubuntu-mantic", + digest = "sha256:1419bba15470a95242e917b3abcd8981ae36707b99df5ac705e1eee4d920c51c", registry = "index.docker.io", repository = "bazelbuild/buildfarm-worker-base", - tag = "bionic-java11-gcc", - ) - - container_pull( - name = "ubuntu-jammy", - digest = "sha256:da847ee259ebe7f00631a2f0146d9add60ff0f94b031a2e522ce94c78b1335c2", - registry = "index.docker.io", - repository = "bazelbuild/buildfarm-worker-base", - tag = "jammy-java11-gcc", - ) - - container_pull( - name = "amazon_corretto_java_image_base", - registry = "index.docker.io", - repository = "amazoncorretto", - tag = "19", - digest = "sha256:81d0df4412140416b27211c999e1f3c4565ae89a5cd92889475d20af422ba507", + tag = "mantic-java17-gcc", ) diff --git a/jvm_flags.bzl b/jvm_flags.bzl index 363f161465..440e0718aa 100644 --- a/jvm_flags.bzl +++ b/jvm_flags.bzl @@ -54,6 +54,12 @@ def ensure_accurate_metadata(): "//config:windows": ["-Dsun.nio.fs.ensureAccurateMetadata=true"], }) +def add_opens_sun_nio_fs(): + return select({ + "//conditions:default": [], + "//config:windows": ["--add-opens java.base/sun.nio.fs=ALL-UNNAMED"], + }) + def server_telemetry(): return select({ "//config:open_telemetry": SERVER_TELEMETRY_JVM_FLAGS, @@ -67,7 +73,7 @@ def worker_telemetry(): }) def server_jvm_flags(): - return RECOMMENDED_JVM_FLAGS + DEFAULT_LOGGING_CONFIG + ensure_accurate_metadata() + server_telemetry() + return RECOMMENDED_JVM_FLAGS + DEFAULT_LOGGING_CONFIG + ensure_accurate_metadata() + add_opens_sun_nio_fs() + server_telemetry() def worker_jvm_flags(): - return RECOMMENDED_JVM_FLAGS + DEFAULT_LOGGING_CONFIG + ensure_accurate_metadata() + worker_telemetry() + return RECOMMENDED_JVM_FLAGS + DEFAULT_LOGGING_CONFIG + ensure_accurate_metadata() + add_opens_sun_nio_fs() + worker_telemetry() diff --git a/persistentworkers/src/main/java/persistent/common/processes/JavaProcessWrapper.java b/persistentworkers/src/main/java/persistent/common/processes/JavaProcessWrapper.java index a27b6a9e99..89f2e6a5be 100644 --- a/persistentworkers/src/main/java/persistent/common/processes/JavaProcessWrapper.java +++ b/persistentworkers/src/main/java/persistent/common/processes/JavaProcessWrapper.java @@ -10,12 +10,16 @@ public class JavaProcessWrapper extends ProcessWrapper { + // Get the path of the JVM from the current process to avoid breaking the Bazel sandbox + public static final String CURRENT_JVM_COMMAND = + ProcessHandle.current().info().command().orElseThrow(() -> new RuntimeException("Unable to retrieve the path of the running JVM")); + public JavaProcessWrapper( Path workDir, String classPath, String fullClassName, String[] args ) throws IOException { super(workDir, cmdArgs( new String[]{ - "java", + CURRENT_JVM_COMMAND, "-cp", classPath, fullClassName diff --git a/persistentworkers/src/test/java/persistent/bazel/processes/PersistentWorkerTest.java b/persistentworkers/src/test/java/persistent/bazel/processes/PersistentWorkerTest.java index 0cdc68a7ff..9712394203 100644 --- a/persistentworkers/src/test/java/persistent/bazel/processes/PersistentWorkerTest.java +++ b/persistentworkers/src/test/java/persistent/bazel/processes/PersistentWorkerTest.java @@ -16,6 +16,7 @@ import persistent.bazel.client.PersistentWorker; import persistent.bazel.client.WorkerKey; +import persistent.common.processes.JavaProcessWrapper; import persistent.testutil.ProcessUtils; import persistent.testutil.WorkerUtils; @@ -55,7 +56,7 @@ public void endToEndAdder() throws Exception { ); ImmutableList initCmd = ImmutableList.of( - "java", + JavaProcessWrapper.CURRENT_JVM_COMMAND, "-cp", jarPath.toString(), "adder.Adder", diff --git a/src/main/java/build/buildfarm/BUILD b/src/main/java/build/buildfarm/BUILD index 601aa38eb4..3cbdeb5231 100644 --- a/src/main/java/build/buildfarm/BUILD +++ b/src/main/java/build/buildfarm/BUILD @@ -1,4 +1,4 @@ -load("//:jvm_flags.bzl", "ensure_accurate_metadata") +load("//:jvm_flags.bzl", "add_opens_sun_nio_fs", "ensure_accurate_metadata") package( default_visibility = ["//src:__subpackages__"], @@ -15,7 +15,7 @@ java_binary( classpath_resources = [ ":configs", ], - jvm_flags = ensure_accurate_metadata(), + jvm_flags = ensure_accurate_metadata() + add_opens_sun_nio_fs(), main_class = "build.buildfarm.server.BuildFarmServer", visibility = ["//visibility:public"], runtime_deps = [ @@ -29,7 +29,7 @@ java_binary( classpath_resources = [ ":configs", ], - jvm_flags = ensure_accurate_metadata(), + jvm_flags = ensure_accurate_metadata() + add_opens_sun_nio_fs(), main_class = "build.buildfarm.worker.shard.Worker", visibility = ["//visibility:public"], runtime_deps = [ diff --git a/src/main/java/build/buildfarm/backplane/Backplane.java b/src/main/java/build/buildfarm/backplane/Backplane.java index f608d16b0f..df305099de 100644 --- a/src/main/java/build/buildfarm/backplane/Backplane.java +++ b/src/main/java/build/buildfarm/backplane/Backplane.java @@ -296,4 +296,24 @@ Map updateCasReadCount(Stream> casRe * @return total count of cas read count entries removed. */ int removeCasReadCountEntries(Stream digestsToBeRemoved) throws IOException; + + /** Set expiry time for digests */ + void updateDigestsExpiry(Iterable digests) throws IOException; + + /** + * Updates the read count for CAS entries based on the provided stream of digest and count. + * + * @param casReadCountStream A Stream of Digest and its corresponding read count. + * @return A Map containing the updated read counts for the specified CAS entries. + */ + Map updateCasReadCount(Stream> casReadCountStream) + throws IOException; + + /** + * Removes the CAS read count entries from the storage. + * + * @param digestsToBeRemoved CAS entries for which each read count needs to be removed. + * @return total count of cas read count entries removed. + */ + int removeCasReadCountEntries(Stream digestsToBeRemoved) throws IOException; } diff --git a/src/main/java/build/buildfarm/cas/BUILD b/src/main/java/build/buildfarm/cas/BUILD index ef66e171a6..cd58882d5e 100644 --- a/src/main/java/build/buildfarm/cas/BUILD +++ b/src/main/java/build/buildfarm/cas/BUILD @@ -28,6 +28,7 @@ java_library( "@maven//:io_grpc_grpc_core", "@maven//:io_grpc_grpc_protobuf", "@maven//:io_grpc_grpc_stub", + "@maven//:io_netty_netty_codec_http", "@maven//:io_prometheus_simpleclient", "@maven//:net_jcip_jcip_annotations", "@maven//:org_projectlombok_lombok", diff --git a/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java b/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java index c13d297ff6..d9ef94d9ba 100644 --- a/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java @@ -92,6 +92,7 @@ import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; +import io.netty.handler.codec.http.QueryStringDecoder; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; @@ -99,6 +100,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.file.FileAlreadyExistsException; @@ -610,6 +613,20 @@ public Blob get(Digest digest) { private static final int CHUNK_SIZE = 128 * 1024; + private static boolean shouldReadThrough(RequestMetadata requestMetadata) { + try { + URI uri = new URI(requestMetadata.getCorrelatedInvocationsId()); + QueryStringDecoder decoder = new QueryStringDecoder(uri); + return decoder + .parameters() + .getOrDefault("THROUGH", ImmutableList.of("false")) + .get(0) + .equals("true"); + } catch (URISyntaxException e) { + return false; + } + } + @Override public void get( Compressor.Value compressor, @@ -618,9 +635,28 @@ public void get( long count, ServerCallStreamObserver blobObserver, RequestMetadata requestMetadata) { + boolean readThrough = shouldReadThrough(requestMetadata); InputStream in; try { - in = newInput(compressor, digest, offset); + if (readThrough && !contains(digest, /* result=*/ null)) { + // really need to be able to reuse/restart the same write over + // multiple requests - if we get successive read throughs for a single + // digest, we should pick up from where we were last time + // Also servers should affinitize + // And share data, so that they can pick the same worker to pull from + // if possible. + Write write = getWrite(compressor, digest, UUID.randomUUID(), requestMetadata); + blobObserver.setOnCancelHandler(write::reset); + in = + new ReadThroughInputStream( + newExternalInput(compressor, digest, 0), + localOffset -> newTransparentInput(compressor, digest, localOffset), + digest.getSizeBytes(), + offset, + write); + } else { + in = newInput(compressor, digest, offset); + } } catch (IOException e) { blobObserver.onError(e); return; @@ -883,6 +919,9 @@ public synchronized void reset() { + key.getIdentifier(), e); } finally { + if (closedFuture != null) { + closedFuture.set(null); + } isReset = true; } } @@ -1919,7 +1958,7 @@ private Entry safeStorageInsertion(String key, Entry entry) { lock = keyLocks.get(key); } catch (ExecutionException e) { // impossible without exception instantiating lock - throw new RuntimeException(e); + throw new RuntimeException(e.getCause()); } lock.lock(); diff --git a/src/main/java/build/buildfarm/common/config/BuildfarmConfigs.java b/src/main/java/build/buildfarm/common/config/BuildfarmConfigs.java index 444d25a13d..05d7915b27 100644 --- a/src/main/java/build/buildfarm/common/config/BuildfarmConfigs.java +++ b/src/main/java/build/buildfarm/common/config/BuildfarmConfigs.java @@ -36,6 +36,7 @@ public final class BuildfarmConfigs { private long maximumActionTimeout = 3600; private long maxEntrySizeBytes = 2147483648L; // 2 * 1024 * 1024 * 1024 private int prometheusPort = 9090; + private boolean allowSymlinkTargetAbsolute = false; private Server server = new Server(); private Backplane backplane = new Backplane(); private Worker worker = new Worker(); @@ -71,7 +72,7 @@ public static BuildfarmConfigs loadServerConfigs(String[] args) throws Configura log.severe("Could not parse yml configuration file." + e); throw new RuntimeException(e); } - if (!options.publicName.isEmpty()) { + if (!Strings.isNullOrEmpty(options.publicName)) { buildfarmConfigs.getServer().setPublicName(options.publicName); } if (options.port > 0) { @@ -99,12 +100,18 @@ public static BuildfarmConfigs loadWorkerConfigs(String[] args) throws Configura if (!Strings.isNullOrEmpty(options.publicName)) { buildfarmConfigs.getWorker().setPublicName(options.publicName); } + if (options.port >= 0) { + buildfarmConfigs.getWorker().setPort(options.port); + } if (options.prometheusPort >= 0) { buildfarmConfigs.setPrometheusPort(options.prometheusPort); } if (!Strings.isNullOrEmpty(options.redisUri)) { buildfarmConfigs.getBackplane().setRedisUri(options.redisUri); } + if (!Strings.isNullOrEmpty(options.root)) { + buildfarmConfigs.getWorker().setRoot(options.root); + } adjustWorkerConfigs(buildfarmConfigs); return buildfarmConfigs; } diff --git a/src/main/java/build/buildfarm/common/config/BuildfarmOptions.java b/src/main/java/build/buildfarm/common/config/BuildfarmOptions.java index 1dbe0c7f98..45d6f4aea3 100644 --- a/src/main/java/build/buildfarm/common/config/BuildfarmOptions.java +++ b/src/main/java/build/buildfarm/common/config/BuildfarmOptions.java @@ -33,4 +33,7 @@ public class BuildfarmOptions extends OptionsBase { help = "URI for Redis connection. Use 'redis://' or 'rediss://' for the scheme", defaultValue = "") public String redisUri; + + @Option(name = "port", help = "Port for the buildfarm service.", defaultValue = "-1") + public int port; } diff --git a/src/main/java/build/buildfarm/common/config/ServerOptions.java b/src/main/java/build/buildfarm/common/config/ServerOptions.java index 35f47d6d13..b151c24c7b 100644 --- a/src/main/java/build/buildfarm/common/config/ServerOptions.java +++ b/src/main/java/build/buildfarm/common/config/ServerOptions.java @@ -18,9 +18,6 @@ /** Command-line options definition for example server. */ public class ServerOptions extends BuildfarmOptions { - @Option(name = "port", abbrev = 'p', help = "Port to use.", defaultValue = "-1") - public int port; - @Option(name = "public_name", abbrev = 'n', help = "Name of this server.", defaultValue = "") public String publicName; } diff --git a/src/main/java/build/buildfarm/common/config/Worker.java b/src/main/java/build/buildfarm/common/config/Worker.java index e987c1e370..c446134fe2 100644 --- a/src/main/java/build/buildfarm/common/config/Worker.java +++ b/src/main/java/build/buildfarm/common/config/Worker.java @@ -39,6 +39,7 @@ public class Worker { private int gracefulShutdownSeconds = 0; private ExecutionPolicy[] executionPolicies = {}; private SandboxSettings sandboxSettings = new SandboxSettings(); + private boolean createSymlinkOutputs = false; // These limited resources are only for the individual worker. // An example would be hardware resources such as GPUs. diff --git a/src/main/java/build/buildfarm/instance/server/BUILD b/src/main/java/build/buildfarm/instance/server/BUILD index ecff968e79..1c63e9896c 100644 --- a/src/main/java/build/buildfarm/instance/server/BUILD +++ b/src/main/java/build/buildfarm/instance/server/BUILD @@ -1,8 +1,8 @@ java_library( name = "server", srcs = [ - "AbstractServerInstance.java", "GetDirectoryFunction.java", + "NodeInstance.java", "OperationsMap.java", "WatchFuture.java", ], diff --git a/src/main/java/build/buildfarm/instance/server/AbstractServerInstance.java b/src/main/java/build/buildfarm/instance/server/NodeInstance.java similarity index 98% rename from src/main/java/build/buildfarm/instance/server/AbstractServerInstance.java rename to src/main/java/build/buildfarm/instance/server/NodeInstance.java index d828b7bd89..e44305f32e 100644 --- a/src/main/java/build/buildfarm/instance/server/AbstractServerInstance.java +++ b/src/main/java/build/buildfarm/instance/server/NodeInstance.java @@ -144,7 +144,7 @@ import lombok.extern.java.Log; @Log -public abstract class AbstractServerInstance implements Instance { +public abstract class NodeInstance implements Instance { private final String name; protected final ContentAddressableStorage contentAddressableStorage; protected final ActionCache actionCache; @@ -178,6 +178,8 @@ public abstract class AbstractServerInstance implements Instance { public static final String ENVIRONMENT_VARIABLES_NOT_SORTED = "The `Command`'s `environment_variables` are not correctly sorted by `name`."; + public static final String SYMLINK_TARGET_ABSOLUTE = "A symlink target is absolute."; + public static final String MISSING_ACTION = "The action was not found in the CAS."; public static final String MISSING_COMMAND = "The command was not found in the CAS."; @@ -227,7 +229,7 @@ public abstract class AbstractServerInstance implements Instance { public static final String NO_REQUEUE_COMPLETE_MESSAGE = "Operation %s not requeued. Operation has already completed."; - public AbstractServerInstance( + public NodeInstance( String name, DigestUtil digestUtil, ContentAddressableStorage contentAddressableStorage, @@ -790,6 +792,7 @@ public static void validateActionInputDirectory( Stack pathDigests, Set visited, Map directoriesIndex, + boolean allowSymlinkTargetAbsolute, Consumer onInputFile, Consumer onInputDirectory, Consumer onInputDigest, @@ -838,6 +841,14 @@ public static void validateActionInputDirectory( .setSubject("/" + directoryPath + ": " + lastSymlinkName + " > " + symlinkName) .setDescription(DIRECTORY_NOT_SORTED); } + String symlinkTarget = symlinkNode.getTarget(); + if (!allowSymlinkTargetAbsolute && symlinkTarget.charAt(0) == '/') { + preconditionFailure + .addViolationsBuilder() + .setType(VIOLATION_TYPE_INVALID) + .setSubject("/" + directoryPath + ": " + symlinkName + " -> " + symlinkTarget) + .setDescription(SYMLINK_TARGET_ABSOLUTE); + } /* FIXME serverside validity check? regex? Preconditions.checkState( isValidFilename(symlinkName), @@ -907,6 +918,7 @@ public static void validateActionInputDirectory( pathDigests, visited, directoriesIndex, + allowSymlinkTargetAbsolute, onInputFile, onInputDirectory, onInputDigest, @@ -922,6 +934,7 @@ private static void validateActionInputDirectoryDigest( Stack pathDigests, Set visited, Map directoriesIndex, + boolean allowSymlinkTargetAbsolute, Consumer onInputFile, Consumer onInputDirectory, Consumer onInputDigest, @@ -946,6 +959,7 @@ private static void validateActionInputDirectoryDigest( pathDigests, visited, directoriesIndex, + allowSymlinkTargetAbsolute, onInputFile, onInputDirectory, onInputDigest, @@ -1203,12 +1217,16 @@ protected void validateAction( ImmutableSet.Builder inputFilesBuilder = ImmutableSet.builder(); inputDirectoriesBuilder.add(ACTION_INPUT_ROOT_DIRECTORY_PATH); + boolean allowSymlinkTargetAbsolute = + getCacheCapabilities().getSymlinkAbsolutePathStrategy() + == SymlinkAbsolutePathStrategy.Value.ALLOWED; validateActionInputDirectoryDigest( ACTION_INPUT_ROOT_DIRECTORY_PATH, action.getInputRootDigest(), new Stack<>(), new HashSet<>(), directoriesIndex, + allowSymlinkTargetAbsolute, inputFilesBuilder::add, inputDirectoriesBuilder::add, onInputDigest, @@ -1949,19 +1967,18 @@ public ServerCapabilities getCapabilities() { @Override public WorkerProfileMessage getWorkerProfile() { throw new UnsupportedOperationException( - "AbstractServerInstance doesn't support getWorkerProfile() method."); + "NodeInstance doesn't support getWorkerProfile() method."); } @Override public WorkerListMessage getWorkerList() { - throw new UnsupportedOperationException( - "AbstractServerInstance doesn't support getWorkerList() method."); + throw new UnsupportedOperationException("NodeInstance doesn't support getWorkerList() method."); } @Override public PrepareWorkerForGracefulShutDownRequestResults shutDownWorkerGracefully() { throw new UnsupportedOperationException( - "AbstractServerInstance doesn't support drainWorkerPipeline() method."); + "NodeInstance doesn't support drainWorkerPipeline() method."); } @Override diff --git a/src/main/java/build/buildfarm/instance/shard/CasWorkerMap.java b/src/main/java/build/buildfarm/instance/shard/CasWorkerMap.java index 794b296a1f..55b2933e42 100644 --- a/src/main/java/build/buildfarm/instance/shard/CasWorkerMap.java +++ b/src/main/java/build/buildfarm/instance/shard/CasWorkerMap.java @@ -121,4 +121,11 @@ Map> getMap(RedisClient client, Iterable blobDigests * @note Suggested return identifier: mapSize. */ int size(RedisClient client) throws IOException; + + /** + * @brief Set the expiry duration for the digests. + * @param client Client used for interacting with redis when not using cacheMap. + * @param blobDigests The blob digests to set new the expiry duration. + */ + void setExpire(RedisClient client, Iterable blobDigests) throws IOException; } diff --git a/src/main/java/build/buildfarm/instance/shard/JedisCasWorkerMap.java b/src/main/java/build/buildfarm/instance/shard/JedisCasWorkerMap.java index d035d10491..65d1f06b87 100644 --- a/src/main/java/build/buildfarm/instance/shard/JedisCasWorkerMap.java +++ b/src/main/java/build/buildfarm/instance/shard/JedisCasWorkerMap.java @@ -234,6 +234,17 @@ public int size(RedisClient client) throws IOException { return client.call(jedis -> ScanCount.get(jedis, name + ":*", 1000)); } + @Override + public void setExpire(RedisClient client, Iterable blobDigests) throws IOException { + client.run( + jedis -> { + for (Digest blobDigest : blobDigests) { + String key = redisCasKey(blobDigest); + jedis.expire(key, keyExpiration_s); + } + }); + } + /** * @brief Get the redis key name. * @details This is to be used for the direct redis implementation. diff --git a/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java b/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java index b048161b0a..c2c0ea6d37 100644 --- a/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java +++ b/src/main/java/build/buildfarm/instance/shard/RedisShardBackplane.java @@ -1520,4 +1520,9 @@ public GetClientStartTimeResult getClientStartTime(GetClientStartTimeRequest req } return GetClientStartTimeResult.newBuilder().addAllClientStartTime(startTimes).build(); } + + @Override + public void updateDigestsExpiry(Iterable digests) throws IOException { + state.casWorkerMap.setExpire(client, digests); + } } diff --git a/src/main/java/build/buildfarm/instance/shard/RedissonCasWorkerMap.java b/src/main/java/build/buildfarm/instance/shard/RedissonCasWorkerMap.java index 234e15fb15..52b010c31f 100644 --- a/src/main/java/build/buildfarm/instance/shard/RedissonCasWorkerMap.java +++ b/src/main/java/build/buildfarm/instance/shard/RedissonCasWorkerMap.java @@ -209,6 +209,14 @@ public int size(RedisClient client) { return cacheMap.size(); } + @Override + public void setExpire(RedisClient client, Iterable blobDigests) { + for (Digest blobDigest : blobDigests) { + String key = cacheMapCasKey(blobDigest); + cacheMap.expireKey(key, keyExpiration_s, TimeUnit.SECONDS); + } + } + /** * @brief Get a random element from the set. * @details Assumes the set is not empty. diff --git a/src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java b/src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java index 640878707f..8a00e9ae96 100644 --- a/src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java +++ b/src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java @@ -30,7 +30,7 @@ import build.buildfarm.common.DigestUtil; import build.buildfarm.common.InputStreamFactory; import build.buildfarm.instance.Instance; -import build.buildfarm.instance.shard.ShardInstance.WorkersCallback; +import build.buildfarm.instance.shard.ServerInstance.WorkersCallback; import com.google.common.base.Throwables; import com.google.common.cache.LoadingCache; import com.google.common.collect.Iterables; diff --git a/src/main/java/build/buildfarm/instance/shard/ShardInstance.java b/src/main/java/build/buildfarm/instance/shard/ServerInstance.java similarity index 96% rename from src/main/java/build/buildfarm/instance/shard/ShardInstance.java rename to src/main/java/build/buildfarm/instance/shard/ServerInstance.java index c1c07b08f0..b1cbed0e07 100644 --- a/src/main/java/build/buildfarm/instance/shard/ShardInstance.java +++ b/src/main/java/build/buildfarm/instance/shard/ServerInstance.java @@ -47,6 +47,7 @@ import build.bazel.remote.execution.v2.Action; import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.BatchReadBlobsResponse.Response; +import build.bazel.remote.execution.v2.CacheCapabilities; import build.bazel.remote.execution.v2.Command; import build.bazel.remote.execution.v2.Compressor; import build.bazel.remote.execution.v2.Digest; @@ -60,6 +61,7 @@ import build.bazel.remote.execution.v2.Platform.Property; import build.bazel.remote.execution.v2.RequestMetadata; import build.bazel.remote.execution.v2.ResultsCachePolicy; +import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy; import build.buildfarm.actioncache.ActionCache; import build.buildfarm.actioncache.ShardActionCache; import build.buildfarm.backplane.Backplane; @@ -79,7 +81,7 @@ import build.buildfarm.common.grpc.UniformDelegateServerCallStreamObserver; import build.buildfarm.instance.Instance; import build.buildfarm.instance.MatchListener; -import build.buildfarm.instance.server.AbstractServerInstance; +import build.buildfarm.instance.server.NodeInstance; import build.buildfarm.operations.EnrichedOperation; import build.buildfarm.operations.FindOperationsResults; import build.buildfarm.v1test.BackplaneStatus; @@ -133,10 +135,12 @@ import java.io.InputStream; import java.io.OutputStream; import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -165,7 +169,7 @@ import lombok.extern.java.Log; @Log -public class ShardInstance extends AbstractServerInstance { +public class ServerInstance extends NodeInstance { private static final ListenableFuture IMMEDIATE_VOID_FUTURE = Futures.immediateFuture(null); private static final String TIMEOUT_OUT_OF_BOUNDS = @@ -257,14 +261,14 @@ private static Backplane createBackplane(String identifier) throws Configuration identifier, /* subscribeToBackplane=*/ true, configs.getServer().isRunFailsafeOperation(), - ShardInstance::stripOperation, - ShardInstance::stripQueuedOperation); + ServerInstance::stripOperation, + ServerInstance::stripQueuedOperation); } else { throw new IllegalArgumentException("Shard Backplane not set in config"); } } - public ShardInstance(String name, String identifier, DigestUtil digestUtil, Runnable onStop) + public ServerInstance(String name, String identifier, DigestUtil digestUtil, Runnable onStop) throws InterruptedException, ConfigurationException { this( name, @@ -274,7 +278,7 @@ public ShardInstance(String name, String identifier, DigestUtil digestUtil, Runn /* actionCacheFetchService=*/ BuildfarmExecutors.getActionCacheFetchServicePool()); } - private ShardInstance( + private ServerInstance( String name, DigestUtil digestUtil, Backplane backplane, @@ -326,7 +330,7 @@ void initializeCaches() { .build(); } - public ShardInstance( + public ServerInstance( String name, DigestUtil digestUtil, Backplane backplane, @@ -658,7 +662,7 @@ public ListenableFuture> findMissingBlobs( } if (configs.getServer().isFindMissingBlobsViaBackplane()) { - return findMissingBlobsViaBackplane(nonEmptyDigests); + return findMissingBlobsViaBackplane(nonEmptyDigests, requestMetadata); } return findMissingBlobsQueryingEachWorker(nonEmptyDigests, requestMetadata); @@ -725,24 +729,40 @@ private ListenableFuture> findMissingBlobsQueryingEachWorker( // out-of-date and the server lies about which blobs are actually present. We provide this // alternative strategy for calculating missing blobs. private ListenableFuture> findMissingBlobsViaBackplane( - Iterable nonEmptyDigests) { + Iterable nonEmptyDigests, RequestMetadata requestMetadata) { try { Set uniqueDigests = new HashSet<>(); nonEmptyDigests.forEach(uniqueDigests::add); Map> foundBlobs = backplane.getBlobDigestsWorkers(uniqueDigests); Set workerSet = backplane.getStorageWorkers(); Map workersStartTime = backplane.getWorkersStartTimeInEpochSecs(workerSet); - return immediateFuture( + Map> digestAndWorkersMap = uniqueDigests.stream() - .filter( // best effort to present digests only missing on active workers + .map( digest -> { Set initialWorkers = foundBlobs.getOrDefault(digest, Collections.emptySet()); - return filterAndAdjustWorkersForDigest( - digest, initialWorkers, workerSet, workersStartTime) - .isEmpty(); + return new AbstractMap.SimpleEntry<>( + digest, + filterAndAdjustWorkersForDigest( + digest, initialWorkers, workerSet, workersStartTime)); }) - .collect(Collectors.toList())); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + ListenableFuture> missingDigestFuture = + immediateFuture( + digestAndWorkersMap.entrySet().stream() + .filter(entry -> entry.getValue().isEmpty()) + .map(Map.Entry::getKey) + .collect(Collectors.toList())); + return transformAsync( + missingDigestFuture, + (missingDigest) -> { + extendLeaseForDigests(digestAndWorkersMap, requestMetadata); + return immediateFuture(missingDigest); + }, + // Propagate context values but don't cascade its cancellation for downstream calls. + Context.current().fork().fixedContextExecutor(directExecutor())); } catch (Exception e) { log.log(Level.SEVERE, "find missing blob via backplane failed", e); return immediateFailedFuture(Status.fromThrowable(e).asException()); @@ -787,6 +807,29 @@ private Set filterAndAdjustWorkersForDigest( return workersStartedBeforeDigestInsertion; } + private void extendLeaseForDigests( + Map> digestAndWorkersMap, RequestMetadata requestMetadata) { + Map> workerAndDigestMap = new HashMap<>(); + digestAndWorkersMap.forEach( + (digest, workers) -> + workers.forEach( + worker -> + workerAndDigestMap.computeIfAbsent(worker, w -> new HashSet<>()).add(digest))); + + workerAndDigestMap.forEach( + (worker, digests) -> workerStub(worker).findMissingBlobs(digests, requestMetadata)); + + try { + backplane.updateDigestsExpiry(digestAndWorkersMap.keySet()); + } catch (IOException e) { + log.log( + Level.WARNING, + format( + "Failed to update expiry duration for digests (%s) insertion time", + digestAndWorkersMap.keySet())); + } + } + private void findMissingBlobsOnWorker( String requestId, Iterable blobDigests, @@ -1084,7 +1127,7 @@ public void onError(Throwable t) { backplane, workerSet, locationSet, - ShardInstance.this::workerStub, + ServerInstance.this::workerStub, blobDigest, directExecutor(), RequestMetadata.getDefaultInstance()), @@ -2249,7 +2292,7 @@ public ListenableFuture queue(ExecuteEntry executeEntry, Poller poller, Du log.log( Level.FINER, format( - "ShardInstance(%s): checkCache(%s): %sus elapsed", + "ServerInstance(%s): checkCache(%s): %sus elapsed", getName(), operation.getName(), checkCacheUSecs)); return IMMEDIATE_VOID_FUTURE; } @@ -2276,7 +2319,7 @@ private ListenableFuture transformAndQueue( log.log( Level.FINER, format( - "ShardInstance(%s): queue(%s): fetching action %s", + "ServerInstance(%s): queue(%s): fetching action %s", getName(), operation.getName(), actionDigest.getHash())); RequestMetadata requestMetadata = executeEntry.getRequestMetadata(); ListenableFuture actionFuture = @@ -2319,7 +2362,7 @@ private ListenableFuture transformAndQueue( log.log( Level.FINER, format( - "ShardInstance(%s): queue(%s): fetched action %s transforming queuedOperation", + "ServerInstance(%s): queue(%s): fetched action %s transforming queuedOperation", getName(), operation.getName(), actionDigest.getHash())); Stopwatch transformStopwatch = Stopwatch.createStarted(); return transform( @@ -2349,7 +2392,7 @@ private ListenableFuture transformAndQueue( log.log( Level.FINER, format( - "ShardInstance(%s): queue(%s): queuedOperation %s transformed, validating", + "ServerInstance(%s): queue(%s): queuedOperation %s transformed, validating", getName(), operation.getName(), DigestUtil.toString( @@ -2371,7 +2414,7 @@ private ListenableFuture transformAndQueue( log.log( Level.FINER, format( - "ShardInstance(%s): queue(%s): queuedOperation %s validated, uploading", + "ServerInstance(%s): queue(%s): queuedOperation %s validated, uploading", getName(), operation.getName(), DigestUtil.toString( @@ -2423,7 +2466,7 @@ public void onSuccess(ProfiledQueuedOperationMetadata profiledQueuedMetadata) { log.log( Level.FINER, format( - "ShardInstance(%s): queue(%s): %dus checkCache, %dus transform, %dus validate, %dus upload, %dus queue, %dus elapsed", + "ServerInstance(%s): queue(%s): %dus checkCache, %dus transform, %dus validate, %dus upload, %dus queue, %dus elapsed", getName(), queueOperation.getName(), checkCacheUSecs, @@ -2726,4 +2769,16 @@ private boolean inDenyList(RequestMetadata requestMetadata) throws IOException { } return backplane.isBlacklisted(requestMetadata); } + + @Override + protected CacheCapabilities getCacheCapabilities() { + SymlinkAbsolutePathStrategy.Value symlinkAbsolutePathStrategy = + configs.isAllowSymlinkTargetAbsolute() + ? SymlinkAbsolutePathStrategy.Value.ALLOWED + : SymlinkAbsolutePathStrategy.Value.DISALLOWED; + return super.getCacheCapabilities() + .toBuilder() + .setSymlinkAbsolutePathStrategy(symlinkAbsolutePathStrategy) + .build(); + } } diff --git a/src/main/java/build/buildfarm/server/BUILD b/src/main/java/build/buildfarm/server/BUILD index eccbf4fda2..2cd750a4ff 100644 --- a/src/main/java/build/buildfarm/server/BUILD +++ b/src/main/java/build/buildfarm/server/BUILD @@ -16,6 +16,8 @@ java_library( "//src/main/java/build/buildfarm/metrics/prometheus", "//src/main/java/build/buildfarm/server/services", "//src/main/protobuf:build_buildfarm_v1test_buildfarm_java_proto", + "@io_grpc_grpc_proto//:health_java_proto", + "@io_grpc_grpc_proto//:health_proto", "@maven//:com_github_pcj_google_options", "@maven//:com_google_guava_guava", "@maven//:io_grpc_grpc_api", diff --git a/src/main/java/build/buildfarm/server/BuildFarmServer.java b/src/main/java/build/buildfarm/server/BuildFarmServer.java index a20f4a6f77..d963454de0 100644 --- a/src/main/java/build/buildfarm/server/BuildFarmServer.java +++ b/src/main/java/build/buildfarm/server/BuildFarmServer.java @@ -29,7 +29,7 @@ import build.buildfarm.common.services.ByteStreamService; import build.buildfarm.common.services.ContentAddressableStorageService; import build.buildfarm.instance.Instance; -import build.buildfarm.instance.shard.ShardInstance; +import build.buildfarm.instance.shard.ServerInstance; import build.buildfarm.metrics.prometheus.PrometheusPublisher; import build.buildfarm.server.services.ActionCacheService; import build.buildfarm.server.services.CapabilitiesService; @@ -88,20 +88,20 @@ public class BuildFarmServer extends LoggingMain { */ public void prepareServerForGracefulShutdown() { if (configs.getServer().getGracefulShutdownSeconds() == 0) { - System.err.println( + log.info( String.format("Graceful Shutdown is not enabled. Server is shutting down immediately.")); } else { try { - System.err.println( + log.info( String.format( "Graceful Shutdown - Waiting %d to allow connections to drain.", configs.getServer().getGracefulShutdownSeconds())); SECONDS.sleep(configs.getServer().getGracefulShutdownSeconds()); } catch (InterruptedException e) { - System.err.println( + log.info( "Graceful Shutdown - The server graceful shutdown is interrupted: " + e.getMessage()); } finally { - System.err.println( + log.info( String.format( "Graceful Shutdown - It took the server %d seconds to shutdown", configs.getServer().getGracefulShutdownSeconds())); @@ -109,9 +109,9 @@ public void prepareServerForGracefulShutdown() { } } - private ShardInstance createInstance() + private ServerInstance createInstance() throws IOException, ConfigurationException, InterruptedException { - return new ShardInstance( + return new ServerInstance( configs.getServer().getName(), configs.getServer().getSession() + "-" + configs.getServer().getName(), new DigestUtil(configs.getDigestFunction()), @@ -195,8 +195,10 @@ synchronized void stop() throws InterruptedException { private void shutdown() throws InterruptedException { log.info("*** shutting down gRPC server since JVM is shutting down"); prepareServerForGracefulShutdown(); - healthStatusManager.setStatus( - HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.NOT_SERVING); + if (healthStatusManager != null) { + healthStatusManager.setStatus( + HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.NOT_SERVING); + } PrometheusPublisher.stopHttpServer(); healthCheckMetric.labels("stop").inc(); try { diff --git a/src/main/java/build/buildfarm/tools/Executor.java b/src/main/java/build/buildfarm/tools/Executor.java index 4f2f9f3d6f..901ff33a95 100644 --- a/src/main/java/build/buildfarm/tools/Executor.java +++ b/src/main/java/build/buildfarm/tools/Executor.java @@ -238,7 +238,7 @@ private static void loadFilesIntoCAS(String instanceName, Channel channel, Path ByteStreamStub bsStub = ByteStreamGrpc.newStub(channel); for (Digest missingDigest : missingDigests) { - Path path = blobsDir.resolve(missingDigest.getHash() + "_" + missingDigest.getSizeBytes()); + Path path = blobsDir.resolve(missingDigest.getHash()); if (missingDigest.getSizeBytes() < Size.mbToBytes(1)) { Request request = Request.newBuilder() diff --git a/src/main/java/build/buildfarm/worker/shard/BUILD b/src/main/java/build/buildfarm/worker/shard/BUILD index ec48ec5be0..b62e4369de 100644 --- a/src/main/java/build/buildfarm/worker/shard/BUILD +++ b/src/main/java/build/buildfarm/worker/shard/BUILD @@ -21,6 +21,8 @@ java_library( "//src/main/protobuf:build_buildfarm_v1test_buildfarm_java_grpc", "//src/main/protobuf:build_buildfarm_v1test_buildfarm_java_proto", "@googleapis//:google_rpc_error_details_java_proto", + "@io_grpc_grpc_proto//:health_java_proto", + "@io_grpc_grpc_proto//:health_proto", "@maven//:com_github_ben_manes_caffeine_caffeine", "@maven//:com_github_pcj_google_options", "@maven//:com_google_code_findbugs_jsr305", diff --git a/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java b/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java index 041868f69c..dba809fdcf 100644 --- a/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java +++ b/src/main/java/build/buildfarm/worker/shard/CFCExecFileSystem.java @@ -82,6 +82,9 @@ class CFCExecFileSystem implements ExecFileSystem { // indicate symlinking above for a set of matching paths private final Iterable linkedInputDirectories; + // permit symlinks to point to absolute paths in inputs + private final boolean allowSymlinkTargetAbsolute; + private final Map> rootInputFiles = new ConcurrentHashMap<>(); private final Map> rootInputDirectories = new ConcurrentHashMap<>(); private final ExecutorService fetchService = BuildfarmExecutors.getFetchServicePool(); @@ -95,6 +98,7 @@ class CFCExecFileSystem implements ExecFileSystem { @Nullable UserPrincipal owner, boolean linkInputDirectories, Iterable linkedInputDirectories, + boolean allowSymlinkTargetAbsolute, ExecutorService removeDirectoryService, ExecutorService accessRecorder) { this.root = root; @@ -104,6 +108,7 @@ class CFCExecFileSystem implements ExecFileSystem { this.linkedInputDirectories = Iterables.transform( linkedInputDirectories, realInputDirectory -> Pattern.compile(realInputDirectory)); + this.allowSymlinkTargetAbsolute = allowSymlinkTargetAbsolute; this.removeDirectoryService = removeDirectoryService; this.accessRecorder = accessRecorder; } @@ -179,7 +184,7 @@ public InputStream newInput(Compressor.Value compressor, Digest digest, long off private ListenableFuture putSymlink(Path path, SymlinkNode symlinkNode) { Path symlinkPath = path.resolve(symlinkNode.getName()); Path relativeTargetPath = path.getFileSystem().getPath(symlinkNode.getTarget()); - checkState(!relativeTargetPath.isAbsolute()); + checkState(allowSymlinkTargetAbsolute || !relativeTargetPath.isAbsolute()); return listeningDecorator(fetchService) .submit( () -> { @@ -397,16 +402,25 @@ private Set linkedDirectories( return ImmutableSet.of(); } + private OutputDirectory createOutputDirectory(Command command) { + Iterable files; + Iterable dirs; + if (command.getOutputPathsCount() != 0) { + files = command.getOutputPathsList(); + dirs = ImmutableList.of(); // output paths require the action to create their own directory + } else { + files = command.getOutputFilesList(); + dirs = command.getOutputDirectoriesList(); + } + return OutputDirectory.parse(files, dirs, command.getEnvironmentVariablesList()); + } + @Override public Path createExecDir( String operationName, Map directoriesIndex, Action action, Command command) throws IOException, InterruptedException { Digest inputRootDigest = action.getInputRootDigest(); - OutputDirectory outputDirectory = - OutputDirectory.parse( - command.getOutputFilesList(), - command.getOutputDirectoriesList(), - command.getEnvironmentVariablesList()); + OutputDirectory outputDirectory = createOutputDirectory(command); Path execDir = root.resolve(operationName); if (Files.exists(execDir)) { diff --git a/src/main/java/build/buildfarm/worker/shard/ShardWorkerContext.java b/src/main/java/build/buildfarm/worker/shard/ShardWorkerContext.java index c844f53963..98474b439c 100644 --- a/src/main/java/build/buildfarm/worker/shard/ShardWorkerContext.java +++ b/src/main/java/build/buildfarm/worker/shard/ShardWorkerContext.java @@ -31,6 +31,7 @@ import build.bazel.remote.execution.v2.ExecutionStage; import build.bazel.remote.execution.v2.FileNode; import build.bazel.remote.execution.v2.Platform; +import build.bazel.remote.execution.v2.SymlinkNode; import build.bazel.remote.execution.v2.Tree; import build.buildfarm.backplane.Backplane; import build.buildfarm.common.CommandUtils; @@ -572,6 +573,7 @@ private void uploadOutputFile( static class OutputDirectoryContext { private final List files = new ArrayList<>(); private final List directories = new ArrayList<>(); + private final List symlinks = new ArrayList<>(); void addFile(FileNode fileNode) { files.add(fileNode); @@ -581,10 +583,19 @@ void addDirectory(DirectoryNode directoryNode) { directories.add(directoryNode); } + void addSymlink(SymlinkNode symlinkNode) { + symlinks.add(symlinkNode); + } + Directory toDirectory() { files.sort(Comparator.comparing(FileNode::getName)); directories.sort(Comparator.comparing(DirectoryNode::getName)); - return Directory.newBuilder().addAllFiles(files).addAllDirectories(directories).build(); + symlinks.sort(Comparator.comparing(SymlinkNode::getName)); + return Directory.newBuilder() + .addAllFiles(files) + .addAllDirectories(directories) + .addAllSymlinks(symlinks) + .build(); } } @@ -621,8 +632,30 @@ private void uploadOutputDirectory( @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (configs.getWorker().isCreateSymlinkOutputs() && attrs.isSymbolicLink()) { + visitSymbolicLink(file); + } else { + visitRegularFile(file, attrs); + } + return FileVisitResult.CONTINUE; + } + + private void visitSymbolicLink(Path file) throws IOException { + // TODO convert symlinks with absolute targets within execution root to relative ones + currentDirectory.addSymlink( + SymlinkNode.newBuilder() + .setName(file.getFileName().toString()) + .setTarget(Files.readSymbolicLink(file).toString()) + .build()); + } + + private void visitRegularFile(Path file, BasicFileAttributes attrs) throws IOException { Digest digest; try { + // should we create symlink nodes in output? + // is buildstream trying to execute in a specific container?? + // can get to NSFE for nonexistent symlinks + // can fail outright for a symlink to a directory digest = getDigestUtil().compute(file); } catch (NoSuchFileException e) { log.log( @@ -631,7 +664,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) "error visiting file %s under output dir %s", outputDirPath.relativize(file), outputDirPath.toAbsolutePath()), e); - return FileVisitResult.CONTINUE; + return; } // should we cast to PosixFilePermissions and do gymnastics there for executable? @@ -655,7 +688,6 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) .setDescription( "An output could not be uploaded because it exceeded the maximum size of an entry"); } - return FileVisitResult.CONTINUE; } @Override diff --git a/src/main/java/build/buildfarm/worker/shard/Worker.java b/src/main/java/build/buildfarm/worker/shard/Worker.java index ac0852e874..2f48287b79 100644 --- a/src/main/java/build/buildfarm/worker/shard/Worker.java +++ b/src/main/java/build/buildfarm/worker/shard/Worker.java @@ -71,6 +71,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.services.HealthStatusManager; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; @@ -123,7 +124,7 @@ public final class Worker extends LoggingMain { private boolean inGracefulShutdown = false; private boolean isPaused = false; - private ShardWorkerInstance instance; + private WorkerInstance instance; @SuppressWarnings("deprecation") private final HealthStatusManager healthStatusManager = new HealthStatusManager(); @@ -138,6 +139,52 @@ public final class Worker extends LoggingMain { private AtomicBoolean released = new AtomicBoolean(true); @Nullable private CASAccessMetricsRecorder casAccessMetricsRecorder; + /** + * The method will prepare the worker for graceful shutdown when the worker is ready. Note on + * using stderr here instead of log. By the time this is called in PreDestroy, the log is no + * longer available and is not logging messages. + */ + public void prepareWorkerForGracefulShutdown() { + if (configs.getWorker().getGracefulShutdownSeconds() == 0) { + log.info( + String.format( + "Graceful Shutdown is not enabled. Worker is shutting down without finishing executions in progress.")); + } else { + inGracefulShutdown = true; + log.info( + "Graceful Shutdown - The current worker will not be registered again and should be shutdown gracefully!"); + pipeline.stopMatchingOperations(); + int scanRate = 30; // check every 30 seconds + int timeWaited = 0; + int timeOut = configs.getWorker().getGracefulShutdownSeconds(); + try { + if (pipeline.isEmpty()) { + log.info("Graceful Shutdown - no work in the pipeline."); + } else { + log.info(String.format("Graceful Shutdown - waiting for executions to finish.")); + } + while (!pipeline.isEmpty() && timeWaited < timeOut) { + SECONDS.sleep(scanRate); + timeWaited += scanRate; + log.info( + String.format( + "Graceful Shutdown - Pipeline is still not empty after %d seconds.", timeWaited)); + } + } catch (InterruptedException e) { + log.info( + "Graceful Shutdown - The worker gracefully shutdown is interrupted: " + e.getMessage()); + } finally { + log.info( + String.format( + "Graceful Shutdown - It took the worker %d seconds to %s", + timeWaited, + pipeline.isEmpty() + ? "finish all actions" + : "gracefully shutdown but still cannot finish all actions")); + } + } + } + private Worker() { super("BuildFarmShardWorker"); } @@ -160,6 +207,7 @@ private Server createServer( serverBuilder.addService(new ContentAddressableStorageService(instance)); serverBuilder.addService(new ByteStreamService(instance)); serverBuilder.addService(new ShutDownWorkerGracefully(this)); + serverBuilder.addService(ProtoReflectionService.newInstance()); // We will build a worker's server based on it's capabilities. // A worker that is capable of execution will construct an execution pipeline. @@ -339,6 +387,7 @@ private ExecFileSystem createCFCExecFileSystem( owner, configs.getWorker().isLinkInputDirectories(), configs.getWorker().getLinkedInputDirectories(), + configs.isAllowSymlinkTargetAbsolute(), removeDirectoryService, accessRecorder /* deadlineAfter=*/ @@ -542,8 +591,7 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep remoteInputStreamFactory, removeDirectoryService, accessRecorder, storage); instance = - new ShardWorkerInstance( - configs.getWorker().getPublicName(), digestUtil, backplane, storage); + new WorkerInstance(configs.getWorker().getPublicName(), digestUtil, backplane, storage); // Create the appropriate writer for the context CasWriter writer; @@ -646,6 +694,7 @@ public synchronized void stop() throws InterruptedException { private void shutdown() throws InterruptedException { log.info("*** shutting down gRPC server since JVM is shutting down"); + prepareWorkerForGracefulShutdown(); PrometheusPublisher.stopHttpServer(); boolean interrupted = Thread.interrupted(); if (pipeline != null) { diff --git a/src/main/java/build/buildfarm/worker/shard/ShardWorkerInstance.java b/src/main/java/build/buildfarm/worker/shard/WorkerInstance.java similarity index 98% rename from src/main/java/build/buildfarm/worker/shard/ShardWorkerInstance.java rename to src/main/java/build/buildfarm/worker/shard/WorkerInstance.java index a891af7faa..e100417f53 100644 --- a/src/main/java/build/buildfarm/worker/shard/ShardWorkerInstance.java +++ b/src/main/java/build/buildfarm/worker/shard/WorkerInstance.java @@ -36,7 +36,7 @@ import build.buildfarm.common.Write; import build.buildfarm.common.grpc.UniformDelegateServerCallStreamObserver; import build.buildfarm.instance.MatchListener; -import build.buildfarm.instance.server.AbstractServerInstance; +import build.buildfarm.instance.server.NodeInstance; import build.buildfarm.operations.EnrichedOperation; import build.buildfarm.operations.FindOperationsResults; import build.buildfarm.v1test.BackplaneStatus; @@ -68,13 +68,13 @@ import lombok.extern.java.Log; @Log -public class ShardWorkerInstance extends AbstractServerInstance { +public class WorkerInstance extends NodeInstance { private static final Counter IO_METRIC = Counter.build().name("io_bytes_read").help("Read I/O (bytes)").register(); private final Backplane backplane; - public ShardWorkerInstance( + public WorkerInstance( String name, DigestUtil digestUtil, Backplane backplane, @@ -346,7 +346,7 @@ protected static ExecuteOperationMetadata expectExecuteOperationMetadata(Operati return null; } } else { - return AbstractServerInstance.expectExecuteOperationMetadata(operation); + return NodeInstance.expectExecuteOperationMetadata(operation); } } diff --git a/src/test/java/build/buildfarm/cas/BUILD b/src/test/java/build/buildfarm/cas/BUILD index 7355257bd0..3455c868db 100644 --- a/src/test/java/build/buildfarm/cas/BUILD +++ b/src/test/java/build/buildfarm/cas/BUILD @@ -1,10 +1,10 @@ -load("//:jvm_flags.bzl", "ensure_accurate_metadata") +load("//:jvm_flags.bzl", "add_opens_sun_nio_fs", "ensure_accurate_metadata") java_test( name = "tests", size = "small", srcs = glob(["**/*.java"]), - jvm_flags = ensure_accurate_metadata(), + jvm_flags = ensure_accurate_metadata() + add_opens_sun_nio_fs(), test_class = "build.buildfarm.AllTests", deps = [ "//src/main/java/build/buildfarm/backplane", diff --git a/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java b/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java index 8f1f24e55c..4f64fc0d5c 100644 --- a/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java +++ b/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java @@ -19,8 +19,11 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; +import static java.lang.Thread.State.TERMINATED; +import static java.lang.Thread.State.WAITING; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; @@ -58,6 +61,7 @@ import com.google.common.collect.Maps; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -77,6 +81,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -1210,6 +1215,114 @@ protected InputStream newExternalInput( assertThat(expected).isNotNull(); } + @Test + public void testConcurrentWrites() throws Exception { + ByteString blob = ByteString.copyFromUtf8("concurrent write"); + Digest digest = DIGEST_UTIL.compute(blob); + UUID uuid = UUID.randomUUID(); + // The same instance of Write will be passed to both the threads, so that the both threads + // try to get same output stream. + Write write = + fileCache.getWrite( + Compressor.Value.IDENTITY, digest, uuid, RequestMetadata.getDefaultInstance()); + + CyclicBarrier barrier = new CyclicBarrier(3); + + Thread write1 = + new Thread( + () -> { + try { + ConcurrentWriteStreamObserver writeStreamObserver = + new ConcurrentWriteStreamObserver(write); + writeStreamObserver.registerCallback(); + barrier.await(); // let both the threads get same write stream. + writeStreamObserver.ownStream(); // let other thread get the ownership of stream + writeStreamObserver.write(blob); + writeStreamObserver.close(); + } catch (Exception e) { + // do nothing + } + }, + "FirstRequest"); + Thread write2 = + new Thread( + () -> { + try { + ConcurrentWriteStreamObserver writeStreamObserver = + new ConcurrentWriteStreamObserver(write); + writeStreamObserver.registerCallback(); + writeStreamObserver.ownStream(); // this thread will get the ownership of stream + barrier.await(); // let both the threads get same write stream. + while (write1.getState() != WAITING) ; // wait for first request to go in wait state + writeStreamObserver.write(blob); + writeStreamObserver.close(); + } catch (Exception e) { + // do nothing + } + }, + "SecondRequest"); + write1.start(); + write2.start(); + barrier.await(); // let both the requests reach the critical section + + // Wait for each write operation to complete, allowing a maximum of 100ms per write. + // Note: A 100ms wait time allowed 1000 * 8 successful test runs. + // In certain scenario, even this wait time may not be enough and test still be called flaky. + // But setting wait time 0 may cause test to wait forever (if there is issue in code) and the + // build might fail with timeout error. + write1.join(100); + write2.join(100); + + assertThat(write1.getState()).isEqualTo(TERMINATED); + assertThat(write2.getState()).isEqualTo(TERMINATED); + } + + static class ConcurrentWriteStreamObserver { + Write write; + FeedbackOutputStream out; + + ConcurrentWriteStreamObserver(Write write) { + this.write = write; + } + + void registerCallback() { + Futures.addCallback( + write.getFuture(), + new FutureCallback() { + @Override + public void onSuccess(Long committedSize) { + commit(); + } + + @Override + public void onFailure(Throwable t) { + // do nothing + } + }, + directExecutor()); + } + + synchronized void ownStream() throws Exception { + this.out = write.getOutput(10, MILLISECONDS, () -> {}); + } + /** + * Request 1 may invoke this method for request 2 or vice-versa via callback on + * write.getFuture(). Synchronization is necessary to prevent conflicts when this method is + * called simultaneously by different threads. + */ + synchronized void commit() { + // critical section + } + + void write(ByteString data) throws IOException { + data.writeTo(out); + } + + void close() throws IOException { + out.close(); + } + } + @RunWith(JUnit4.class) public static class NativeFileDirsIndexInMemoryCASFileCacheTest extends CASFileCacheTest { public NativeFileDirsIndexInMemoryCASFileCacheTest() throws IOException { diff --git a/src/test/java/build/buildfarm/common/BUILD b/src/test/java/build/buildfarm/common/BUILD index bda4f4243f..70ff4abc94 100644 --- a/src/test/java/build/buildfarm/common/BUILD +++ b/src/test/java/build/buildfarm/common/BUILD @@ -1,10 +1,10 @@ -load("//:jvm_flags.bzl", "ensure_accurate_metadata") +load("//:jvm_flags.bzl", "add_opens_sun_nio_fs", "ensure_accurate_metadata") java_test( name = "tests", size = "small", srcs = glob(["*.java"]), - jvm_flags = ensure_accurate_metadata(), + jvm_flags = ensure_accurate_metadata() + add_opens_sun_nio_fs(), test_class = "build.buildfarm.AllTests", deps = [ "//src/main/java/build/buildfarm/common", diff --git a/src/test/java/build/buildfarm/instance/server/AbstractServerInstanceTest.java b/src/test/java/build/buildfarm/instance/server/NodeInstanceTest.java similarity index 89% rename from src/test/java/build/buildfarm/instance/server/AbstractServerInstanceTest.java rename to src/test/java/build/buildfarm/instance/server/NodeInstanceTest.java index add9136633..454440281f 100644 --- a/src/test/java/build/buildfarm/instance/server/AbstractServerInstanceTest.java +++ b/src/test/java/build/buildfarm/instance/server/NodeInstanceTest.java @@ -17,12 +17,13 @@ import static build.buildfarm.common.Actions.checkPreconditionFailure; import static build.buildfarm.common.Errors.VIOLATION_TYPE_INVALID; import static build.buildfarm.common.Errors.VIOLATION_TYPE_MISSING; -import static build.buildfarm.instance.server.AbstractServerInstance.ACTION_INPUT_ROOT_DIRECTORY_PATH; -import static build.buildfarm.instance.server.AbstractServerInstance.DIRECTORY_NOT_SORTED; -import static build.buildfarm.instance.server.AbstractServerInstance.DUPLICATE_DIRENT; -import static build.buildfarm.instance.server.AbstractServerInstance.INVALID_COMMAND; -import static build.buildfarm.instance.server.AbstractServerInstance.OUTPUT_DIRECTORY_IS_OUTPUT_ANCESTOR; -import static build.buildfarm.instance.server.AbstractServerInstance.OUTPUT_FILE_IS_OUTPUT_ANCESTOR; +import static build.buildfarm.instance.server.NodeInstance.ACTION_INPUT_ROOT_DIRECTORY_PATH; +import static build.buildfarm.instance.server.NodeInstance.DIRECTORY_NOT_SORTED; +import static build.buildfarm.instance.server.NodeInstance.DUPLICATE_DIRENT; +import static build.buildfarm.instance.server.NodeInstance.INVALID_COMMAND; +import static build.buildfarm.instance.server.NodeInstance.OUTPUT_DIRECTORY_IS_OUTPUT_ANCESTOR; +import static build.buildfarm.instance.server.NodeInstance.OUTPUT_FILE_IS_OUTPUT_ANCESTOR; +import static build.buildfarm.instance.server.NodeInstance.SYMLINK_TARGET_ABSOLUTE; import static com.google.common.truth.Truth.assertThat; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.mockito.Mockito.any; @@ -43,6 +44,7 @@ import build.bazel.remote.execution.v2.OutputDirectory; import build.bazel.remote.execution.v2.Platform; import build.bazel.remote.execution.v2.RequestMetadata; +import build.bazel.remote.execution.v2.SymlinkNode; import build.bazel.remote.execution.v2.Tree; import build.buildfarm.actioncache.ActionCache; import build.buildfarm.cas.ContentAddressableStorage; @@ -97,10 +99,10 @@ @RunWith(JUnit4.class) @Log -public class AbstractServerInstanceTest { +public class NodeInstanceTest { private static final DigestUtil DIGEST_UTIL = new DigestUtil(HashFunction.SHA256); - static class DummyServerInstance extends AbstractServerInstance { + static class DummyServerInstance extends NodeInstance { DummyServerInstance( ContentAddressableStorage contentAddressableStorage, ActionCache actionCache) { super( @@ -259,7 +261,7 @@ public PrepareWorkerForGracefulShutDownRequestResults shutDownWorkerGracefully() @Test public void duplicateFileInputIsInvalid() { PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, Directory.newBuilder() .addAllFiles( @@ -270,6 +272,7 @@ public void duplicateFileInputIsInvalid() { /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ Maps.newHashMap(), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFile=*/ file -> {}, /* onInputDirectorie=*/ directory -> {}, /* onInputDigest=*/ digest -> {}, @@ -287,7 +290,7 @@ public void duplicateEmptyDirectoryCheckPasses() throws StatusException { Directory emptyDirectory = Directory.getDefaultInstance(); Digest emptyDirectoryDigest = DIGEST_UTIL.compute(emptyDirectory); PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, Directory.newBuilder() .addAllDirectories( @@ -304,6 +307,7 @@ public void duplicateEmptyDirectoryCheckPasses() throws StatusException { /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ ImmutableMap.of(Digest.getDefaultInstance(), emptyDirectory), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -316,7 +320,7 @@ public void duplicateEmptyDirectoryCheckPasses() throws StatusException { @Test public void unsortedFileInputIsInvalid() { PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, Directory.newBuilder() .addAllFiles( @@ -327,6 +331,7 @@ public void unsortedFileInputIsInvalid() { /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ Maps.newHashMap(), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -344,7 +349,7 @@ public void duplicateDirectoryInputIsInvalid() { Directory emptyDirectory = Directory.getDefaultInstance(); Digest emptyDirectoryDigest = DIGEST_UTIL.compute(emptyDirectory); PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, Directory.newBuilder() .addAllDirectories( @@ -361,6 +366,7 @@ public void duplicateDirectoryInputIsInvalid() { /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ ImmutableMap.of(emptyDirectoryDigest, emptyDirectory), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -378,7 +384,7 @@ public void unsortedDirectoryInputIsInvalid() { Directory emptyDirectory = Directory.getDefaultInstance(); Digest emptyDirectoryDigest = DIGEST_UTIL.compute(emptyDirectory); PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, Directory.newBuilder() .addAllDirectories( @@ -395,6 +401,7 @@ public void unsortedDirectoryInputIsInvalid() { /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ ImmutableMap.of(emptyDirectoryDigest, emptyDirectory), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -407,10 +414,52 @@ public void unsortedDirectoryInputIsInvalid() { assertThat(violation.getDescription()).isEqualTo(DIRECTORY_NOT_SORTED); } + @Test + public void shouldValidateIfSymlinkTargetAbsolute() { + // invalid for disallowed + PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder(); + Directory absoluteSymlinkDirectory = + Directory.newBuilder() + .addSymlinks(SymlinkNode.newBuilder().setName("foo").setTarget("/root/secret").build()) + .build(); + NodeInstance.validateActionInputDirectory( + ACTION_INPUT_ROOT_DIRECTORY_PATH, + absoluteSymlinkDirectory, + /* pathDigests=*/ new Stack<>(), + /* visited=*/ Sets.newHashSet(), + /* directoriesIndex=*/ Maps.newHashMap(), + /* allowSymlinkTargetAbsolute=*/ false, + /* onInputFile=*/ file -> {}, + /* onInputDirectorie=*/ directory -> {}, + /* onInputDigest=*/ digest -> {}, + preconditionFailure); + + assertThat(preconditionFailure.getViolationsCount()).isEqualTo(1); + Violation violation = preconditionFailure.getViolationsList().get(0); + assertThat(violation.getType()).isEqualTo(VIOLATION_TYPE_INVALID); + assertThat(violation.getSubject()).isEqualTo("/: foo -> /root/secret"); + assertThat(violation.getDescription()).isEqualTo(SYMLINK_TARGET_ABSOLUTE); + + // valid for allowed + preconditionFailure = PreconditionFailure.newBuilder(); + NodeInstance.validateActionInputDirectory( + ACTION_INPUT_ROOT_DIRECTORY_PATH, + absoluteSymlinkDirectory, + /* pathDigests=*/ new Stack<>(), + /* visited=*/ Sets.newHashSet(), + /* directoriesIndex=*/ Maps.newHashMap(), + /* allowSymlinkTargetAbsolute=*/ true, + /* onInputFile=*/ file -> {}, + /* onInputDirectorie=*/ directory -> {}, + /* onInputDigest=*/ digest -> {}, + preconditionFailure); + assertThat(preconditionFailure.getViolationsCount()).isEqualTo(0); + } + @Test public void nestedOutputDirectoriesAreInvalid() { PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateOutputs( + NodeInstance.validateOutputs( ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), @@ -427,7 +476,7 @@ public void nestedOutputDirectoriesAreInvalid() { @Test public void outputDirectoriesContainingOutputFilesAreInvalid() { PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateOutputs( + NodeInstance.validateOutputs( ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of("foo/bar"), @@ -444,7 +493,7 @@ public void outputDirectoriesContainingOutputFilesAreInvalid() { @Test public void outputFilesAsOutputDirectoryAncestorsAreInvalid() { PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); - AbstractServerInstance.validateOutputs( + NodeInstance.validateOutputs( ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of("foo"), @@ -460,7 +509,7 @@ public void outputFilesAsOutputDirectoryAncestorsAreInvalid() { @Test public void emptyArgumentListIsInvalid() { - AbstractServerInstance instance = new DummyServerInstance(); + NodeInstance instance = new DummyServerInstance(); PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); instance.validateCommand( @@ -480,7 +529,7 @@ public void emptyArgumentListIsInvalid() { @Test public void absoluteWorkingDirectoryIsInvalid() { - AbstractServerInstance instance = new DummyServerInstance(); + NodeInstance instance = new DummyServerInstance(); PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); instance.validateCommand( @@ -500,7 +549,7 @@ public void absoluteWorkingDirectoryIsInvalid() { @Test public void undeclaredWorkingDirectoryIsInvalid() { - AbstractServerInstance instance = new DummyServerInstance(); + NodeInstance instance = new DummyServerInstance(); Digest inputRootDigest = DIGEST_UTIL.compute(Directory.getDefaultInstance()); PreconditionFailure.Builder preconditionFailureBuilder = PreconditionFailure.newBuilder(); @@ -541,12 +590,13 @@ public void multipleIdenticalDirectoryMissingAreAllPreconditionFailures() { .setDigest(missingDirectoryDigest) .build())) .build(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, root, /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ ImmutableMap.of(), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -602,12 +652,13 @@ public void validationRevisitReplicatesPreconditionFailures() { DirectoryNode.newBuilder().setName("bar").setDigest(fooDigest).build(), DirectoryNode.newBuilder().setName("foo").setDigest(fooDigest).build())) .build(); - AbstractServerInstance.validateActionInputDirectory( + NodeInstance.validateActionInputDirectory( ACTION_INPUT_ROOT_DIRECTORY_PATH, root, /* pathDigests=*/ new Stack<>(), /* visited=*/ Sets.newHashSet(), /* directoriesIndex=*/ ImmutableMap.of(fooDigest, foo), + /* allowSymlinkTargetAbsolute=*/ false, /* onInputFiles=*/ file -> {}, /* onInputDirectories=*/ directory -> {}, /* onInputDigests=*/ digest -> {}, @@ -670,8 +721,7 @@ public void outputDirectoriesFilesAreEnsuredPresent() throws Exception { .build(); ContentAddressableStorage contentAddressableStorage = mock(ContentAddressableStorage.class); ActionCache actionCache = mock(ActionCache.class); - AbstractServerInstance instance = - new DummyServerInstance(contentAddressableStorage, actionCache); + NodeInstance instance = new DummyServerInstance(contentAddressableStorage, actionCache); Tree tree = Tree.newBuilder() @@ -731,7 +781,7 @@ public void fetchBlobWriteCompleteIsSuccess() throws Exception { Digest expectedDigest = contentDigest.toBuilder().setSizeBytes(-1).build(); ContentAddressableStorage contentAddressableStorage = mock(ContentAddressableStorage.class); - AbstractServerInstance instance = new DummyServerInstance(contentAddressableStorage, null); + NodeInstance instance = new DummyServerInstance(contentAddressableStorage, null); RequestMetadata requestMetadata = RequestMetadata.getDefaultInstance(); Write write = mock(Write.class); diff --git a/src/test/java/build/buildfarm/instance/shard/BUILD b/src/test/java/build/buildfarm/instance/shard/BUILD index cd0ae18d24..2f08b1003d 100644 --- a/src/test/java/build/buildfarm/instance/shard/BUILD +++ b/src/test/java/build/buildfarm/instance/shard/BUILD @@ -110,10 +110,10 @@ java_test( ) java_test( - name = "ShardInstanceTest", + name = "ServerInstanceTest", size = "small", srcs = [ - "ShardInstanceTest.java", + "ServerInstanceTest.java", "UnobservableWatcher.java", ], data = ["//examples:example_configs"], @@ -219,3 +219,22 @@ java_test( "@remote_apis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) + +java_test( + name = "JedisCasWorkerMapTest", + size = "small", + srcs = [ + "JedisCasWorkerMapTest.java", + ], + test_class = "build.buildfarm.AllTests", + deps = [ + "//src/main/java/build/buildfarm/common", + "//src/main/java/build/buildfarm/common/redis", + "//src/main/java/build/buildfarm/instance/shard", + "//src/main/protobuf:build_buildfarm_v1test_buildfarm_java_proto", + "//src/test/java/build/buildfarm:test_runner", + "//third_party/jedis", + "@maven//:com_github_fppt_jedis_mock", + "@maven//:com_google_truth_truth", + ], +) diff --git a/src/test/java/build/buildfarm/instance/shard/JedisCasWorkerMapTest.java b/src/test/java/build/buildfarm/instance/shard/JedisCasWorkerMapTest.java new file mode 100644 index 0000000000..caa69536c1 --- /dev/null +++ b/src/test/java/build/buildfarm/instance/shard/JedisCasWorkerMapTest.java @@ -0,0 +1,63 @@ +package build.buildfarm.instance.shard; + +import static com.google.common.truth.Truth.assertThat; + +import build.bazel.remote.execution.v2.Digest; +import build.buildfarm.common.DigestUtil; +import build.buildfarm.common.redis.RedisClient; +import com.github.fppt.jedismock.RedisServer; +import com.github.fppt.jedismock.server.ServiceOptions; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; + +@RunWith(JUnit4.class) +public class JedisCasWorkerMapTest { + + private static final String CAS_PREFIX = "ContentAddressableStorage"; + + private RedisServer redisServer; + private RedisClient redisClient; + private JedisCasWorkerMap jedisCasWorkerMap; + + @Before + public void setup() throws IOException { + redisServer = + RedisServer.newRedisServer() + .setOptions(ServiceOptions.defaultOptions().withClusterModeEnabled()) + .start(); + redisClient = + new RedisClient( + new JedisCluster( + Collections.singleton( + new HostAndPort(redisServer.getHost(), redisServer.getBindPort())))); + jedisCasWorkerMap = new JedisCasWorkerMap(CAS_PREFIX, 60); + } + + @Test + public void testSetExpire() throws IOException { + Digest testDigest1 = Digest.newBuilder().setHash("abc").build(); + Digest testDigest2 = Digest.newBuilder().setHash("xyz").build(); + + String casKey1 = CAS_PREFIX + ":" + DigestUtil.toString(testDigest1); + String casKey2 = CAS_PREFIX + ":" + DigestUtil.toString(testDigest2); + + redisClient.run(jedis -> jedis.sadd(casKey1, "worker1")); + jedisCasWorkerMap.setExpire(redisClient, Arrays.asList(testDigest1, testDigest2)); + + assertThat((Long) redisClient.call(jedis -> jedis.ttl(casKey1))).isGreaterThan(0L); + assertThat((Long) redisClient.call(jedis -> jedis.ttl(casKey2))).isEqualTo(-2L); + } + + @After + public void tearDown() throws IOException { + redisServer.stop(); + } +} diff --git a/src/test/java/build/buildfarm/instance/shard/ShardInstanceTest.java b/src/test/java/build/buildfarm/instance/shard/ServerInstanceTest.java similarity index 97% rename from src/test/java/build/buildfarm/instance/shard/ShardInstanceTest.java rename to src/test/java/build/buildfarm/instance/shard/ServerInstanceTest.java index aa1adc5d3d..d66a600dab 100644 --- a/src/test/java/build/buildfarm/instance/shard/ShardInstanceTest.java +++ b/src/test/java/build/buildfarm/instance/shard/ServerInstanceTest.java @@ -20,9 +20,9 @@ import static build.buildfarm.common.Actions.invalidActionVerboseMessage; import static build.buildfarm.common.Errors.VIOLATION_TYPE_INVALID; import static build.buildfarm.common.Errors.VIOLATION_TYPE_MISSING; -import static build.buildfarm.instance.server.AbstractServerInstance.INVALID_PLATFORM; -import static build.buildfarm.instance.server.AbstractServerInstance.MISSING_ACTION; -import static build.buildfarm.instance.server.AbstractServerInstance.MISSING_COMMAND; +import static build.buildfarm.instance.server.NodeInstance.INVALID_PLATFORM; +import static build.buildfarm.instance.server.NodeInstance.MISSING_ACTION; +import static build.buildfarm.instance.server.NodeInstance.MISSING_COMMAND; import static com.google.common.base.Predicates.notNull; import static com.google.common.truth.Truth.assertThat; import static com.google.common.util.concurrent.Futures.immediateFuture; @@ -31,9 +31,13 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.SECONDS; import static org.mockito.AdditionalAnswers.answer; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -80,6 +84,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.longrunning.Operation; import com.google.protobuf.Any; @@ -95,6 +100,7 @@ import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -121,14 +127,14 @@ import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) -public class ShardInstanceTest { +public class ServerInstanceTest { private static final DigestUtil DIGEST_UTIL = new DigestUtil(HashFunction.SHA256); private static final long QUEUE_TEST_TIMEOUT_SECONDS = 3; private static final Duration DEFAULT_TIMEOUT = Durations.fromSeconds(60); private static final Command SIMPLE_COMMAND = Command.newBuilder().addAllArguments(ImmutableList.of("true")).build(); - private ShardInstance instance; + private ServerInstance instance; private Map blobDigests; @Mock private Backplane mockBackplane; @@ -145,7 +151,7 @@ public void setUp() throws InterruptedException { blobDigests = Maps.newHashMap(); ActionCache actionCache = new ShardActionCache(10, mockBackplane, newDirectExecutorService()); instance = - new ShardInstance( + new ServerInstance( "shard", DIGEST_UTIL, mockBackplane, @@ -1110,8 +1116,12 @@ public void findMissingBlobsTest_ViaBackPlane() throws Exception { buildfarmConfigs.getServer().setFindMissingBlobsViaBackplane(true); Set activeAndImposterWorkers = Sets.newHashSet(Iterables.concat(activeWorkers, imposterWorkers)); + when(mockBackplane.getStorageWorkers()).thenReturn(activeAndImposterWorkers); when(mockBackplane.getBlobDigestsWorkers(any(Iterable.class))).thenReturn(digestAndWorkersMap); + when(mockInstanceLoader.load(anyString())).thenReturn(mockWorkerInstance); + when(mockWorkerInstance.findMissingBlobs(anyIterable(), any(RequestMetadata.class))) + .thenReturn(Futures.immediateFuture(new ArrayList<>())); long serverStartTime = 1686951033L; // june 15th, 2023 Map workersStartTime = new HashMap<>(); @@ -1134,6 +1144,10 @@ public void findMissingBlobsTest_ViaBackPlane() throws Exception { Iterables.concat(missingDigests, digestAvailableOnImposters); assertThat(actualMissingDigests).containsExactlyElementsIn(expectedMissingDigests); + verify(mockWorkerInstance, atMost(3)) + .findMissingBlobs(anyIterable(), any(RequestMetadata.class)); + verify(mockWorkerInstance, atLeast(1)) + .findMissingBlobs(anyIterable(), any(RequestMetadata.class)); for (Digest digest : actualMissingDigests) { assertThat(digest).isNotIn(availableDigests); diff --git a/src/test/java/build/buildfarm/worker/shard/ShardWorkerInstanceTest.java b/src/test/java/build/buildfarm/worker/shard/WorkerInstanceTest.java similarity index 97% rename from src/test/java/build/buildfarm/worker/shard/ShardWorkerInstanceTest.java rename to src/test/java/build/buildfarm/worker/shard/WorkerInstanceTest.java index 7a4e40be26..3df73187bb 100644 --- a/src/test/java/build/buildfarm/worker/shard/ShardWorkerInstanceTest.java +++ b/src/test/java/build/buildfarm/worker/shard/WorkerInstanceTest.java @@ -50,19 +50,19 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) -public class ShardWorkerInstanceTest { +public class WorkerInstanceTest { private final DigestUtil DIGEST_UTIL = new DigestUtil(HashFunction.SHA256); @Mock private Backplane backplane; @Mock private ContentAddressableStorage storage; - private ShardWorkerInstance instance; + private WorkerInstance instance; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - instance = new ShardWorkerInstance("test", DIGEST_UTIL, backplane, storage); + instance = new WorkerInstance("test", DIGEST_UTIL, backplane, storage); } @SuppressWarnings("unchecked") diff --git a/third_party/rules_oss_audit_pyyaml.patch b/third_party/rules_oss_audit_pyyaml.patch new file mode 100644 index 0000000000..d2297f018c --- /dev/null +++ b/third_party/rules_oss_audit_pyyaml.patch @@ -0,0 +1,7 @@ +diff --git a/oss_audit/tools/requirements.txt b/oss_audit/tools/requirements.txt +index 932bd69..be2b74d 100644 +--- a/oss_audit/tools/requirements.txt ++++ b/oss_audit/tools/requirements.txt +@@ -1 +1 @@ +-PyYAML==5.4.1 ++PyYAML==6.0.1