From f4fcff29ea6274356d501b3791940ce5c1346ec7 Mon Sep 17 00:00:00 2001 From: allada Date: Tue, 4 Jul 2023 12:29:01 -0500 Subject: [PATCH] Gzip compression is now disabled by default and configurable Gzip is quite an expensive compression algorithm and many configurations are server-to-server or partially localhost setups. Because of this, it is better to have gzip disabled by default and enable it only for use cases that specifically request it to be enabled through the config files. These numbers are served from localhost compiling turbo-cache twice once to populate the cache and another to serve the cache. Between runs the cache was cleared completely and restarted. Gzip enabled ~48.151s Gzip disabled ~2.475s fixes #109 --- cas/cas_main.rs | 135 +++++++++++++++++++++++++++++++++---------- config/cas_server.rs | 44 ++++++++++++++ 2 files changed, 148 insertions(+), 31 deletions(-) diff --git a/cas/cas_main.rs b/cas/cas_main.rs index cbeeeb0f6..5f12682c8 100644 --- a/cas/cas_main.rs +++ b/cas/cas_main.rs @@ -27,7 +27,7 @@ use bytestream_server::ByteStreamServer; use capabilities_server::CapabilitiesServer; use cas_server::CasServer; use common::fs::set_open_file_limit; -use config::cas_server::{CasConfig, GlobalConfig, WorkerConfig}; +use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, WorkerConfig}; use default_store_factory::store_factory; use error::{make_err, Code, Error, ResultExt}; use execution_server::ExecutionServer; @@ -130,6 +130,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v))); } + fn into_encoding(from: &CompressionAlgorithm) -> Option<CompressionEncoding> { + match from { + CompressionAlgorithm::Gzip => Some(CompressionEncoding::Gzip), + CompressionAlgorithm::None => None, + } + } + for server_cfg in cfg.servers { let server = Server::builder(); let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?; @@ -143,11 +150,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>>
{ .ac .map_or(Ok(None), |cfg| { AcServer::new(&cfg, &store_manager).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. + .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create AC service")?, @@ -157,11 +175,22 @@ async fn main() -> Result<(), Box> { .cas .map_or(Ok(None), |cfg| { CasServer::new(&cfg, &store_manager).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. 
+ .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create CAS service")?, @@ -171,11 +200,22 @@ async fn main() -> Result<(), Box> { .execution .map_or(Ok(None), |cfg| { ExecutionServer::new(&cfg, &schedulers, &store_manager).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. + .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create Execution service")?, @@ -185,11 +225,22 @@ async fn main() -> Result<(), Box> { .bytestream .map_or(Ok(None), |cfg| { ByteStreamServer::new(&cfg, &store_manager).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. 
+ .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create ByteStream service")?, @@ -199,11 +250,22 @@ async fn main() -> Result<(), Box> { .capabilities .map_or(Ok(None), |cfg| { CapabilitiesServer::new(&cfg, &schedulers).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. + .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create Capabilities service")?, @@ -213,11 +275,22 @@ async fn main() -> Result<(), Box> { .worker_api .map_or(Ok(None), |cfg| { WorkerApiServer::new(&cfg, &schedulers).and_then(|v| { - Ok(Some( - v.into_service() - .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip), - )) + let mut service = v.into_service(); + let send_algo = &server_cfg.compression.send_compression_algorithm; + if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) { + service = service.send_compressed(encoding); + } + for encoding in server_cfg + .compression + .accepted_compression_algorithms + .iter() + .map(into_encoding) + // Filter None values. 
+ .filter_map(|v| v) + { + service = service.accept_compressed(encoding); + } + Ok(Some(service)) }) }) .err_tip(|| "Could not create WorkerApi service")?, diff --git a/config/cas_server.rs b/config/cas_server.rs index 3bd0a52cf..9f3b83273 100644 --- a/config/cas_server.rs +++ b/config/cas_server.rs @@ -30,6 +30,46 @@ pub type SchedulerRefName = String; /// Used when the config references `instance_name` in the protocol. pub type InstanceName = String; +#[derive(Deserialize, Debug, Default, Clone, Copy)] +pub enum CompressionAlgorithm { + /// No compression. + #[default] + None, + /// Gzip compression. + Gzip, +} + +/// Note: Compressing data in the cloud rarely has a benefit, since most +/// cloud providers have very high bandwidth backplanes. However, for +/// clients not inside the data center, it might be a good idea to +/// compress data to and from the cloud. This will however come at a high +/// CPU and performance cost. If you are making remote execution share the +/// same CAS/AC servers as client's remote cache, you can create multiple +/// services with different compression settings that are served on +/// different ports. Then configure the non-cloud clients to use one port +/// and cloud-clients to use another. +#[derive(Deserialize, Debug, Default)] +pub struct CompressionConfig { + /// The compression algorithm that the server will use when sending + /// responses to clients. Enabling this will likely save a lot of + /// data transfer, but will consume a lot of CPU and add a lot of + /// latency. + /// see: https://github.com/allada/turbo-cache/issues/109 + /// + /// Default: CompressionAlgorithm::None + pub send_compression_algorithm: Option<CompressionAlgorithm>, + + /// The compression algorithm that the server will accept from clients. + /// The server will broadcast the supported compression algorithms to + /// clients and the client will choose which compression algorithm to + /// use.
Enabling this will likely save a lot of data transfer, but + /// will consume a lot of CPU and add a lot of latency. + /// see: https://github.com/allada/turbo-cache/issues/109 + /// + /// Defaults: (no compression algorithms accepted) + pub accepted_compression_algorithms: Vec<CompressionAlgorithm>, +} + #[derive(Deserialize, Debug)] pub struct AcStoreConfig { /// The store name referenced in the `stores` map in the main config. @@ -203,6 +243,10 @@ pub struct ServerConfig { #[serde(deserialize_with = "convert_string_with_shellexpand")] pub listen_address: String, + /// Data transport compression configuration to use for this service. + #[serde(default)] + pub compression: CompressionConfig, + /// Services to attach to server. pub services: Option<ServicesConfig>, }