Skip to content

Commit

Permalink
Add ability to use env variables in config files
Browse files Browse the repository at this point in the history
This will make it much easier to reuse the same config file across
environments that need slightly different settings.

Also adds a few useful debug tips on errors.
  • Loading branch information
allada committed Jun 17, 2022
1 parent 117e173 commit d54b38e
Show file tree
Hide file tree
Showing 16 changed files with 149 additions and 95 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -35,6 +35,7 @@ fast-async-mutex = "0.6.7"
lz4_flex = "0.9.0"
bincode = "1.3.3"
bytes = "1.1.0"
shellexpand = "2.1.0"
byteorder = "1.4.3"
lazy_static = "1.4.0"
filetime = "0.2.15"
Expand Down
6 changes: 5 additions & 1 deletion cas/cas_main.rs
Expand Up @@ -57,7 +57,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.format_timestamp_millis()
.init();

let json_contents = String::from_utf8(tokio::fs::read(config_file).await?)?;
let json_contents = String::from_utf8(
tokio::fs::read(&config_file)
.await
.err_tip(|| format!("Could not open config file {}", config_file))?,
)?;
let cfg: CasConfig = json5::from_str(&json_contents)?;

// Note: If the default changes make sure you update the documentation in
Expand Down
2 changes: 2 additions & 0 deletions cas/store/BUILD
Expand Up @@ -223,6 +223,7 @@ rust_library(
"//third_party:futures",
"//third_party:nix",
"//third_party:rand",
"//third_party:shellexpand",
"//third_party:tokio",
"//third_party:tokio_stream",
"//util:buf_channel",
Expand All @@ -243,6 +244,7 @@ rust_library(
"//proto",
"//third_party:fast_async_mutex",
"//third_party:futures",
"//third_party:shellexpand",
"//third_party:tokio",
"//third_party:tonic",
"//third_party:uuid",
Expand Down
27 changes: 19 additions & 8 deletions cas/store/filesystem_store.rs
Expand Up @@ -13,6 +13,7 @@ use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use nix::fcntl::{renameat2, RenameFlags};
use rand::{thread_rng, Rng};
use shellexpand;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom, Take};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
Expand Down Expand Up @@ -291,15 +292,25 @@ impl FilesystemStore {
let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
let evicting_map = EvictingMap::new(eviction_policy, now);

fs::create_dir_all(&config.temp_path)
let temp_path = shellexpand::full(&config.temp_path)
.map_err(|e| make_input_err!("{}", e))
.err_tip(|| "Could expand temp_path in FilesystemStore")?
.to_string();

let content_path = shellexpand::full(&config.content_path)
.map_err(|e| make_input_err!("{}", e))
.err_tip(|| "Could expand content_path in FilesystemStore")?
.to_string();

fs::create_dir_all(&temp_path)
.await
.err_tip(|| format!("Failed to temp directory {:?}", &config.temp_path))?;
fs::create_dir_all(&config.content_path)
.err_tip(|| format!("Failed to temp directory {:?}", &temp_path))?;
fs::create_dir_all(&content_path)
.await
.err_tip(|| format!("Failed to content directory {:?}", &config.content_path))?;
.err_tip(|| format!("Failed to content directory {:?}", &content_path))?;

let temp_path = Arc::new(config.temp_path.clone());
let content_path = Arc::new(config.content_path.clone());
let temp_path = Arc::new(temp_path);
let content_path = Arc::new(content_path);
add_files_to_cache(&evicting_map, &now, &temp_path, &content_path).await?;
prune_temp_path(&temp_path.as_ref()).await?;

Expand All @@ -309,8 +320,8 @@ impl FilesystemStore {
config.read_buffer_size as usize
};
let store = Self {
temp_path: Arc::new(config.temp_path.clone()),
content_path: Arc::new(config.content_path.clone()),
temp_path,
content_path,
evicting_map,
read_buffer_size,
file_evicted_callback: None,
Expand Down
8 changes: 7 additions & 1 deletion cas/store/grpc_store.rs
Expand Up @@ -6,13 +6,14 @@ use std::sync::Arc;
use async_trait::async_trait;
use fast_async_mutex::mutex::Mutex;
use futures::{stream::unfold, Stream};
use shellexpand;
use tonic::{transport, IntoRequest, Request, Response, Streaming};
use uuid::Uuid;

use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use common::{log, DigestInfo};
use config;
use error::{error_if, Error, ResultExt};
use error::{error_if, make_input_err, Error, ResultExt};
use proto::build::bazel::remote::execution::v2::{
action_cache_client::ActionCacheClient, content_addressable_storage_client::ContentAddressableStorageClient,
ActionResult, BatchReadBlobsRequest, BatchReadBlobsResponse, BatchUpdateBlobsRequest, BatchUpdateBlobsResponse,
Expand Down Expand Up @@ -41,6 +42,11 @@ impl GrpcStore {
error_if!(config.endpoints.len() == 0, "Expected at least 1 endpoint in GrpcStore");
let mut endpoints = Vec::with_capacity(config.endpoints.len());
for endpoint in &config.endpoints {
let endpoint = shellexpand::env(&endpoint)
.map_err(|e| make_input_err!("{}", e))
.err_tip(|| "Could expand endpoint in GrpcStore")?
.to_string();

endpoints.push(
transport::Endpoint::new(endpoint.clone())
.err_tip(|| format!("Could not connect to {} in GrpcStore", endpoint))?,
Expand Down
1 change: 1 addition & 0 deletions cas/worker/BUILD
Expand Up @@ -12,6 +12,7 @@ rust_library(
"//config",
"//proto",
"//third_party:futures",
"//third_party:shellexpand",
"//third_party:tokio",
"//third_party:tokio_stream",
"//third_party:tonic",
Expand Down
26 changes: 17 additions & 9 deletions cas/worker/local_worker.rs
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::{future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
use shellexpand;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -213,15 +214,16 @@ pub async fn new_local_worker(
.err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
.clone();

fs::create_dir_all(&config.work_directory)
let work_directory = shellexpand::full(&config.work_directory)
.map_err(|e| make_input_err!("{}", e))
.err_tip(|| "Could expand work_directory in LocalWorker")?
.to_string();

fs::create_dir_all(&work_directory)
.await
.err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;
.err_tip(|| format!("Could not make work_directory : {}", work_directory))?;

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
config.work_directory.clone(),
fast_slow_store,
)?)
.clone();
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(work_directory, fast_slow_store)?).clone();
Ok(LocalWorker::new_with_connection_factory_and_actions_manager(
config.clone(),
running_actions_manager,
Expand All @@ -230,7 +232,13 @@ pub async fn new_local_worker(
Box::pin(async move {
let timeout = config.worker_api_endpoint.timeout.unwrap_or(DEFAULT_ENDPOINT_TIMEOUT_S);
let timeout_duration = Duration::from_secs_f32(timeout);
let uri = (&config.worker_api_endpoint.uri)

let uri_string = shellexpand::env(&config.worker_api_endpoint.uri)
.map_err(|e| make_input_err!("{}", e))
.err_tip(|| "Could expand worker_api_endpoint.uri in LocalWorker")?
.to_string();
let uri = uri_string
.clone()
.try_into()
.map_err(|e| make_input_err!("Invalid URI for worker endpoint : {:?}", e))?;
let endpoint = TonicChannel::builder(uri)
Expand All @@ -239,7 +247,7 @@ pub async fn new_local_worker(
let transport = endpoint
.connect()
.await
.map_err(|e| make_err!(Code::Internal, "Could not connect to endpoint : {:?}", e))?;
.map_err(|e| make_err!(Code::Internal, "Could not connect to endpoint {}: {:?}", uri_string, e))?;
Ok(WorkerApiClient::new(transport).into())
})
}),
Expand Down
3 changes: 3 additions & 0 deletions deployment-examples/docker-compose/docker-compose.yml
Expand Up @@ -27,6 +27,7 @@ services:
target: /root
environment:
RUST_LOG: ${RUST_LOG:-}
CAS_ENDPOINT: turbo_cache_local_cas
ports: [ "50052:50052/tcp" ]
command: |
cas /root/scheduler.json
Expand All @@ -40,5 +41,7 @@ services:
target: /root
environment:
RUST_LOG: ${RUST_LOG:-}
CAS_ENDPOINT: turbo_cache_local_cas
SCHEDULER_ENDPOINT: turbo_cache_scheduler
command: |
cas /root/worker.json
10 changes: 5 additions & 5 deletions deployment-examples/docker-compose/local-storage-cas.json
@@ -1,5 +1,5 @@
// This configuration will place objects in various folders in
// `/root/.cache/turbo-cache`. It will store all data on disk and
// `~/.cache/turbo-cache`. It will store all data on disk and
// allows for restarts of the underlying service. It is optimized
// so objects are compressed, deduplicated and uses some in-memory
// optimizations for certain hot paths.
Expand All @@ -14,8 +14,8 @@
},
"backend": {
"filesystem": {
"content_path": "/root/.cache/turbo-cache/content_path-cas",
"temp_path": "/root/.cache/turbo-cache/tmp_path-cas",
"content_path": "~/.cache/turbo-cache/content_path-cas",
"temp_path": "~/.cache/turbo-cache/tmp_path-cas",
"eviction_policy": {
// 10gb.
"max_bytes": 10000000000,
Expand All @@ -30,8 +30,8 @@
},
"AC_MAIN_STORE": {
"filesystem": {
"content_path": "/root/.cache/turbo-cache/content_path-ac",
"temp_path": "/root/.cache/turbo-cache/tmp_path-ac",
"content_path": "~/.cache/turbo-cache/content_path-ac",
"temp_path": "~/.cache/turbo-cache/tmp_path-ac",
"eviction_policy": {
// 500mb.
"max_bytes": 500000000,
Expand Down
65 changes: 0 additions & 65 deletions deployment-examples/docker-compose/memory-cas.json

This file was deleted.

2 changes: 1 addition & 1 deletion deployment-examples/docker-compose/scheduler.json
Expand Up @@ -4,7 +4,7 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://turbo_cache_local_cas:50051"]
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"]
}
}
},
Expand Down
10 changes: 5 additions & 5 deletions deployment-examples/docker-compose/worker.json
Expand Up @@ -4,15 +4,15 @@
// Note: This file is used to test GRPC store.
"grpc": {
"instance_name": "main",
"endpoints": ["grpc://turbo_cache_local_cas:50051"]
"endpoints": ["grpc://${CAS_ENDPOINT:-127.0.0.1}:50051"]
}
},
"WORKER_FAST_SLOW_STORE": {
"fast_slow": {
"fast": {
"filesystem": {
"content_path": "/root/.cache/turbo-cache/data-worker-test/content_path-cas",
"temp_path": "/root/.cache/turbo-cache/data-worker-test/tmp_path-cas",
"content_path": "~/.cache/turbo-cache/data-worker-test/content_path-cas",
"temp_path": "~/.cache/turbo-cache/data-worker-test/tmp_path-cas",
"eviction_policy": {
// 10gb.
"max_bytes": 10000000000,
Expand All @@ -30,12 +30,12 @@
"workers": [{
"local": {
"worker_api_endpoint": {
"uri": "grpc://turbo_cache_scheduler:50061",
"uri": "grpc://${SCHEDULER_ENDPOINT:-127.0.0.1}:50061",
},
"entrypoint_cmd": "$@",
"cas_fast_slow_store": "WORKER_FAST_SLOW_STORE",
"ac_store": "GRPC_LOCAL_STORE",
"work_directory": "/root/.cache/turbo-cache/work",
"work_directory": "~/.cache/turbo-cache/work",
"platform_properties": {
"cpu_count": {
"values": ["1"],
Expand Down
9 changes: 9 additions & 0 deletions third_party/BUILD.bazel
Expand Up @@ -354,6 +354,15 @@ alias(
],
)

alias(
name = "shellexpand",
actual = "@raze__shellexpand__2_1_0//:shellexpand",
tags = [
"cargo-raze",
"manual",
],
)

alias(
name = "shlex",
actual = "@raze__shlex__1_1_0//:shlex",
Expand Down
10 changes: 10 additions & 0 deletions third_party/crates.bzl
Expand Up @@ -1945,6 +1945,16 @@ def raze_fetch_remote_crates():
build_file = Label("//third_party/remote:BUILD.sha2-0.9.9.bazel"),
)

maybe(
http_archive,
name = "raze__shellexpand__2_1_0",
url = "https://crates.io/api/v1/crates/shellexpand/2.1.0/download",
type = "tar.gz",
sha256 = "83bdb7831b2d85ddf4a7b148aa19d0587eddbe8671a436b7bd1182eaad0f2829",
strip_prefix = "shellexpand-2.1.0",
build_file = Label("//third_party/remote:BUILD.shellexpand-2.1.0.bazel"),
)

maybe(
http_archive,
name = "raze__shlex__0_1_1",
Expand Down

0 comments on commit d54b38e

Please sign in to comment.