From 39af9a1e8889f6dc9681ccf3441c55084813438a Mon Sep 17 00:00:00 2001 From: Sunli Date: Sat, 19 Aug 2023 11:16:42 +0800 Subject: [PATCH] add support `multipart/mixed` request. #1348 --- CHANGELOG.md | 6 ++ Cargo.toml | 2 +- README.md | 1 + examples | 2 +- integrations/actix-web/Cargo.toml | 1 + integrations/actix-web/src/handler.rs | 62 +++++++++++++++++ integrations/actix-web/src/lib.rs | 2 + integrations/axum/Cargo.toml | 3 +- integrations/axum/src/extract.rs | 5 +- integrations/axum/src/lib.rs | 2 + integrations/axum/src/query.rs | 94 ++++++++++++++++++++++++++ integrations/poem/Cargo.toml | 3 + integrations/poem/src/query.rs | 47 ++++++++++--- src/http/mod.rs | 2 + src/http/multipart_subscribe.rs | 97 +++++++++++++++++++++++++++ 15 files changed, 314 insertions(+), 15 deletions(-) create mode 100644 integrations/actix-web/src/handler.rs create mode 100644 integrations/axum/src/query.rs create mode 100644 src/http/multipart_subscribe.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cf514aee..c9d681d75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# [6.0.4] 2023-08-18 + +- add support `multipart/mixed` request. [#1348](https://github.com/async-graphql/async-graphql/issues/1348) +- async-graphql-actix-web: add `GraphQL` handler. +- async-graphql-axum: add `GraphQL` service. + # [6.0.3] 2023-08-15 - dynamic: fix the error that some methods of `XXXAccessor` return reference lifetimes that are smaller than expected. diff --git a/Cargo.toml b/Cargo.toml index 77355620e..9e89ff5d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ async-stream = "0.3.0" async-trait.workspace = true bytes.workspace = true fnv = "1.0.7" -futures-util = { workspace = true, features = ["io", "sink"] } +futures-util = { workspace = true, features = ["std", "io", "sink"] } http = "0.2.3" indexmap.workspace = true mime = "0.3.15" diff --git a/README.md b/README.md index 30f2b799d..b376eefe0 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,7 @@ Add the extension crate [`async_graphql_apollo_studio_extension`](https://github - [GraphQL](https://graphql.org) - [GraphQL Multipart Request](https://github.com/jaydenseric/graphql-multipart-request-spec) +- [Multipart HTTP protocol for GraphQL subscriptions](https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/) - [GraphQL Cursor Connections Specification](https://facebook.github.io/relay/graphql/connections.htm) - [GraphQL over WebSocket Protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md) - [Apollo Tracing](https://github.com/apollographql/apollo-tracing) diff --git a/examples b/examples index 2d2132556..08e637650 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 2d21325566bf07a14ca6b3727354f1ba838438ab +Subproject commit 08e6376500206050f13bad817d66ea86d4a6bf98 diff --git a/integrations/actix-web/Cargo.toml b/integrations/actix-web/Cargo.toml index 6b9866d44..ef2e87cc1 100644 --- a/integrations/actix-web/Cargo.toml +++ b/integrations/actix-web/Cargo.toml @@ -24,6 +24,7 @@ futures-util = { version = "0.3.0", default-features = false } serde_cbor = { version = "0.11.2", optional = true } serde_json.workspace = true thiserror.workspace = true +async-stream = "0.3.0" [features] cbor = ["serde_cbor"] diff --git a/integrations/actix-web/src/handler.rs b/integrations/actix-web/src/handler.rs new file mode 100644 index 000000000..7c181d640 --- /dev/null +++ b/integrations/actix-web/src/handler.rs @@ -0,0 +1,62 @@ +use std::time::Duration; + +use actix_http::StatusCode; +use actix_web::{Handler, HttpRequest, HttpResponse, Responder}; +use async_graphql::{ + http::{create_multipart_mixed_stream, is_multipart_mixed}, + Executor, +}; +use futures_util::{future::LocalBoxFuture, FutureExt, StreamExt}; + +use crate::{GraphQLRequest, GraphQLResponse}; + +/// A GraphQL handler. +#[derive(Clone)] +pub struct GraphQL { + executor: E, +} + +impl GraphQL { + /// Create a GraphQL handler. + pub fn new(executor: E) -> Self { + Self { executor } + } +} + +impl Handler<(HttpRequest, GraphQLRequest)> for GraphQL { + type Output = HttpResponse; + type Future = LocalBoxFuture<'static, Self::Output>; + + fn call(&self, (http_req, graphql_req): (HttpRequest, GraphQLRequest)) -> Self::Future { + let executor = self.executor.clone(); + async move { + let is_multipart_mixed = http_req + .headers() + .get("accept") + .and_then(|value| value.to_str().ok()) + .map(is_multipart_mixed) + .unwrap_or_default(); + + if is_multipart_mixed { + let stream = executor.execute_stream(graphql_req.0, None); + let interval = Box::pin(async_stream::stream! { + let mut interval = actix_web::rt::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + yield (); + } + }); + HttpResponse::build(StatusCode::OK) + .insert_header(("content-type", "multipart/mixed; boundary=graphql")) + .streaming( + create_multipart_mixed_stream(stream, interval) + .map(Ok::<_, actix_web::Error>), + ) + } else { + GraphQLResponse(executor.execute(graphql_req.into_inner()).await.into()) + .respond_to(&http_req) + } + } + .boxed_local() + } +} diff --git a/integrations/actix-web/src/lib.rs b/integrations/actix-web/src/lib.rs index 2f0c35e32..8244e4fd4 100644 --- a/integrations/actix-web/src/lib.rs +++ b/integrations/actix-web/src/lib.rs @@ -3,8 +3,10 @@ #![allow(clippy::upper_case_acronyms)] #![warn(missing_docs)] +mod handler; mod request; mod subscription; +pub use handler::GraphQL; pub use request::{GraphQLBatchRequest, GraphQLRequest, GraphQLResponse}; pub use subscription::GraphQLSubscription; diff --git a/integrations/axum/Cargo.toml b/integrations/axum/Cargo.toml index 7ee29a364..0ff7c74a1 100644 --- a/integrations/axum/Cargo.toml +++ b/integrations/axum/Cargo.toml @@ -18,10 +18,11 @@ async-trait.workspace = true axum = { version = "0.6.0", features = ["ws", "headers"] } bytes.workspace = true futures-util.workspace = true -http-body = "0.4.2" serde_json.workspace = true +tokio = { version = "1.17.0", features = ["time"] } tokio-util = { workspace = true, default-features = false, features = [ "io", "compat", ] } +tokio-stream = "0.1.14" tower-service = "0.3" diff --git a/integrations/axum/src/extract.rs b/integrations/axum/src/extract.rs index fba042aa2..990756b49 100644 --- a/integrations/axum/src/extract.rs +++ b/integrations/axum/src/extract.rs @@ -2,6 +2,7 @@ use std::{io::ErrorKind, marker::PhantomData}; use async_graphql::{futures_util::TryStreamExt, http::MultipartOptions, ParseRequestError}; use axum::{ + body::HttpBody, extract::{BodyStream, FromRequest}, http::{self, Method, Request}, response::IntoResponse, @@ -62,7 +63,7 @@ pub mod rejection { #[async_trait::async_trait] impl FromRequest for GraphQLRequest where - B: http_body::Body + Unpin + Send + Sync + 'static, + B: HttpBody + Send + Sync + 'static, B::Data: Into, B::Error: Into, S: Send + Sync, @@ -98,7 +99,7 @@ impl GraphQLBatchRequest { #[async_trait::async_trait] impl FromRequest for GraphQLBatchRequest where - B: http_body::Body + Unpin + Send + Sync + 'static, + B: HttpBody + Send + Sync + 'static, B::Data: Into, B::Error: Into, S: Send + Sync, diff --git a/integrations/axum/src/lib.rs b/integrations/axum/src/lib.rs index 7aab72c80..6858ea17f 100644 --- a/integrations/axum/src/lib.rs +++ b/integrations/axum/src/lib.rs @@ -4,9 +4,11 @@ #![warn(missing_docs)] mod extract; +mod query; mod response; mod subscription; pub use extract::{GraphQLBatchRequest, GraphQLRequest}; +pub use query::GraphQL; pub use response::GraphQLResponse; pub use subscription::{GraphQLProtocol, GraphQLSubscription, GraphQLWebSocket}; diff --git a/integrations/axum/src/query.rs b/integrations/axum/src/query.rs new file mode 100644 index 000000000..6de0de3f5 --- /dev/null +++ b/integrations/axum/src/query.rs @@ -0,0 +1,94 @@ +use std::{ + convert::Infallible, + task::{Context, Poll}, + time::Duration, +}; + +use async_graphql::{ + http::{create_multipart_mixed_stream, is_multipart_mixed}, + Executor, +}; +use axum::{ + body::{BoxBody, HttpBody, StreamBody}, + extract::FromRequest, + http::{Request as HttpRequest, Response as HttpResponse}, + response::IntoResponse, + BoxError, +}; +use bytes::Bytes; +use futures_util::{future::BoxFuture, StreamExt}; +use tower_service::Service; + +use crate::{ + extract::rejection::GraphQLRejection, GraphQLBatchRequest, GraphQLRequest, GraphQLResponse, +}; + +/// A GraphQL service. +#[derive(Clone)] +pub struct GraphQL { + executor: E, +} + +impl GraphQL { + /// Create a GraphQL handler. + pub fn new(executor: E) -> Self { + Self { executor } + } +} + +impl Service> for GraphQL +where + B: HttpBody + Send + Sync + 'static, + B::Data: Into, + B::Error: Into, + E: Executor, +{ + type Response = HttpResponse; + type Error = Infallible; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: HttpRequest) -> Self::Future { + let executor = self.executor.clone(); + Box::pin(async move { + let is_multipart_mixed = req + .headers() + .get("accept") + .and_then(|value| value.to_str().ok()) + .map(is_multipart_mixed) + .unwrap_or_default(); + + if is_multipart_mixed { + let req = match GraphQLRequest::::from_request(req, &()).await { + Ok(req) => req, + Err(err) => return Ok(err.into_response()), + }; + let stream = executor.execute_stream(req.0, None); + let body = StreamBody::new( + create_multipart_mixed_stream( + stream, + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( + Duration::from_secs(30), + )) + .map(|_| ()), + ) + .map(Ok::<_, std::io::Error>), + ); + Ok(HttpResponse::builder() + .header("content-type", "multipart/mixed; boundary=graphql") + .body(body.boxed_unsync()) + .expect("BUG: invalid response")) + } else { + let req = + match GraphQLBatchRequest::::from_request(req, &()).await { + Ok(req) => req, + Err(err) => return Ok(err.into_response()), + }; + Ok(GraphQLResponse(executor.execute_batch(req.0).await).into_response()) + } + }) + } +} diff --git a/integrations/poem/Cargo.toml b/integrations/poem/Cargo.toml index 8a9b81bb7..ba71d9e6b 100644 --- a/integrations/poem/Cargo.toml +++ b/integrations/poem/Cargo.toml @@ -20,3 +20,6 @@ serde_json.workspace = true tokio-util = { workspace = true, default-features = false, features = [ "compat", ] } +mime = "0.3.16" +tokio = { version = "1.17.0", features = ["time"] } +tokio-stream = "0.1.14" diff --git a/integrations/poem/src/query.rs b/integrations/poem/src/query.rs index 498d3c74b..1e31cfcbc 100644 --- a/integrations/poem/src/query.rs +++ b/integrations/poem/src/query.rs @@ -1,7 +1,13 @@ -use async_graphql::Executor; -use poem::{async_trait, Endpoint, FromRequest, Request, Result}; +use std::time::Duration; -use crate::{GraphQLBatchRequest, GraphQLBatchResponse}; +use async_graphql::{ + http::{create_multipart_mixed_stream, is_multipart_mixed}, + Executor, +}; +use futures_util::StreamExt; +use poem::{async_trait, Body, Endpoint, FromRequest, IntoResponse, Request, Response, Result}; + +use crate::{GraphQLBatchRequest, GraphQLBatchResponse, GraphQLRequest}; /// A GraphQL query endpoint. /// @@ -31,7 +37,7 @@ pub struct GraphQL { } impl GraphQL { - /// Create a GraphQL query endpoint. + /// Create a GraphQL endpoint. pub fn new(executor: E) -> Self { Self { executor } } @@ -42,13 +48,34 @@ impl Endpoint for GraphQL where E: Executor, { - type Output = GraphQLBatchResponse; + type Output = Response; async fn call(&self, req: Request) -> Result { - let (req, mut body) = req.split(); - let req = GraphQLBatchRequest::from_request(&req, &mut body).await?; - Ok(GraphQLBatchResponse( - self.executor.execute_batch(req.0).await, - )) + let is_multipart_mixed = req + .header("accept") + .map(is_multipart_mixed) + .unwrap_or_default(); + + if is_multipart_mixed { + let (req, mut body) = req.split(); + let req = GraphQLRequest::from_request(&req, &mut body).await?; + let stream = self.executor.execute_stream(req.0, None); + Ok(Response::builder() + .header("content-type", "multipart/mixed; boundary=graphql") + .body(Body::from_bytes_stream( + create_multipart_mixed_stream( + stream, + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval( + Duration::from_secs(30), + )) + .map(|_| ()), + ) + .map(Ok::<_, std::io::Error>), + ))) + } else { + let (req, mut body) = req.split(); + let req = GraphQLBatchRequest::from_request(&req, &mut body).await?; + Ok(GraphQLBatchResponse(self.executor.execute_batch(req.0).await).into_response()) + } } } diff --git a/src/http/mod.rs b/src/http/mod.rs index ff3bc3ee3..d36e9faf6 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -5,6 +5,7 @@ mod graphiql_source; #[cfg(feature = "graphiql")] mod graphiql_v2_source; mod multipart; +mod multipart_subscribe; #[cfg(feature = "playground")] mod playground_source; mod websocket; @@ -18,6 +19,7 @@ pub use graphiql_source::graphiql_source; pub use graphiql_v2_source::{Credentials, GraphiQLSource}; use mime; pub use multipart::MultipartOptions; +pub use multipart_subscribe::{create_multipart_mixed_stream, is_multipart_mixed}; #[cfg(feature = "playground")] pub use playground_source::{playground_source, GraphQLPlaygroundConfig}; use serde::Deserialize; diff --git a/src/http/multipart_subscribe.rs b/src/http/multipart_subscribe.rs new file mode 100644 index 000000000..238435243 --- /dev/null +++ b/src/http/multipart_subscribe.rs @@ -0,0 +1,97 @@ +use bytes::{BufMut, Bytes, BytesMut}; +use futures_util::{stream::BoxStream, Stream, StreamExt}; +use mime::Mime; + +use crate::Response; + +static PART_HEADER: Bytes = + Bytes::from_static(b"--graphql\r\nContent-Type: application/json\r\n\r\n"); +static EOF: Bytes = Bytes::from_static(b"--graphql--\r\n"); +static CRLF: Bytes = Bytes::from_static(b"\r\n"); +static HEARTBEAT: Bytes = Bytes::from_static(b"{}\r\n"); + +/// Create a stream for `multipart/mixed` responses. +/// +/// Reference: +pub fn create_multipart_mixed_stream<'a>( + input: impl Stream + Send + Unpin + 'a, + heartbeat_timer: impl Stream + Send + Unpin + 'a, +) -> BoxStream<'a, Bytes> { + let mut input = input.fuse(); + let mut heartbeat_timer = heartbeat_timer.fuse(); + + async_stream::stream! { + loop { + futures_util::select! { + item = input.next() => { + match item { + Some(resp) => { + let data = BytesMut::new(); + let mut writer = data.writer(); + if serde_json::to_writer(&mut writer, &resp).is_err() { + continue; + } + + yield PART_HEADER.clone(); + yield writer.into_inner().freeze(); + yield CRLF.clone(); + } + None => break, + } + } + _ = heartbeat_timer.next() => { + yield PART_HEADER.clone(); + yield HEARTBEAT.clone(); + } + } + } + + yield EOF.clone(); + } + .boxed() +} + +fn parse_accept(accept: &str) -> Vec { + let mut items = accept + .split(',') + .map(str::trim) + .filter_map(|item| { + let mime: Mime = item.parse().ok()?; + let q = mime + .get_param("q") + .and_then(|value| Some((value.as_str().parse::().ok()? * 1000.0) as i32)) + .unwrap_or(1000); + Some((mime, q)) + }) + .collect::>(); + items.sort_by(|(_, qa), (_, qb)| qb.cmp(qa)); + items.into_iter().map(|(mime, _)| mime).collect() +} + +/// Check accept is multipart-mixed +/// +/// # Example header +/// +/// ```text +/// Accept: multipart/mixed; boundary="graphql"; subscriptionSpec="1.0" +/// ``` +/// +/// the value for boundary should always be `graphql`, and the value +/// for `subscriptionSpec` should always be `1.0`. +/// +/// Reference: +pub fn is_multipart_mixed(accept: &str) -> bool { + parse_accept(accept) + .into_iter() + .next() + .map(|mime| { + mime.type_() == mime::MULTIPART + && mime.subtype() == "mixed" + && mime.get_param(mime::BOUNDARY).map(|value| value.as_str()) == Some("graphql") + && mime + .get_param("subscriptionSpec") + .map(|value| value.as_str()) + == Some("1.0") + }) + .unwrap_or_default() +}