Skip to content

Commit

Permalink
Enable Gzip compression support to GRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jun 28, 2022
1 parent 16a1a18 commit 438afbf
Show file tree
Hide file tree
Showing 68 changed files with 753 additions and 347 deletions.
3 changes: 3 additions & 0 deletions .bazelignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
.cache
.config
bazel-bin
bazel-out
bazel-remote-bin
bazel-remote-out
bazel-remote-testlogs
bazel-remote-turbo-cache
bazel-root
bazel-testlogs
bazel-turbo-cache
100 changes: 63 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures = "0.3.17"
tokio = { version = "1.13.0", features = ["macros", "io-util", "fs", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1.8", features = ["fs", "sync"] }
tokio-util = { version = "0.6.9", features = ["io", "io-util", "codec"] }
tonic = "0.7.1"
tonic = { version = "0.7.1", features = ["compression"] }
lazy-init = "0.5.0"
log = "0.4.14"
env_logger = "0.9.0"
Expand Down Expand Up @@ -48,7 +48,7 @@ relative-path = "1.7.0"
[dev-dependencies]
stdext = "0.2.1"
prost-build = "0.10.1"
tonic-build = "0.7.0"
tonic-build = { version = "0.7.0", features = ["compression"] }
pretty_assertions = "0.7.2"
# Crates.io does not have `rustfmt` that is sufficient version, so load from github directly.
rustfmt-nightly = { git = "https://github.com/rust-lang/rustfmt", tag = "v1.4.38" }
Expand Down
18 changes: 12 additions & 6 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,47 +126,53 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
services
.ac
.map_or(Ok(None), |cfg| {
AcServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
AcServer::new(&cfg, &store_manager)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create AC service")?,
)
.add_optional_service(
services
.cas
.map_or(Ok(None), |cfg| {
CasServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
CasServer::new(&cfg, &store_manager)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create CAS service")?,
)
.add_optional_service(
services
.execution
.map_or(Ok(None), |cfg| {
ExecutionServer::new(&cfg, &schedulers, &store_manager).and_then(|v| Ok(Some(v.into_service())))
ExecutionServer::new(&cfg, &schedulers, &store_manager)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create Execution service")?,
)
.add_optional_service(
services
.bytestream
.map_or(Ok(None), |cfg| {
ByteStreamServer::new(&cfg, &store_manager).and_then(|v| Ok(Some(v.into_service())))
ByteStreamServer::new(&cfg, &store_manager)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create ByteStream service")?,
)
.add_optional_service(
services
.capabilities
.map_or(Ok(None), |cfg| {
CapabilitiesServer::new(&cfg, &schedulers).and_then(|v| Ok(Some(v.into_service())))
CapabilitiesServer::new(&cfg, &schedulers)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create Capabilities service")?,
)
.add_optional_service(
services
.worker_api
.map_or(Ok(None), |cfg| {
WorkerApiServer::new(&cfg, &schedulers).and_then(|v| Ok(Some(v.into_service())))
WorkerApiServer::new(&cfg, &schedulers)
.and_then(|v| Ok(Some(v.into_service().accept_gzip().send_gzip())))
})
.err_tip(|| "Could not create WorkerApi service")?,
);
Expand Down
7 changes: 4 additions & 3 deletions cas/grpc_service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod write_tests {

use tonic::{
codec::Codec, // Needed for .decoder().
codec::CompressionEncoding,
codec::ProstCodec,
transport::Body,
Streaming,
Expand All @@ -74,7 +75,7 @@ pub mod write_tests {
let (tx, body) = Body::channel();
let mut codec = ProstCodec::<WriteRequest, WriteRequest>::default();
// Note: This is an undocumented function.
let stream = Streaming::new_request(codec.decoder(), body);
let stream = Streaming::new_request(codec.decoder(), body, Some(CompressionEncoding::Gzip));

let join_handle = tokio::spawn(async move {
let response_future = bs_server.write(Request::new(stream));
Expand Down Expand Up @@ -299,7 +300,7 @@ pub mod query_tests {
byte_stream_server::ByteStream, QueryWriteStatusRequest, QueryWriteStatusResponse, WriteRequest,
};

use tonic::{codec::Codec, codec::ProstCodec, transport::Body, Streaming};
use tonic::{codec::Codec, codec::CompressionEncoding, codec::ProstCodec, transport::Body, Streaming};

#[tokio::test]
pub async fn test_query_write_status_smoke_test() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -336,7 +337,7 @@ pub mod query_tests {
let (tx, body) = Body::channel();
let mut codec = ProstCodec::<WriteRequest, WriteRequest>::default();
// Note: This is an undocumented function.
let stream = Streaming::new_request(codec.decoder(), body);
let stream = Streaming::new_request(codec.decoder(), body, Some(CompressionEncoding::Gzip));

let bs_server_clone = bs_server.clone();
let join_handle = tokio::spawn(async move {
Expand Down

0 comments on commit 438afbf

Please sign in to comment.