Gzip compression is now disabled by default and configurable
Gzip is quite an expensive compression algorithm, and many deployments are server-to-server or partially localhost setups. Because of this, it is better to have gzip disabled by default and to enable it only for use cases that specifically request it through the config files.
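
To opt a server back into gzip, set the new per-server `compression` block in the config file. This is a minimal sketch, not taken from this commit: the field names come from the CompressionConfig added in config/cas_server.rs below, the "Gzip" spelling of the enum variant is assumed from its declaration, and the block would sit inside a `servers` entry next to `listen_address` and `services`.

    "compression": {
      "send_compression_algorithm": "Gzip",
      "accepted_compression_algorithms": ["Gzip"]
    }

If the block is omitted, the `#[serde(default)]` on the `compression` field falls back to CompressionConfig::default(), so nothing is sent compressed and no compression is accepted.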

These numbers are from a localhost setup compiling turbo-cache twice: once to populate the cache and a second time served from the cache. The cache was completely cleared and restarted between runs.

Gzip enabled: ~48.151s
Gzip disabled: ~2.475s

fixes #109
allada committed Jul 4, 2023
1 parent 9351f26 commit f4fcff2
Showing 2 changed files with 148 additions and 31 deletions.
135 changes: 104 additions & 31 deletions cas/cas_main.rs
@@ -27,7 +27,7 @@ use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use common::fs::set_open_file_limit;
use config::cas_server::{CasConfig, GlobalConfig, WorkerConfig};
use config::cas_server::{CasConfig, CompressionAlgorithm, GlobalConfig, WorkerConfig};
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
@@ -130,6 +130,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
}

fn into_encoding(from: &CompressionAlgorithm) -> Option<CompressionEncoding> {
match from {
CompressionAlgorithm::Gzip => Some(CompressionEncoding::Gzip),
CompressionAlgorithm::None => None,
}
}

for server_cfg in cfg.servers {
let server = Server::builder();
let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?;
@@ -143,11 +150,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.ac
.map_or(Ok(None), |cfg| {
AcServer::new(&cfg, &store_manager).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create AC service")?,
@@ -157,11 +175,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.cas
.map_or(Ok(None), |cfg| {
CasServer::new(&cfg, &store_manager).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create CAS service")?,
@@ -171,11 +200,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.execution
.map_or(Ok(None), |cfg| {
ExecutionServer::new(&cfg, &schedulers, &store_manager).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create Execution service")?,
@@ -185,11 +225,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.bytestream
.map_or(Ok(None), |cfg| {
ByteStreamServer::new(&cfg, &store_manager).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create ByteStream service")?,
@@ -199,11 +250,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.capabilities
.map_or(Ok(None), |cfg| {
CapabilitiesServer::new(&cfg, &schedulers).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create Capabilities service")?,
@@ -213,11 +275,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.worker_api
.map_or(Ok(None), |cfg| {
WorkerApiServer::new(&cfg, &schedulers).and_then(|v| {
Ok(Some(
v.into_service()
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip),
))
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
service = service.send_compressed(encoding);
}
for encoding in server_cfg
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
})
})
.err_tip(|| "Could not create WorkerApi service")?,
44 changes: 44 additions & 0 deletions config/cas_server.rs
@@ -30,6 +30,46 @@ pub type SchedulerRefName = String;
/// Used when the config references `instance_name` in the protocol.
pub type InstanceName = String;

#[derive(Deserialize, Debug, Default, Clone, Copy)]
pub enum CompressionAlgorithm {
/// No compression.
#[default]
None,
/// Gzip compression.
Gzip,
}

/// Note: Compressing data in the cloud rarely has a benefit, since most
/// cloud providers have very high bandwidth backplanes. However, for
/// clients not inside the data center, it might be a good idea to
/// compress data to and from the cloud. This will however come at a high
/// CPU and performance cost. If you are making remote execution share the
/// same CAS/AC servers as clients' remote cache, you can create multiple
/// services with different compression settings that are served on
/// different ports. Then configure the non-cloud clients to use one port
/// and the cloud clients to use another.
#[derive(Deserialize, Debug, Default)]
pub struct CompressionConfig {
/// The compression algorithm that the server will use when sending
/// responses to clients. Enabling this will likely save a lot of
/// data transfer, but will consume a lot of CPU and add a lot of
/// latency.
/// see: https://github.com/allada/turbo-cache/issues/109
///
/// Default: CompressionAlgorithm::None
pub send_compression_algorithm: Option<CompressionAlgorithm>,

/// The compression algorithm that the server will accept from clients.
/// The server will broadcast the supported compression algorithms to
/// clients and the client will choose which compression algorithm to
/// use. Enabling this will likely save a lot of data transfer, but
/// will consume a lot of CPU and add a lot of latency.
/// see: https://github.com/allada/turbo-cache/issues/109
///
/// Default: <no supported compression>
pub accepted_compression_algorithms: Vec<CompressionAlgorithm>,
}

#[derive(Deserialize, Debug)]
pub struct AcStoreConfig {
/// The store name referenced in the `stores` map in the main config.
@@ -203,6 +243,10 @@ pub struct ServerConfig {
#[serde(deserialize_with = "convert_string_with_shellexpand")]
pub listen_address: String,

/// Data transport compression configuration to use for this service.
#[serde(default)]
pub compression: CompressionConfig,

/// Services to attach to server.
pub services: Option<ServicesConfig>,
}
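
As the note on CompressionConfig above suggests, in-datacenter and external clients can be split across two server entries with different compression settings on different ports. The following is a hypothetical sketch only: the addresses are made up, the `services` sections are elided with "...", and the "Gzip" variant spelling is assumed from the enum declaration.

    "servers": [
      {
        "listen_address": "0.0.0.0:50051",
        "services": { ... }
      },
      {
        "listen_address": "0.0.0.0:50052",
        "compression": {
          "send_compression_algorithm": "Gzip",
          "accepted_compression_algorithms": ["Gzip"]
        },
        "services": { ... }
      }
    ]

The first entry relies on the default (no compression) for clients on the same network; the second enables gzip in both directions for clients connecting over slower links.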
