diff --git a/deployment-examples/docker-compose/local-storage-cas.json b/deployment-examples/docker-compose/local-storage-cas.json index 84e4ecc7f..05ab7179a 100644 --- a/deployment-examples/docker-compose/local-storage-cas.json +++ b/deployment-examples/docker-compose/local-storage-cas.json @@ -40,7 +40,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" + } + }, "services": { "cas": { "main": { @@ -63,7 +67,11 @@ } }, { // Only publish metrics on a private port. - "listen_address": "0.0.0.0:50061", + "listener": { + "http": { + "socket_address": "0.0.0.0:50061" + } + }, "services": { "prometheus": { "path": "/metrics" @@ -71,10 +79,14 @@ } }, { - "listen_address": "0.0.0.0:50071", - "tls": { - "cert_file": "/root/example-do-not-use-in-prod-rootca.crt", - "key_file": "/root/example-do-not-use-in-prod-key.pem" + "listener": { + "http": { + "socket_address": "0.0.0.0:50071", + "tls": { + "cert_file": "/root/example-do-not-use-in-prod-rootca.crt", + "key_file": "/root/example-do-not-use-in-prod-key.pem" + } + } }, "services": { "cas": { diff --git a/deployment-examples/docker-compose/scheduler.json b/deployment-examples/docker-compose/scheduler.json index b18c4dbdd..dabde30a0 100644 --- a/deployment-examples/docker-compose/scheduler.json +++ b/deployment-examples/docker-compose/scheduler.json @@ -27,7 +27,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50052", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + } + }, "services": { "ac": { "main": { @@ -49,7 +53,11 @@ } } }, { - "listen_address": "0.0.0.0:50061", + "listener": { + "http": { + "socket_address": "0.0.0.0:50061", + } + }, "services": { // Note: This should be served on a different port, because it has // a different permission set than the other services. diff --git a/deployment-examples/terraform/AWS/scripts/cas.json b/deployment-examples/terraform/AWS/scripts/cas.json index 0d1741555..4e9e77ec4 100644 --- a/deployment-examples/terraform/AWS/scripts/cas.json +++ b/deployment-examples/terraform/AWS/scripts/cas.json @@ -110,7 +110,7 @@ "verify_size": true, "verify_hash": true } - }, + } }, "schedulers": { "MAIN_SCHEDULER": { @@ -122,7 +122,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051", + } + }, "services": { "cas": { "main": { diff --git a/deployment-examples/terraform/AWS/scripts/scheduler.json b/deployment-examples/terraform/AWS/scripts/scheduler.json index 4cb108aa3..04a0af1e3 100644 --- a/deployment-examples/terraform/AWS/scripts/scheduler.json +++ b/deployment-examples/terraform/AWS/scripts/scheduler.json @@ -119,7 +119,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50052", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + } + }, "services": { "ac": { "main": { @@ -141,7 +145,11 @@ } } }, { - "listen_address": "0.0.0.0:50061", + "listener": { + "http": { + "socket_address": "0.0.0.0:50061", + } + }, "services": { "prometheus": { "path": "/metrics" diff --git a/deployment-examples/terraform/GCP/module/scripts/browser_proxy.json b/deployment-examples/terraform/GCP/module/scripts/browser_proxy.json index 3b0f88c4a..5c7d45014 100644 --- a/deployment-examples/terraform/GCP/module/scripts/browser_proxy.json +++ b/deployment-examples/terraform/GCP/module/scripts/browser_proxy.json @@ -40,7 +40,11 @@ "servers": [{ // Non-public apis. We re-export the CAS services so we don't need to go through // an external load balancer. - "listen_address": "0.0.0.0:50052", + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + } + }, "services": { "prometheus": { "path": "/metrics" diff --git a/deployment-examples/terraform/GCP/module/scripts/cas.json b/deployment-examples/terraform/GCP/module/scripts/cas.json index 15fea37f7..5abde8a57 100644 --- a/deployment-examples/terraform/GCP/module/scripts/cas.json +++ b/deployment-examples/terraform/GCP/module/scripts/cas.json @@ -77,18 +77,22 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", - "tls": { - "cert_file": "${NATIVELINK_CERT_FILE:-}", - "key_file": "${NATIVELINK_KEY_FILE:-}" - }, - "advanced_http": { - "http2_keep_alive_interval": 10 - }, - // External apis support compression. - "compression": { - "send_compression_algorithm": "gzip", - "accepted_compression_algorithms": ["gzip"] + "listener": { + "http": { + "socket_address": "0.0.0.0:50051", + "tls": { + "cert_file": "${NATIVELINK_CERT_FILE:-}", + "key_file": "${NATIVELINK_KEY_FILE:-}" + }, + "advanced_http": { + "http2_keep_alive_interval": 10 + }, + // External apis support compression. + "compression": { + "send_compression_algorithm": "gzip", + "accepted_compression_algorithms": ["gzip"] + } + } }, "services": { "cas": { @@ -115,9 +119,13 @@ }, { // Non-public apis. We re-export the CAS services so we don't need to go through // an external load balancer. - "listen_address": "0.0.0.0:50052", - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, "services": { "prometheus": { diff --git a/deployment-examples/terraform/GCP/module/scripts/scheduler.json b/deployment-examples/terraform/GCP/module/scripts/scheduler.json index cc2c8b4f4..c09f7026c 100644 --- a/deployment-examples/terraform/GCP/module/scripts/scheduler.json +++ b/deployment-examples/terraform/GCP/module/scripts/scheduler.json @@ -86,13 +86,17 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", - "tls": { - "cert_file": "${nativelink_CERT_FILE:-}", - "key_file": "${nativelink_KEY_FILE:-}" - }, - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50051", + "tls": { + "cert_file": "${nativelink_CERT_FILE:-}", + "key_file": "${nativelink_KEY_FILE:-}" + }, + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, "services": { "ac": { @@ -117,9 +121,13 @@ }, { // Non-public apis. We re-export the Scheduler services on a non-tls connection // for local services that don't need a load balancer. - "listen_address": "0.0.0.0:50052", - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, "services": { "prometheus": { @@ -146,9 +154,13 @@ } }, { // Internal Worker endpoint. - "listen_address": "0.0.0.0:50061", - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50061", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, "services": { // Note: This should be served on a different port, because it has diff --git a/deployment-examples/terraform/GCP/module/scripts/worker.json b/deployment-examples/terraform/GCP/module/scripts/worker.json index a0ed665c6..028135b1e 100644 --- a/deployment-examples/terraform/GCP/module/scripts/worker.json +++ b/deployment-examples/terraform/GCP/module/scripts/worker.json @@ -84,16 +84,24 @@ } }], "servers": [{ - "listen_address": "0.0.0.0:50051", - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50051", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, // Only /status will be served. "services": {} }, { - "listen_address": "0.0.0.0:50052", - "advanced_http": { - "http2_keep_alive_interval": 10 + "listener": { + "http": { + "socket_address": "0.0.0.0:50052", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } }, "services": { "prometheus": { diff --git a/nativelink-config/README.md b/nativelink-config/README.md index cd13c3155..f95cbf587 100644 --- a/nativelink-config/README.md +++ b/nativelink-config/README.md @@ -35,7 +35,14 @@ A very basic configuration that is a pure in-memory store is: } }, "servers": [{ - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051", + "advanced_http": { + "http2_keep_alive_interval": 10 + } + } + }, "services": { "cas": { "main": { diff --git a/nativelink-config/examples/basic_cas.json b/nativelink-config/examples/basic_cas.json index 17a69e5d9..ad407f813 100644 --- a/nativelink-config/examples/basic_cas.json +++ b/nativelink-config/examples/basic_cas.json @@ -91,7 +91,11 @@ }], "servers": [{ "name": "public", - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" + } + }, "services": { "cas": { "main": { @@ -126,7 +130,11 @@ } }, { "name": "private_workers_servers", - "listen_address": "0.0.0.0:50061", + "listener": { + "http": { + "socket_address": "0.0.0.0:50061" + } + }, "services": { "prometheus": { "path": "/metrics" diff --git a/nativelink-config/examples/filesystem_cas.json b/nativelink-config/examples/filesystem_cas.json index 0ba916c1e..07c4b987a 100644 --- a/nativelink-config/examples/filesystem_cas.json +++ b/nativelink-config/examples/filesystem_cas.json @@ -114,7 +114,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" + } + }, "services": { "cas": { "main": { @@ -148,7 +152,11 @@ } } }, { - "listen_address": "0.0.0.0:50061", + "listener": { + "http": { + "socket_address": "0.0.0.0:50061" + } + }, "services": { // Note: This should be served on a different port, because it has // a different permission set than the other services. diff --git a/nativelink-config/examples/s3_backend_with_local_fast_cas.json b/nativelink-config/examples/s3_backend_with_local_fast_cas.json index 772c4d4ee..5695e1bbd 100644 --- a/nativelink-config/examples/s3_backend_with_local_fast_cas.json +++ b/nativelink-config/examples/s3_backend_with_local_fast_cas.json @@ -129,7 +129,11 @@ } }, "servers": [{ - "listen_address": "0.0.0.0:50051", + "listener": { + "http": { + "socket_address": "0.0.0.0:50051" + } + }, "services": { "cas": { "main": { diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 9d7f60598..8ebc9930b 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -35,6 +35,7 @@ pub enum CompressionAlgorithm { /// No compression. #[default] none, + /// Zlib compression. gzip, } @@ -226,45 +227,56 @@ pub struct TlsConfig { /// specified. #[derive(Deserialize, Debug, Default)] pub struct HttpServerConfig { + /// Interval to send keep-alive pings via HTTP2. + /// Note: This is in seconds. #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_max_pending_accept_reset_streams: Option, + pub http2_keep_alive_interval: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_initial_stream_window_size: Option, + pub experimental_http2_max_pending_accept_reset_streams: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_initial_connection_window_size: Option, - #[serde(default)] - pub http2_adaptive_window: Option, + pub experimental_http2_initial_stream_window_size: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_max_frame_size: Option, + pub experimental_http2_initial_connection_window_size: Option, + + #[serde(default)] + pub experimental_http2_adaptive_window: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_max_concurrent_streams: Option, - /// Note: This is in seconds. + pub experimental_http2_max_frame_size: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_keep_alive_interval: Option, + pub experimental_http2_max_concurrent_streams: Option, + /// Note: This is in seconds. #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_keep_alive_timeout: Option, + pub experimental_http2_keep_alive_timeout: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_max_send_buf_size: Option, + pub experimental_http2_max_send_buf_size: Option, + #[serde(default)] - pub http2_enable_connect_protocol: Option, + pub experimental_http2_enable_connect_protocol: Option, + #[serde(default, deserialize_with = "convert_optinoal_numeric_with_shellexpand")] - pub http2_max_header_list_size: Option, + pub experimental_http2_max_header_list_size: Option, } +#[allow(non_camel_case_types)] #[derive(Deserialize, Debug)] -pub struct ServerConfig { - /// Name of the server. This is used to help identify the service - /// for telemetry and logs. - /// - /// Default: {index of server in config} - #[serde(default, deserialize_with = "convert_string_with_shellexpand")] - pub name: String, +pub enum ListenerConfig { + /// Listener for HTTP/HTTPS/HTTP2 sockets. + http(HttpListener), +} +#[derive(Deserialize, Debug)] +pub struct HttpListener { /// Address to listen on. Example: `127.0.0.1:8080` or `:8080` to listen /// to all IPs. #[serde(deserialize_with = "convert_string_with_shellexpand")] - pub listen_address: String, + pub socket_address: String, /// Data transport compression configuration to use for this service. #[serde(default)] @@ -274,9 +286,6 @@ pub struct ServerConfig { #[serde(default)] pub advanced_http: HttpServerConfig, - /// Services to attach to server. - pub services: Option, - /// Tls Configuration for this server. /// If not set, the server will not use TLS. /// @@ -285,6 +294,22 @@ pub struct ServerConfig { pub tls: Option, } +#[derive(Deserialize, Debug)] +pub struct ServerConfig { + /// Name of the server. This is used to help identify the service + /// for telemetry and logs. + /// + /// Default: {index of server in config} + #[serde(default, deserialize_with = "convert_string_with_shellexpand")] + pub name: String, + + /// Configuration + pub listener: ListenerConfig, + + /// Services to attach to server. + pub services: Option, +} + #[allow(non_camel_case_types)] #[derive(Deserialize, Debug)] pub enum WorkerProperty { @@ -330,7 +355,7 @@ pub enum UploadCacheResultsStrategy { #[allow(non_camel_case_types)] #[derive(Clone, Deserialize, Debug)] pub enum EnvironmentSource { - /// The name of the property in the action to get the value from. + /// The name of the platform property in the action to get the value from. property(String), /// The raw value to set. @@ -517,7 +542,12 @@ pub enum WorkerConfig { #[allow(non_camel_case_types)] #[derive(Deserialize, Debug, Clone, Copy)] pub enum ConfigDigestHashFunction { + /// Use the sha256 hash function. + /// sha256, + + /// Use the blake3 hash function. + /// blake3, } @@ -567,7 +597,7 @@ pub struct GlobalConfig { /// Default hash function to use while uploading blobs to the CAS when not set /// by client. /// - /// Default: ConfigDigestHashFunction::Sha256 + /// Default: ConfigDigestHashFunction::sha256 pub default_digest_hash_function: Option, } diff --git a/src/bin/cas.rs b/src/bin/cas.rs index c6b5abc5c..f1509b37e 100644 --- a/src/bin/cas.rs +++ b/src/bin/cas.rs @@ -26,7 +26,7 @@ use futures::FutureExt; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ - CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ServerConfig, WorkerConfig, + CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ListenerConfig, ServerConfig, WorkerConfig, }; use nativelink_scheduler::default_scheduler_factory::scheduler_factory; use nativelink_scheduler::worker::WorkerId; @@ -189,6 +189,9 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B for (server_cfg, connected_clients_mux) in servers_and_clients { let services = server_cfg.services.ok_or("'services' must be configured")?; + // Currently we only support http as our socket type. + let ListenerConfig::http(http_config) = server_cfg.listener; + let tonic_services = TonicServer::builder() .add_optional_service( services @@ -196,11 +199,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .map_or(Ok(None), |cfg| { AcServer::new(&cfg, &store_manager).map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -220,11 +223,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .map_or(Ok(None), |cfg| { CasServer::new(&cfg, &store_manager).map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -244,11 +247,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .map_or(Ok(None), |cfg| { ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -268,11 +271,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .map_or(Ok(None), |cfg| { ByteStreamServer::new(&cfg, &store_manager).map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -301,11 +304,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .err_tip(|| "Could not create Capabilities service")? .map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -323,11 +326,11 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B .map_or(Ok(None), |cfg| { WorkerApiServer::new(&cfg, &worker_schedulers).map(|v| { let mut service = v.into_service(); - let send_algo = &server_cfg.compression.send_compression_algorithm; + let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::none)) { service = service.send_compressed(encoding); } - for encoding in server_cfg + for encoding in http_config .compression .accepted_compression_algorithms .iter() @@ -438,7 +441,7 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B } // Configure our TLS acceptor if we have TLS configured. - let maybe_tls_acceptor = server_cfg.tls.map_or(Ok(None), |tls_config| { + let maybe_tls_acceptor = http_config.tls.map_or(Ok(None), |tls_config| { let mut cert_reader = std::io::BufReader::new( std::fs::File::open(&tls_config.cert_file) .err_tip(|| format!("Could not open cert file {}", tls_config.cert_file))?, @@ -472,45 +475,47 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B Ok(Some(TlsAcceptor::from(Arc::new(config)))) })?; - let socket_addr = server_cfg.listen_address.parse::()?; + let socket_addr = http_config.socket_address.parse::()?; let tcp_listener = TcpListener::bind(&socket_addr).await?; let mut http = Http::new(); - let http_config = &server_cfg.advanced_http; - if let Some(value) = http_config.http2_max_pending_accept_reset_streams { + let http_config = &http_config.advanced_http; + if let Some(value) = http_config.http2_keep_alive_interval { + http.http2_keep_alive_interval(Duration::from_secs(u64::from(value))); + } + + if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams { http.http2_max_pending_accept_reset_streams( - usize::try_from(value).err_tip(|| "Could not convert http2_max_pending_accept_reset_streams")?, + usize::try_from(value) + .err_tip(|| "Could not convert experimental_http2_max_pending_accept_reset_streams")?, ); } - if let Some(value) = http_config.http2_initial_stream_window_size { + if let Some(value) = http_config.experimental_http2_initial_stream_window_size { http.http2_initial_stream_window_size(value); } - if let Some(value) = http_config.http2_initial_connection_window_size { + if let Some(value) = http_config.experimental_http2_initial_connection_window_size { http.http2_initial_connection_window_size(value); } - if let Some(value) = http_config.http2_adaptive_window { + if let Some(value) = http_config.experimental_http2_adaptive_window { http.http2_adaptive_window(value); } - if let Some(value) = http_config.http2_max_frame_size { + if let Some(value) = http_config.experimental_http2_max_frame_size { http.http2_max_frame_size(value); } - if let Some(value) = http_config.http2_max_concurrent_streams { + if let Some(value) = http_config.experimental_http2_max_concurrent_streams { http.http2_max_concurrent_streams(value); } - if let Some(value) = http_config.http2_keep_alive_interval { - http.http2_keep_alive_interval(Duration::from_secs(u64::from(value))); - } - if let Some(value) = http_config.http2_keep_alive_timeout { + if let Some(value) = http_config.experimental_http2_keep_alive_timeout { http.http2_keep_alive_timeout(Duration::from_secs(u64::from(value))); } - if let Some(value) = http_config.http2_max_send_buf_size { + if let Some(value) = http_config.experimental_http2_max_send_buf_size { http.http2_max_send_buf_size( usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?, ); } - if let Some(true) = http_config.http2_enable_connect_protocol { + if let Some(true) = http_config.experimental_http2_enable_connect_protocol { http.http2_enable_connect_protocol(); } - if let Some(value) = http_config.http2_max_header_list_size { + if let Some(value) = http_config.experimental_http2_max_header_list_size { http.http2_max_header_list_size(value); }