From 34d92a6de1257bfff065c84e24af0b20f02adcc7 Mon Sep 17 00:00:00 2001 From: Alexis Aune Date: Tue, 26 Jul 2022 11:43:53 +0200 Subject: [PATCH 1/5] Implemented Content-Encoding aware decompression support --- Cargo.toml | 6 ++ integrations/actix-web/src/request.rs | 7 ++ integrations/axum/src/extract.rs | 9 ++ integrations/poem/src/extractor.rs | 8 ++ integrations/rocket/src/lib.rs | 1 + integrations/tide/src/lib.rs | 8 +- integrations/warp/src/batch_request.rs | 4 +- src/http/mod.rs | 128 ++++++++++++++++++++++--- src/http/multipart.rs | 9 +- 9 files changed, 166 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4e06f2b1..c809b9b26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ string_number = [] tokio-sync = ["tokio"] tracing = ["tracinglib", "tracing-futures"] unblock = ["blocking"] +compression = ["brotli", "flate2", "zstd"] [dependencies] async-graphql-derive = { path = "derive", version = "4.0.6" } @@ -99,6 +100,11 @@ serde_cbor = { version = "0.11.1", optional = true } sha2 = { version = "0.10.2", optional = true } zxcvbn = { version = "2.1.2", optional = true } +# compress feature +brotli = { version = "3.3.4", optional = true } +flate2 = { version = "1.0.24", optional = true } +zstd = { version = "0.11.2", optional = true } + [dev-dependencies] futures-channel = "0.3.13" tokio = { version = "1.4.0", features = [ diff --git a/integrations/actix-web/src/request.rs b/integrations/actix-web/src/request.rs index fbc480ee7..315f3703a 100644 --- a/integrations/actix-web/src/request.rs +++ b/integrations/actix-web/src/request.rs @@ -84,6 +84,12 @@ impl FromRequest for GraphQLBatchRequest { .and_then(|value| value.to_str().ok()) .map(|value| value.to_string()); + let content_encoding = req + .headers() + .get(http::header::CONTENT_ENCODING) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + let (tx, rx) = async_channel::bounded(16); // Payload is !Send so we create indirection with a channel @@ -100,6 +106,7 @@ impl FromRequest for GraphQLBatchRequest { Ok(GraphQLBatchRequest( async_graphql::http::receive_batch_body( content_type, + content_encoding, rx.map_err(|e| match e { PayloadError::Incomplete(Some(e)) | PayloadError::Io(e) => e, PayloadError::Incomplete(None) => { diff --git a/integrations/axum/src/extract.rs b/integrations/axum/src/extract.rs index 2ec55254a..39c59e17f 100644 --- a/integrations/axum/src/extract.rs +++ b/integrations/axum/src/extract.rs @@ -120,6 +120,13 @@ where .get(http::header::CONTENT_TYPE) .and_then(|value| value.to_str().ok()) .map(ToString::to_string); + + let content_encoding = req + .headers() + .get(http::header::CONTENT_ENCODING) + .and_then(|value| value.to_str().ok()) + .map(ToString::to_string); + let body_stream = BodyStream::from_request(req) .await .map_err(|_| { @@ -129,10 +136,12 @@ where )) })? .map_err(|err| std::io::Error::new(ErrorKind::Other, err.to_string())); + let body_reader = tokio_util::io::StreamReader::new(body_stream).compat(); Ok(Self( async_graphql::http::receive_batch_body( content_type, + content_encoding, body_reader, MultipartOptions::default(), ) diff --git a/integrations/poem/src/extractor.rs b/integrations/poem/src/extractor.rs index 4d33a989a..bbb3cb43b 100644 --- a/integrations/poem/src/extractor.rs +++ b/integrations/poem/src/extractor.rs @@ -76,9 +76,17 @@ impl<'a> FromRequest<'a> for GraphQLBatchRequest { .get(header::CONTENT_TYPE) .and_then(|value| value.to_str().ok()) .map(ToString::to_string); + + let content_encoding = req + .headers() + .get(header::CONTENT_ENCODING) + .and_then(|value| value.to_str().ok()) + .map(ToString::to_string); + Ok(Self( async_graphql::http::receive_batch_body( content_type, + content_encoding, body.take()?.into_async_read().compat(), MultipartOptions::default(), ) diff --git a/integrations/rocket/src/lib.rs b/integrations/rocket/src/lib.rs index d8dad1ea5..6809ad185 100644 --- a/integrations/rocket/src/lib.rs +++ b/integrations/rocket/src/lib.rs @@ -63,6 +63,7 @@ impl<'r> FromData<'r> for GraphQLBatchRequest { let request = async_graphql::http::receive_batch_body( req.headers().get_one("Content-Type"), + req.headers().get_one("Content-Encoding"), data.open( req.limits() .get("graphql") diff --git a/integrations/tide/src/lib.rs b/integrations/tide/src/lib.rs index a180374e3..035758b59 100644 --- a/integrations/tide/src/lib.rs +++ b/integrations/tide/src/lib.rs @@ -135,12 +135,18 @@ pub async fn receive_batch_request_opts( request.query::().map(Into::into) } else if request.method() == Method::Post { let body = request.take_body(); + let content_type = request .header(headers::CONTENT_TYPE) .and_then(|values| values.get(0)) .map(HeaderValue::as_str); - async_graphql::http::receive_batch_body(content_type, body, opts) + let content_encoding = request + .header(headers::CONTENT_ENCODING) + .and_then(|values| values.get(0)) + .map(HeaderValue::as_str); + + async_graphql::http::receive_batch_body(content_type, content_encoding, body, opts) .await .map_err(|e| { tide::Error::new( diff --git a/integrations/warp/src/batch_request.rs b/integrations/warp/src/batch_request.rs index a619bd1f9..019676f00 100644 --- a/integrations/warp/src/batch_request.rs +++ b/integrations/warp/src/batch_request.rs @@ -38,10 +38,12 @@ where .and(warp::get().and(warp::query()).map(BatchRequest::Single)) .or(warp::post() .and(warp::header::optional::("content-type")) + .and(warp::header::optional::("content-encoding")) .and(warp::body::stream()) - .and_then(move |content_type, body| async move { + .and_then(move |content_type, content_encoding, body| async move { async_graphql::http::receive_batch_body( content_type, + content_encoding, TryStreamExt::map_err(body, |e| io::Error::new(ErrorKind::Other, e)) .map_ok(|mut buf| { let remaining = Buf::remaining(&buf); diff --git a/src/http/mod.rs b/src/http/mod.rs index 3b1c5b3c6..a43e4b2a7 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -16,13 +16,30 @@ pub use websocket::{ use crate::{BatchRequest, ParseRequestError, Request}; +/// supported content-encoding values +#[derive(Debug, Clone, Copy)] +pub enum ContentEncoding { + /// LZ77 + Gzip, + + /// Deflate + Deflate, + + /// Brotli + Br, + + /// Zstd + Zstd, +} + /// Receive a GraphQL request from a content type and body. pub async fn receive_body( content_type: Option>, + content_encoding: Option>, body: impl AsyncRead + Send, opts: MultipartOptions, ) -> Result { - receive_batch_body(content_type, body, opts) + receive_batch_body(content_type, content_encoding, body, opts) .await? .into_single() } @@ -30,6 +47,7 @@ pub async fn receive_body( /// Receive a GraphQL request from a content type and body. pub async fn receive_batch_body( content_type: Option>, + content_encoding: Option>, body: impl AsyncRead + Send, opts: MultipartOptions, ) -> Result { @@ -39,13 +57,27 @@ pub async fn receive_batch_body( .map(AsRef::as_ref) .unwrap_or("application/json"); + // parse the content-encoding + let content_encoding = match content_encoding.as_ref().map(AsRef::as_ref) { + Some("gzip") => Some(ContentEncoding::Gzip), + Some("deflate") => Some(ContentEncoding::Deflate), + Some("br") => Some(ContentEncoding::Br), + _ => None, + }; + let content_type: mime::Mime = content_type.parse()?; match (content_type.type_(), content_type.subtype()) { // try to use multipart (mime::MULTIPART, _) => { if let Some(boundary) = content_type.get_param("boundary") { - multipart::receive_batch_multipart(body, boundary.to_string(), opts).await + multipart::receive_batch_multipart( + body, + content_encoding, + boundary.to_string(), + opts, + ) + .await } else { Err(ParseRequestError::InvalidMultipart( multer::Error::NoBoundary, @@ -56,7 +88,7 @@ pub async fn receive_batch_body( // cbor is in application/octet-stream. // Note: cbor will only match if feature ``cbor`` is active // TODO: wait for mime to add application/cbor and match against that too - _ => receive_batch_body_no_multipart(&content_type, body).await, + _ => receive_batch_body_no_multipart(&content_type, content_encoding, body).await, } } @@ -65,6 +97,7 @@ pub async fn receive_batch_body( /// and [``multipart::receive_batch_multipart``] pub(super) async fn receive_batch_body_no_multipart( content_type: &mime::Mime, + content_encoding: Option, body: impl AsyncRead + Send, ) -> Result { assert_ne!(content_type.type_(), mime::MULTIPART, "received multipart"); @@ -73,24 +106,36 @@ pub(super) async fn receive_batch_body_no_multipart( // cbor is in application/octet-stream. // TODO: wait for mime to add application/cbor and match against that too (mime::OCTET_STREAM, _) | (mime::APPLICATION, mime::OCTET_STREAM) => { - receive_batch_cbor(body).await + receive_batch_cbor(body, content_encoding).await } // default to json - _ => receive_batch_json(body).await, + _ => receive_batch_json(body, content_encoding).await, } } /// Receive a GraphQL request from a body as JSON. -pub async fn receive_json(body: impl AsyncRead) -> Result { - receive_batch_json(body).await?.into_single() +pub async fn receive_json( + body: impl AsyncRead, + content_encoding: Option, +) -> Result { + receive_batch_json(body, content_encoding) + .await? + .into_single() } /// Receive a GraphQL batch request from a body as JSON. -pub async fn receive_batch_json(body: impl AsyncRead) -> Result { +pub async fn receive_batch_json( + body: impl AsyncRead, + content_encoding: Option, +) -> Result { let mut data = Vec::new(); futures_util::pin_mut!(body); + body.read_to_end(&mut data) .await .map_err(ParseRequestError::Io)?; + + data = handle_content_encoding(data, content_encoding)?; + serde_json::from_slice::(&data) .map_err(|e| ParseRequestError::InvalidRequest(Box::new(e))) } @@ -98,19 +143,80 @@ pub async fn receive_batch_json(body: impl AsyncRead) -> Result Result { - receive_batch_cbor(body).await?.into_single() +pub async fn receive_cbor( + body: impl AsyncRead, + content_encoding: Option, +) -> Result { + receive_batch_cbor(body, content_encoding) + .await? + .into_single() } /// Receive a GraphQL batch request from a body as CBOR #[cfg(feature = "cbor")] #[cfg_attr(docsrs, doc(cfg(feature = "cbor")))] -pub async fn receive_batch_cbor(body: impl AsyncRead) -> Result { +pub async fn receive_batch_cbor( + body: impl AsyncRead, + content_encoding: Option, +) -> Result { let mut data = Vec::new(); futures_util::pin_mut!(body); body.read_to_end(&mut data) .await .map_err(ParseRequestError::Io)?; + + data = handle_content_encoding(data, content_encoding)?; + serde_cbor::from_slice::(&data) .map_err(|e| ParseRequestError::InvalidRequest(Box::new(e))) } + +/// decompress data if needed +#[cfg(not(feature = "compression"))] +fn handle_content_encoding( + data: Vec, + _: Option, +) -> Result, ParseRequestError> { + Ok(data) +} + +/// decompress data if needed +#[cfg(feature = "compression")] +fn handle_content_encoding( + data: Vec, + content_encoding: Option, +) -> Result, ParseRequestError> { + use std::io::prelude::*; + + use flate2::read::{GzDecoder, ZlibDecoder}; + + match content_encoding { + Some(ContentEncoding::Gzip) => { + let mut buff = Vec::new(); + GzDecoder::new(data.as_slice()) + .read_to_end(&mut buff) + .map_err(ParseRequestError::Io)?; + Ok(buff) + } + Some(ContentEncoding::Deflate) => { + let mut buff = Vec::new(); + ZlibDecoder::new(data.as_slice()) + .read_to_end(&mut buff) + .map_err(ParseRequestError::Io)?; + Ok(buff) + } + Some(ContentEncoding::Br) => { + let mut buff = Vec::new(); + brotli::Decompressor::new(data.as_slice(), 8192) + .read_to_end(&mut buff) + .map_err(ParseRequestError::Io)?; + Ok(buff) + } + Some(ContentEncoding::Zstd) => { + let mut buff = Vec::new(); + zstd::stream::copy_decode(data.as_slice(), &mut buff).map_err(ParseRequestError::Io)?; + Ok(buff) + } + None => Ok(data), + } +} diff --git a/src/http/multipart.rs b/src/http/multipart.rs index 3b9bab412..0a071acc4 100644 --- a/src/http/multipart.rs +++ b/src/http/multipart.rs @@ -9,6 +9,7 @@ use futures_util::{io::AsyncRead, stream::Stream}; use multer::{Constraints, Multipart, SizeLimit}; use pin_project_lite::pin_project; +use super::ContentEncoding; use crate::{BatchRequest, ParseRequestError, UploadValue}; /// Options for `receive_multipart`. @@ -43,6 +44,7 @@ impl MultipartOptions { pub(super) async fn receive_batch_multipart( body: impl AsyncRead + Send, + content_encoding: Option, boundary: impl Into, opts: MultipartOptions, ) -> Result { @@ -79,7 +81,12 @@ pub(super) async fn receive_batch_multipart( Some("operations") => { let body = field.bytes().await?; request = Some( - super::receive_batch_body_no_multipart(&content_type, body.as_ref()).await?, + super::receive_batch_body_no_multipart( + &content_type, + content_encoding, + body.as_ref(), + ) + .await?, ) } Some("map") => { From 13e25f05ce1d2d51da16f16a47ac1e912d2ea51d Mon Sep 17 00:00:00 2001 From: Alexis Aune Date: Tue, 26 Jul 2022 15:36:10 +0200 Subject: [PATCH 2/5] Added unit tests for actix-web and tide --- Cargo.toml | 2 +- integrations/actix-web/Cargo.toml | 4 ++ integrations/actix-web/tests/graphql.rs | 43 ++++++++++++++++ integrations/actix-web/tests/test_utils.rs | 60 ++++++++++++++++++++++ integrations/tide/Cargo.toml | 4 ++ integrations/tide/tests/graphql.rs | 48 +++++++++++++++++ integrations/tide/tests/test_utils.rs | 57 +++++++++++++++++++- src/http/mod.rs | 1 + 8 files changed, 217 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c809b9b26..5cb3f810e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,7 +100,7 @@ serde_cbor = { version = "0.11.1", optional = true } sha2 = { version = "0.10.2", optional = true } zxcvbn = { version = "2.1.2", optional = true } -# compress feature +# compression feature brotli = { version = "3.3.4", optional = true } flate2 = { version = "1.0.24", optional = true } zstd = { version = "0.11.2", optional = true } diff --git a/integrations/actix-web/Cargo.toml b/integrations/actix-web/Cargo.toml index 165c2ba28..8434a6ec8 100644 --- a/integrations/actix-web/Cargo.toml +++ b/integrations/actix-web/Cargo.toml @@ -31,6 +31,10 @@ cbor = ["serde_cbor"] default = [] [dev-dependencies] +async-graphql = { path = "../..", version = "4.0.6", default-features = false, features = [ "compression" ] } actix-rt = "2.6.0" async-mutex = "1.4.0" serde = { version = "1", features = ["derive"] } +brotli = { version = "3.3.4" } +flate2 = { version = "1.0.24" } +zstd = { version = "0.11.2" } diff --git a/integrations/actix-web/tests/graphql.rs b/integrations/actix-web/tests/graphql.rs index 02ce42aeb..7ba598b36 100644 --- a/integrations/actix-web/tests/graphql.rs +++ b/integrations/actix-web/tests/graphql.rs @@ -266,3 +266,46 @@ async fn test_cbor() { } ); } + +#[actix_rt::test] +async fn test_compression() { + let srv = test::init_service( + App::new() + .app_data(Data::new(Schema::new( + AddQueryRoot, + EmptyMutation, + EmptySubscription, + ))) + .service( + web::resource("/") + .guard(guard::Post()) + .to(gql_handle_schema::), + ), + ) + .await; + + for &encoding in ContentEncoding::ALL { + let response = srv + .call( + test::TestRequest::with_uri("/") + .method(Method::POST) + .set_payload(compress_query( + r#"{"query":"{ add(a: 10, b: 20) }"}"#, + encoding, + )) + .insert_header((actix_http::header::ACCEPT, "application/json")) + .insert_header((actix_web::http::header::CONTENT_ENCODING, encoding.header())) + .to_request(), + ) + .await + .unwrap(); + + assert!(response.status().is_success()); + let body = response.into_body(); + + assert_eq!( + actix_web::body::to_bytes(body).await.unwrap(), + json!({"data": {"add": 30}}).to_string().into_bytes() + ); + } +} diff --git a/integrations/actix-web/tests/test_utils.rs b/integrations/actix-web/tests/test_utils.rs index e4f89da51..a45e72dce 100644 --- a/integrations/actix-web/tests/test_utils.rs +++ b/integrations/actix-web/tests/test_utils.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use actix_web::{web, HttpRequest, HttpResponse}; use async_graphql::{ http::{playground_source, GraphQLPlaygroundConfig}, @@ -89,3 +91,61 @@ pub async fn gql_handle_schema_with_header( } schema.execute(request).await.into() } + +#[derive(Debug, Clone, Copy)] +pub enum ContentEncoding { + Gzip, + Deflate, + Br, + Zstd, +} + +impl ContentEncoding { + pub const fn header(&self) -> &'static str { + match self { + ContentEncoding::Gzip => "gzip", + ContentEncoding::Deflate => "deflate", + ContentEncoding::Br => "br", + ContentEncoding::Zstd => "zstd", + } + } + + pub const ALL: &'static [ContentEncoding] = &[ + ContentEncoding::Gzip, + ContentEncoding::Deflate, + ContentEncoding::Br, + ContentEncoding::Zstd, + ]; +} + +// #[cfg(feature = "compression")] +pub fn compress_query(data: impl AsRef, algo: ContentEncoding) -> Vec { + match algo { + ContentEncoding::Gzip => { + let mut encoder = + flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap() + } + ContentEncoding::Deflate => { + let mut encoder = + flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap() + } + ContentEncoding::Br => { + let mut buff = Vec::new(); + let mut encoder = + brotli::CompressorWriter::with_params(&mut buff, 4096, &Default::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.flush().unwrap(); + encoder.into_inner().to_vec() + } + ContentEncoding::Zstd => { + let mut buff = Vec::new(); + let mut encoder = zstd::stream::Encoder::new(&mut buff, 9).unwrap(); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap().to_vec() + } + } +} diff --git a/integrations/tide/Cargo.toml b/integrations/tide/Cargo.toml index ea69052c3..78fafa0dc 100644 --- a/integrations/tide/Cargo.toml +++ b/integrations/tide/Cargo.toml @@ -28,6 +28,7 @@ tide = { version = "0.16.0", default-features = false, features = [ tide-websockets = { version = "0.4.0", optional = true } [dev-dependencies] +async-graphql = { path = "../..", version = "4.0.6", default-features = false, features = [ "compression" ] } # Surf lacks multipart support async-std = { version = "1.9.0", features = ["attributes", "tokio1"] } reqwest = { version = "0.11.2", default-features = false, features = [ @@ -35,3 +36,6 @@ reqwest = { version = "0.11.2", default-features = false, features = [ "multipart", ] } serde_json = "1.0.64" +brotli = { version = "3.3.4" } +flate2 = { version = "1.0.24" } +zstd = { version = "0.11.2" } diff --git a/integrations/tide/tests/graphql.rs b/integrations/tide/tests/graphql.rs index aaf451e84..65f9bafd6 100644 --- a/integrations/tide/tests/graphql.rs +++ b/integrations/tide/tests/graphql.rs @@ -5,6 +5,9 @@ use std::io::Read; use async_graphql::*; use reqwest::{header, StatusCode}; use serde_json::json; +use test_utils::ContentEncoding; + +use crate::test_utils::compress_query; type Result = std::result::Result>; @@ -215,3 +218,48 @@ async fn upload() -> Result<()> { Ok(()) } + + +#[async_std::test] +async fn compression() -> Result<()> { + let listen_addr = "127.0.0.1:8081"; + + async_std::task::spawn(async move { + struct QueryRoot; + #[Object] + impl QueryRoot { + /// Returns the sum of a and b + async fn add(&self, a: i32, b: i32) -> i32 { + a + b + } + } + + let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish(); + + let mut app = tide::new(); + let endpoint = async_graphql_tide::graphql(schema); + app.at("/").post(endpoint.clone()).get(endpoint); + app.listen(listen_addr).await + }); + + test_utils::wait_server_ready().await; + + let client = test_utils::client(); + + for &encoding in ContentEncoding::ALL { + let resp = client + .post(&format!("http://{}", listen_addr)) + .header("Content-Encoding", encoding.header()) + .body(compress_query(r#"{"query":"{ add(a: 10, b: 20) }"}"#, encoding)) + .send() + .await?; + + assert_eq!(resp.status(), StatusCode::OK); + let string = resp.text().await?; + println!("via post {}", string); + + assert_eq!(string, json!({"data": {"add": 30}}).to_string()); + } + + Ok(()) +} \ No newline at end of file diff --git a/integrations/tide/tests/test_utils.rs b/integrations/tide/tests/test_utils.rs index 2cf2bf145..0c89af6c3 100644 --- a/integrations/tide/tests/test_utils.rs +++ b/integrations/tide/tests/test_utils.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{time::Duration, io::Write}; use reqwest::Client; @@ -9,3 +9,58 @@ pub fn client() -> Client { pub async fn wait_server_ready() { async_std::task::sleep(Duration::from_secs(1)).await; } + +#[derive(Debug, Clone, Copy)] +pub enum ContentEncoding { + Gzip, + Deflate, + Br, + Zstd +} + +impl ContentEncoding { + pub const fn header(&self) -> &'static str { + match self { + ContentEncoding::Gzip => "gzip", + ContentEncoding::Deflate => "deflate", + ContentEncoding::Br => "br", + ContentEncoding::Zstd => "zstd", + } + } + + pub const ALL: &'static [ContentEncoding] = &[ + ContentEncoding::Gzip, + ContentEncoding::Deflate, + ContentEncoding::Br, + ContentEncoding::Zstd, + ]; +} + +// #[cfg(feature = "compression")] +pub fn compress_query(data: impl AsRef, algo: ContentEncoding) -> Vec { + match algo { + ContentEncoding::Gzip => { + let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap() + }, + ContentEncoding::Deflate => { + let mut encoder = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap() + }, + ContentEncoding::Br => { + let mut buff = Vec::new(); + let mut encoder = brotli::CompressorWriter::with_params(&mut buff, 4096, &Default::default()); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.flush().unwrap(); + encoder.into_inner().to_vec() + }, + ContentEncoding::Zstd => { + let mut buff = Vec::new(); + let mut encoder = zstd::stream::Encoder::new(&mut buff, 9).unwrap(); + encoder.write_all(data.as_ref().as_bytes()).unwrap(); + encoder.finish().unwrap().to_vec() + } + } +} diff --git a/src/http/mod.rs b/src/http/mod.rs index a43e4b2a7..2a72de4ff 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -62,6 +62,7 @@ pub async fn receive_batch_body( Some("gzip") => Some(ContentEncoding::Gzip), Some("deflate") => Some(ContentEncoding::Deflate), Some("br") => Some(ContentEncoding::Br), + Some("zstd") => Some(ContentEncoding::Zstd), _ => None, }; From 2818e6b0adfe1ebf5fee6a2b3acf0972fff0b94c Mon Sep 17 00:00:00 2001 From: Alexis Aune Date: Tue, 26 Jul 2022 15:37:42 +0200 Subject: [PATCH 3/5] Ran rustfmt --- integrations/tide/tests/graphql.rs | 12 +++++++----- integrations/tide/tests/test_utils.rs | 19 +++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/integrations/tide/tests/graphql.rs b/integrations/tide/tests/graphql.rs index 65f9bafd6..daa190399 100644 --- a/integrations/tide/tests/graphql.rs +++ b/integrations/tide/tests/graphql.rs @@ -219,7 +219,6 @@ async fn upload() -> Result<()> { Ok(()) } - #[async_std::test] async fn compression() -> Result<()> { let listen_addr = "127.0.0.1:8081"; @@ -250,16 +249,19 @@ async fn compression() -> Result<()> { let resp = client .post(&format!("http://{}", listen_addr)) .header("Content-Encoding", encoding.header()) - .body(compress_query(r#"{"query":"{ add(a: 10, b: 20) }"}"#, encoding)) + .body(compress_query( + r#"{"query":"{ add(a: 10, b: 20) }"}"#, + encoding, + )) .send() .await?; - + assert_eq!(resp.status(), StatusCode::OK); let string = resp.text().await?; println!("via post {}", string); - + assert_eq!(string, json!({"data": {"add": 30}}).to_string()); } Ok(()) -} \ No newline at end of file +} diff --git a/integrations/tide/tests/test_utils.rs b/integrations/tide/tests/test_utils.rs index 0c89af6c3..9cdbdfaca 100644 --- a/integrations/tide/tests/test_utils.rs +++ b/integrations/tide/tests/test_utils.rs @@ -1,4 +1,4 @@ -use std::{time::Duration, io::Write}; +use std::{io::Write, time::Duration}; use reqwest::Client; @@ -15,7 +15,7 @@ pub enum ContentEncoding { Gzip, Deflate, Br, - Zstd + Zstd, } impl ContentEncoding { @@ -40,22 +40,25 @@ impl ContentEncoding { pub fn compress_query(data: impl AsRef, algo: ContentEncoding) -> Vec { match algo { ContentEncoding::Gzip => { - let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); + let mut encoder = + flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default()); encoder.write_all(data.as_ref().as_bytes()).unwrap(); encoder.finish().unwrap() - }, + } ContentEncoding::Deflate => { - let mut encoder = flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); + let mut encoder = + flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); encoder.write_all(data.as_ref().as_bytes()).unwrap(); encoder.finish().unwrap() - }, + } ContentEncoding::Br => { let mut buff = Vec::new(); - let mut encoder = brotli::CompressorWriter::with_params(&mut buff, 4096, &Default::default()); + let mut encoder = + brotli::CompressorWriter::with_params(&mut buff, 4096, &Default::default()); encoder.write_all(data.as_ref().as_bytes()).unwrap(); encoder.flush().unwrap(); encoder.into_inner().to_vec() - }, + } ContentEncoding::Zstd => { let mut buff = Vec::new(); let mut encoder = zstd::stream::Encoder::new(&mut buff, 9).unwrap(); From a88e71fb9693a232490d1ea0b11686545fde41f4 Mon Sep 17 00:00:00 2001 From: Alexis Aune Date: Tue, 26 Jul 2022 16:20:56 +0200 Subject: [PATCH 4/5] Fixed content-type header not set in tide unit tests --- integrations/actix-web/tests/graphql.rs | 5 +++-- integrations/tide/tests/graphql.rs | 5 +++-- src/http/mod.rs | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/integrations/actix-web/tests/graphql.rs b/integrations/actix-web/tests/graphql.rs index 7ba598b36..b05baa8e0 100644 --- a/integrations/actix-web/tests/graphql.rs +++ b/integrations/actix-web/tests/graphql.rs @@ -300,12 +300,13 @@ async fn test_compression() { .await .unwrap(); - assert!(response.status().is_success()); + assert!(response.status().is_success(), "using {:?}", encoding); let body = response.into_body(); assert_eq!( actix_web::body::to_bytes(body).await.unwrap(), - json!({"data": {"add": 30}}).to_string().into_bytes() + json!({"data": {"add": 30}}).to_string().into_bytes(), + "using {:?}", encoding ); } } diff --git a/integrations/tide/tests/graphql.rs b/integrations/tide/tests/graphql.rs index daa190399..89a6dbc41 100644 --- a/integrations/tide/tests/graphql.rs +++ b/integrations/tide/tests/graphql.rs @@ -248,6 +248,7 @@ async fn compression() -> Result<()> { for &encoding in ContentEncoding::ALL { let resp = client .post(&format!("http://{}", listen_addr)) + .header("Content-Type", "application/json") .header("Content-Encoding", encoding.header()) .body(compress_query( r#"{"query":"{ add(a: 10, b: 20) }"}"#, @@ -256,11 +257,11 @@ async fn compression() -> Result<()> { .send() .await?; - assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.status(), StatusCode::OK, "using {:?}", encoding); let string = resp.text().await?; println!("via post {}", string); - assert_eq!(string, json!({"data": {"add": 30}}).to_string()); + assert_eq!(string, json!({"data": {"add": 30}}).to_string(), "using {:?}", encoding); } Ok(()) diff --git a/src/http/mod.rs b/src/http/mod.rs index 2a72de4ff..0792ee10a 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -188,7 +188,6 @@ fn handle_content_encoding( content_encoding: Option, ) -> Result, ParseRequestError> { use std::io::prelude::*; - use flate2::read::{GzDecoder, ZlibDecoder}; match content_encoding { From 8095ff1e264ba214213594cd451d2945fe73fe4a Mon Sep 17 00:00:00 2001 From: Alexis Aune Date: Tue, 26 Jul 2022 16:23:39 +0200 Subject: [PATCH 5/5] Ran rustfmt --- integrations/actix-web/tests/graphql.rs | 3 ++- integrations/tide/tests/graphql.rs | 7 ++++++- src/http/mod.rs | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/integrations/actix-web/tests/graphql.rs b/integrations/actix-web/tests/graphql.rs index b05baa8e0..24261c579 100644 --- a/integrations/actix-web/tests/graphql.rs +++ b/integrations/actix-web/tests/graphql.rs @@ -306,7 +306,8 @@ async fn test_compression() { assert_eq!( actix_web::body::to_bytes(body).await.unwrap(), json!({"data": {"add": 30}}).to_string().into_bytes(), - "using {:?}", encoding + "using {:?}", + encoding ); } } diff --git a/integrations/tide/tests/graphql.rs b/integrations/tide/tests/graphql.rs index 89a6dbc41..0f266c70f 100644 --- a/integrations/tide/tests/graphql.rs +++ b/integrations/tide/tests/graphql.rs @@ -261,7 +261,12 @@ async fn compression() -> Result<()> { let string = resp.text().await?; println!("via post {}", string); - assert_eq!(string, json!({"data": {"add": 30}}).to_string(), "using {:?}", encoding); + assert_eq!( + string, + json!({"data": {"add": 30}}).to_string(), + "using {:?}", + encoding + ); } Ok(()) diff --git a/src/http/mod.rs b/src/http/mod.rs index 0792ee10a..2a72de4ff 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -188,6 +188,7 @@ fn handle_content_encoding( content_encoding: Option, ) -> Result, ParseRequestError> { use std::io::prelude::*; + use flate2::read::{GzDecoder, ZlibDecoder}; match content_encoding {