Skip to content

Commit

Permalink
rework worker cache handling
Browse files Browse the repository at this point in the history
bazelbuild#1: Include details such as the target architecture in the cache folder
hash. Previously only the path to rustc and the compilation mode were
used to calculate the hash, and this was likely the cause of ICEs
(internal compiler errors).
We now include all of the env vars passed into the request in the hash,
so that separate folders are created for different crate names, crate
versions, and target architectures.

bazelbuild#2: A new worker was previously being created for every rustc invocation,
because the env parameter contained crate-specific details that caused
WorkerKey to change each time. Instead of passing the environment in
directly, persist it to a file, and pass it to process-wrapper with
--env-file instead. We also use the contents of this file in order
to generate the hash mentioned above.

This means you will be limited to 4 workers by default; you may
want to also pass --worker_max_instances=Rustc=HOST_CPUS*0.5 or
similar, as JIT startup is not a concern for the Rust workers, and
parts of rustc's compilation process are unable to take advantage of
more than 1 CPU core.

bazelbuild#3: Instead of storing incremental files in $TEMP, the worker
is now enabled by providing it with a folder that it should store its
incremental files in, such as:

bazel build --@rules_rust//worker:cache_root=/path/to/cache/files

This mimics the behaviour of --disk_cache and --repository_cache, and
makes it clear where the files are persisted.
  • Loading branch information
dae committed Mar 30, 2021
1 parent b32648e commit 4c7f36c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 58 deletions.
52 changes: 46 additions & 6 deletions rust/private/rustc.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def construct_arguments(
maker_path = None,
aspect = False,
emit = ["dep-info", "link"],
use_worker = False):
worker_env_file = None):
"""Builds an Args object containing common rustc flags
Args:
Expand All @@ -361,7 +361,7 @@ def construct_arguments(
maker_path (File): An optional clippy marker file
aspect (bool): True if called in an aspect context.
emit (list): Values for the --emit flag to rustc.
use_worker (bool): If True, sets up the arguments in a worker-compatible fashion
worker_env_file (File): If provided, arguments will be set up for use with a worker.
Returns:
tuple: A tuple of the following items
Expand All @@ -376,11 +376,16 @@ def construct_arguments(

# Wrapper args first
args = ctx.actions.args()
if use_worker:
if worker_env_file:
# Write the args to a param file that will be used by Bazel to send messages to the worker.
args.set_param_file_format("multiline")
args.use_param_file("@%s", use_always = True)

# The worker needs its environment passed in as a file, which we'll write later. The worker
# expects these arguments to come first.
args.add("--env-file")
args.add(worker_env_file)

for build_env_file in build_env_files:
args.add("--env-file", build_env_file)

Expand Down Expand Up @@ -536,7 +541,15 @@ def rustc_compile_action(
- (DepInfo): The transitive dependencies of this crate.
- (DefaultInfo): The output file for this crate, and its runfiles.
"""

worker_binary = ctx.toolchains["@rules_rust//worker:toolchain_type"].worker_binary
if worker_binary:
worker_env_file = ctx.actions.declare_file(ctx.label.name + "_worker_env")
worker_cache_root = ctx.toolchains["@rules_rust//worker:toolchain_type"].cache_root
else:
worker_env_file = None
worker_cache_root = ""

cc_toolchain, feature_configuration = find_cc_toolchain(ctx)

dep_info, build_info = collect_deps(
Expand Down Expand Up @@ -573,7 +586,7 @@ def rustc_compile_action(
out_dir,
build_env_files,
build_flags_files,
use_worker = worker_binary != None,
worker_env_file = worker_env_file,
)

if hasattr(ctx.attr, "version") and ctx.attr.version != "0.0.0":
Expand All @@ -583,8 +596,35 @@ def rustc_compile_action(

if worker_binary != None:
executable = worker_binary
tools = [ctx.executable._process_wrapper]
arguments = [ctx.executable._process_wrapper.path, toolchain.rustc.path, ctx.var["COMPILATION_MODE"], args]
tools = [ctx.executable._process_wrapper, worker_env_file]

# Add compilation mode to the environment, so the worker includes it in the hash.
env["WORKER_COMPILATION_MODE"] = ctx.var["COMPILATION_MODE"]

# We need to pass the environment in to the worker via a file, as it changes each time.
ctx.actions.write(
output = worker_env_file,
content = "\n".join([k + "=" + v for (k, v) in env.items()]) + "\n",
)

# All the CARGO_* env vars will be passed in via the env file, and we don't want to provide them
# directly to the worker, as that will result in a new WorkerKey being generated each time,
# preventing a worker from being reused. But we can't just clear out env, because on macOS, Bazel
# looks for the presence of specific env vars to decide whether to inject extra required
# variables like DEVELOPER_DIR.
env = dict([
(k, v)
for (k, v) in env.items()
if k.startswith("APPLE_") or k.startswith("XCODE_")
])

arguments = [
ctx.executable._process_wrapper.path,
toolchain.rustc.path,
worker_cache_root,
# the final arg will be passed as a flagfile over the worker protocol
args,
]
execution_requirements = {"supports-workers": "1"}
else:
# Not all execution platforms support a worker.
Expand Down
15 changes: 7 additions & 8 deletions worker/BUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
load("@bazel_skylib//:bzl_library.bzl", "bzl_library")
load("//worker:toolchain.bzl", "worker_toolchain")
load("//worker:toolchain.bzl", "string_flag", "worker_toolchain")
load(":bootstrap.bzl", "rust_cargo_binary")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")

package(default_visibility = ["//visibility:public"])

Expand All @@ -14,13 +13,13 @@ bzl_library(
srcs = glob(["**/*.bzl"]),
)

bool_flag(
name = "use_worker",
build_setting_default = False,
)

toolchain_type(name = "toolchain_type")

string_flag(
name = "cache_root",
build_setting_default = "",
)

rust_cargo_binary(
name = "rustc-worker",
srcs = [
Expand All @@ -33,7 +32,7 @@ rust_cargo_binary(

worker_toolchain(
name = "worker_toolchain",
enabled = ":use_worker",
cache_root = ":cache_root",
worker_binary = ":rustc-worker",
)

Expand Down
80 changes: 51 additions & 29 deletions worker/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,67 @@ use protobuf::CodedInputStream;
use protobuf::CodedOutputStream;
use protobuf::Message;
use protobuf::ProtobufResult;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::{collections::hash_map::DefaultHasher, fs, path::Path};

mod worker_protocol;
use worker_protocol::WorkRequest;
use worker_protocol::WorkResponse;

pub struct Worker {
program_path: PathBuf,
incremental_dir: std::path::PathBuf,
process_wrapper: PathBuf,
rustc: PathBuf,
cache_root: PathBuf,
}

impl Worker {
pub fn new<C: Into<String>>(
program_path: PathBuf,
rustc: PathBuf,
compilation_mode: C,
) -> io::Result<Self> {
// The incremental cache directory includes the rustc wrapper's hash to discriminate
// between multiple workspaces having the same name (usually __main__).
let mut cache_path = std::env::temp_dir();
pub fn new(process_wrapper: PathBuf, rustc: PathBuf, cache_root: PathBuf) -> Self {
fs::create_dir_all(&cache_root).expect("unable to create cache dir");

Worker {
process_wrapper,
rustc,
cache_root,
}
}

/// Get a worker dir unique to the provided rustc and environment.
fn get_worker_dir(&self, env_file_path: &str) -> PathBuf {
let env_file_text = fs::read_to_string(env_file_path).expect("unable to read env file");
let env_file_base = Path::new(env_file_path)
.file_name()
.unwrap()
.to_string_lossy();

let mut hasher = DefaultHasher::new();
rustc.hash(&mut hasher);

cache_path.push(format!(
"rustc-worker-{}-{}",
hasher.finish(),
compilation_mode.into()
));
std::fs::create_dir_all(&cache_path)?;
Ok(Worker {
program_path,
incremental_dir: cache_path,
})
self.rustc.hash(&mut hasher);
env_file_text.hash(&mut hasher);

let cache_path = self
.cache_root
.join(format!("{}-{}", env_file_base, hasher.finish()));
fs::create_dir_all(&cache_path).expect("unable to create cache dir");

cache_path
}

fn handle_request(&self, request: WorkRequest) -> ProtobufResult<WorkResponse> {
let mut incremental_arg = std::ffi::OsString::from("incremental=");
incremental_arg.push(&self.incremental_dir);
let mut cmd = std::process::Command::new(&self.program_path);
cmd.args(request.get_arguments());
let args = request.get_arguments();
let env_file_path = get_env_file_path(args);
let worker_dir = self.get_worker_dir(&env_file_path);

let mut cmd = std::process::Command::new(&self.process_wrapper);
cmd.args(args);

cmd.arg("--codegen");
let mut incremental_arg = std::ffi::OsString::from("incremental=");
incremental_arg.push(&worker_dir);
cmd.arg(incremental_arg);

let output = cmd.output()?;
Ok(WorkResponse {
request_id: request.request_id,
Expand Down Expand Up @@ -84,16 +98,24 @@ impl Worker {
&self,
response_file_path: P,
) -> io::Result<std::process::ExitStatus> {
let file = std::io::BufReader::new(std::fs::File::open(response_file_path)?);
let file = std::io::BufReader::new(fs::File::open(response_file_path)?);

let mut cmd = std::process::Command::new(&self.program_path);
let mut cmd = std::process::Command::new(&self.process_wrapper);
for line in file.lines() {
cmd.arg(line?);
}
cmd.status()
}
}

fn get_env_file_path(args: &[String]) -> &String {
let env_file = args.get(1).expect("missing env file");
if !env_file.ends_with("_worker_env") {
panic!("worker_env file not in expected place")
}
env_file
}

#[cfg(test)]
mod test {
#[test]
Expand Down
26 changes: 13 additions & 13 deletions worker/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{fs, path::PathBuf};

use protobuf::ProtobufResult;
mod lib;

Expand All @@ -6,15 +8,11 @@ fn main() -> ProtobufResult<()> {
// Always discard the executable name.
args.next().unwrap();

let program = std::fs::canonicalize(args.next().expect("program name"))?;
let rustc_path = std::fs::canonicalize(args.next().expect("rustc path"))?;
let compilation_mode = args
.next()
.expect("compilation mode")
.into_string()
.expect("compilation mode must be valid utf-8");
// TODO: program and rustc_path will combine when this is merged into rules_rust.
let worker = lib::Worker::new(program, rustc_path, compilation_mode)?;
let process_wrapper = fs::canonicalize(args.next().expect("process_wrapper name"))?;
let rustc_path = fs::canonicalize(args.next().expect("rustc path"))?;
let cache_root = PathBuf::from(args.next().expect("cache root"));

let worker = lib::Worker::new(process_wrapper, rustc_path, cache_root);

// If started as a persistent worker.
if let Some(arg) = args.peek() {
Expand All @@ -27,17 +25,19 @@ fn main() -> ProtobufResult<()> {
}
}

// Spawn process as normal.
// The process wrapper does not support response files.
// Otherwise, spawn process as normal.

// The process wrapper does not support response files
let response_file_arg = args
.next()
.unwrap()
.into_string()
.expect("response file path is valid utf-8");
assert!(response_file_arg.starts_with('@'));
let response_file_path = &response_file_arg[1..];

// The response file has to be the last (and only) argument left.
assert!(args.peek().is_none(), "iterator should be consumed!");
assert!(response_file_arg.starts_with("@"));
let response_file_path = &response_file_arg[1..];
let status = worker.once_with_response_file(response_file_path)?;
std::process::exit(status.code().unwrap());
}
19 changes: 17 additions & 2 deletions worker/toolchain.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,36 @@ Define a worker toolchain.

load("@bazel_skylib//rules:common_settings.bzl", "BuildSettingInfo")

def _string_flag_impl(ctx):
value = ctx.build_setting_value
return BuildSettingInfo(value = value)

string_flag = rule(
build_setting = config.string(flag = True),
implementation = _string_flag_impl,
)

def _worker_toolchain_impl(ctx):
if ctx.attr.enabled[BuildSettingInfo].value:
root = ctx.attr.cache_root[BuildSettingInfo].value

if root:
binary = ctx.executable.worker_binary
else:
binary = None

toolchain_info = platform_common.ToolchainInfo(
worker_binary = binary,
cache_root = root,
)
return [toolchain_info]

worker_toolchain = rule(
implementation = _worker_toolchain_impl,
attrs = {
"worker_binary": attr.label(allow_single_file = True, executable = True, cfg = "exec"),
"enabled": attr.label(),
"cache_root": attr.label(
doc = """Root folder that the workers should store cache files into.
If empty, worker is disabled.""",
),
},
)

0 comments on commit 4c7f36c

Please sign in to comment.