Skip to content

Commit

Permalink
add support multipart/mixed request. #1348
Browse files Browse the repository at this point in the history
  • Loading branch information
sunli829 committed Aug 19, 2023
1 parent 1cf4490 commit 39af9a1
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 15 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# [6.0.4] 2023-08-18

- add support `multipart/mixed` request. [#1348](https://github.com/async-graphql/async-graphql/issues/1348)
- async-graphql-actix-web: add `GraphQL` handler.
- async-graphql-axum: add `GraphQL` service.

# [6.0.3] 2023-08-15

- dynamic: fix the error that some methods of `XXXAccessor` return reference lifetimes that are smaller than expected.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async-stream = "0.3.0"
async-trait.workspace = true
bytes.workspace = true
fnv = "1.0.7"
futures-util = { workspace = true, features = ["io", "sink"] }
futures-util = { workspace = true, features = ["std", "io", "sink"] }
http = "0.2.3"
indexmap.workspace = true
mime = "0.3.15"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ Add the extension crate [`async_graphql_apollo_studio_extension`](https://github

- [GraphQL](https://graphql.org)
- [GraphQL Multipart Request](https://github.com/jaydenseric/graphql-multipart-request-spec)
- [Multipart HTTP protocol for GraphQL subscriptions](https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/)
- [GraphQL Cursor Connections Specification](https://facebook.github.io/relay/graphql/connections.htm)
- [GraphQL over WebSocket Protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
- [Apollo Tracing](https://github.com/apollographql/apollo-tracing)
Expand Down
1 change: 1 addition & 0 deletions integrations/actix-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ futures-util = { version = "0.3.0", default-features = false }
serde_cbor = { version = "0.11.2", optional = true }
serde_json.workspace = true
thiserror.workspace = true
async-stream = "0.3.0"

[features]
cbor = ["serde_cbor"]
Expand Down
62 changes: 62 additions & 0 deletions integrations/actix-web/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::time::Duration;

use actix_http::StatusCode;
use actix_web::{Handler, HttpRequest, HttpResponse, Responder};
use async_graphql::{
http::{create_multipart_mixed_stream, is_multipart_mixed},
Executor,
};
use futures_util::{future::LocalBoxFuture, FutureExt, StreamExt};

use crate::{GraphQLRequest, GraphQLResponse};

/// A GraphQL handler.
#[derive(Clone)]
pub struct GraphQL<E> {
executor: E,
}

impl<E> GraphQL<E> {
/// Create a GraphQL handler.
pub fn new(executor: E) -> Self {
Self { executor }
}
}

impl<E: Executor> Handler<(HttpRequest, GraphQLRequest)> for GraphQL<E> {
type Output = HttpResponse;
type Future = LocalBoxFuture<'static, Self::Output>;

fn call(&self, (http_req, graphql_req): (HttpRequest, GraphQLRequest)) -> Self::Future {
let executor = self.executor.clone();
async move {
let is_multipart_mixed = http_req
.headers()
.get("accept")
.and_then(|value| value.to_str().ok())
.map(is_multipart_mixed)
.unwrap_or_default();

if is_multipart_mixed {
let stream = executor.execute_stream(graphql_req.0, None);
let interval = Box::pin(async_stream::stream! {
let mut interval = actix_web::rt::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
yield ();
}
});
HttpResponse::build(StatusCode::OK)
.insert_header(("content-type", "multipart/mixed; boundary=graphql"))
.streaming(
create_multipart_mixed_stream(stream, interval)
.map(Ok::<_, actix_web::Error>),
)
} else {
GraphQLResponse(executor.execute(graphql_req.into_inner()).await.into())
.respond_to(&http_req)
}
}
.boxed_local()
}
}
2 changes: 2 additions & 0 deletions integrations/actix-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
#![allow(clippy::upper_case_acronyms)]
#![warn(missing_docs)]

mod handler;
mod request;
mod subscription;

pub use handler::GraphQL;
pub use request::{GraphQLBatchRequest, GraphQLRequest, GraphQLResponse};
pub use subscription::GraphQLSubscription;
3 changes: 2 additions & 1 deletion integrations/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ async-trait.workspace = true
axum = { version = "0.6.0", features = ["ws", "headers"] }
bytes.workspace = true
futures-util.workspace = true
http-body = "0.4.2"
serde_json.workspace = true
tokio = { version = "1.17.0", features = ["time"] }
tokio-util = { workspace = true, default-features = false, features = [
"io",
"compat",
] }
tokio-stream = "0.1.14"
tower-service = "0.3"
5 changes: 3 additions & 2 deletions integrations/axum/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{io::ErrorKind, marker::PhantomData};

use async_graphql::{futures_util::TryStreamExt, http::MultipartOptions, ParseRequestError};
use axum::{
body::HttpBody,
extract::{BodyStream, FromRequest},
http::{self, Method, Request},
response::IntoResponse,
Expand Down Expand Up @@ -62,7 +63,7 @@ pub mod rejection {
#[async_trait::async_trait]
impl<S, B, R> FromRequest<S, B> for GraphQLRequest<R>
where
B: http_body::Body + Unpin + Send + Sync + 'static,
B: HttpBody + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
S: Send + Sync,
Expand Down Expand Up @@ -98,7 +99,7 @@ impl<R> GraphQLBatchRequest<R> {
#[async_trait::async_trait]
impl<S, B, R> FromRequest<S, B> for GraphQLBatchRequest<R>
where
B: http_body::Body + Unpin + Send + Sync + 'static,
B: HttpBody + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
S: Send + Sync,
Expand Down
2 changes: 2 additions & 0 deletions integrations/axum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
#![warn(missing_docs)]

mod extract;
mod query;
mod response;
mod subscription;

pub use extract::{GraphQLBatchRequest, GraphQLRequest};
pub use query::GraphQL;
pub use response::GraphQLResponse;
pub use subscription::{GraphQLProtocol, GraphQLSubscription, GraphQLWebSocket};
94 changes: 94 additions & 0 deletions integrations/axum/src/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::{
convert::Infallible,
task::{Context, Poll},
time::Duration,
};

use async_graphql::{
http::{create_multipart_mixed_stream, is_multipart_mixed},
Executor,
};
use axum::{
body::{BoxBody, HttpBody, StreamBody},
extract::FromRequest,
http::{Request as HttpRequest, Response as HttpResponse},
response::IntoResponse,
BoxError,
};
use bytes::Bytes;
use futures_util::{future::BoxFuture, StreamExt};
use tower_service::Service;

use crate::{
extract::rejection::GraphQLRejection, GraphQLBatchRequest, GraphQLRequest, GraphQLResponse,
};

/// A GraphQL service.
#[derive(Clone)]
pub struct GraphQL<E> {
executor: E,
}

impl<E> GraphQL<E> {
/// Create a GraphQL handler.
pub fn new(executor: E) -> Self {
Self { executor }
}
}

impl<B, E> Service<HttpRequest<B>> for GraphQL<E>
where
B: HttpBody + Send + Sync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
E: Executor,
{
type Response = HttpResponse<BoxBody>;
type Error = Infallible;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
let executor = self.executor.clone();
Box::pin(async move {
let is_multipart_mixed = req
.headers()
.get("accept")
.and_then(|value| value.to_str().ok())
.map(is_multipart_mixed)
.unwrap_or_default();

if is_multipart_mixed {
let req = match GraphQLRequest::<GraphQLRejection>::from_request(req, &()).await {
Ok(req) => req,
Err(err) => return Ok(err.into_response()),
};
let stream = executor.execute_stream(req.0, None);
let body = StreamBody::new(
create_multipart_mixed_stream(
stream,
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(30),
))
.map(|_| ()),
)
.map(Ok::<_, std::io::Error>),
);
Ok(HttpResponse::builder()
.header("content-type", "multipart/mixed; boundary=graphql")
.body(body.boxed_unsync())
.expect("BUG: invalid response"))
} else {
let req =
match GraphQLBatchRequest::<GraphQLRejection>::from_request(req, &()).await {
Ok(req) => req,
Err(err) => return Ok(err.into_response()),
};
Ok(GraphQLResponse(executor.execute_batch(req.0).await).into_response())
}
})
}
}
3 changes: 3 additions & 0 deletions integrations/poem/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ serde_json.workspace = true
tokio-util = { workspace = true, default-features = false, features = [
"compat",
] }
mime = "0.3.16"
tokio = { version = "1.17.0", features = ["time"] }
tokio-stream = "0.1.14"
47 changes: 37 additions & 10 deletions integrations/poem/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use async_graphql::Executor;
use poem::{async_trait, Endpoint, FromRequest, Request, Result};
use std::time::Duration;

use crate::{GraphQLBatchRequest, GraphQLBatchResponse};
use async_graphql::{
http::{create_multipart_mixed_stream, is_multipart_mixed},
Executor,
};
use futures_util::StreamExt;
use poem::{async_trait, Body, Endpoint, FromRequest, IntoResponse, Request, Response, Result};

use crate::{GraphQLBatchRequest, GraphQLBatchResponse, GraphQLRequest};

/// A GraphQL query endpoint.
///
Expand Down Expand Up @@ -31,7 +37,7 @@ pub struct GraphQL<E> {
}

impl<E> GraphQL<E> {
/// Create a GraphQL query endpoint.
/// Create a GraphQL endpoint.
pub fn new(executor: E) -> Self {
Self { executor }
}
Expand All @@ -42,13 +48,34 @@ impl<E> Endpoint for GraphQL<E>
where
E: Executor,
{
type Output = GraphQLBatchResponse;
type Output = Response;

async fn call(&self, req: Request) -> Result<Self::Output> {
let (req, mut body) = req.split();
let req = GraphQLBatchRequest::from_request(&req, &mut body).await?;
Ok(GraphQLBatchResponse(
self.executor.execute_batch(req.0).await,
))
let is_multipart_mixed = req
.header("accept")
.map(is_multipart_mixed)
.unwrap_or_default();

if is_multipart_mixed {
let (req, mut body) = req.split();
let req = GraphQLRequest::from_request(&req, &mut body).await?;
let stream = self.executor.execute_stream(req.0, None);
Ok(Response::builder()
.header("content-type", "multipart/mixed; boundary=graphql")
.body(Body::from_bytes_stream(
create_multipart_mixed_stream(
stream,
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
Duration::from_secs(30),
))
.map(|_| ()),
)
.map(Ok::<_, std::io::Error>),
)))
} else {
let (req, mut body) = req.split();
let req = GraphQLBatchRequest::from_request(&req, &mut body).await?;
Ok(GraphQLBatchResponse(self.executor.execute_batch(req.0).await).into_response())
}
}
}
2 changes: 2 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod graphiql_source;
#[cfg(feature = "graphiql")]
mod graphiql_v2_source;
mod multipart;
mod multipart_subscribe;
#[cfg(feature = "playground")]
mod playground_source;
mod websocket;
Expand All @@ -18,6 +19,7 @@ pub use graphiql_source::graphiql_source;
pub use graphiql_v2_source::{Credentials, GraphiQLSource};
use mime;
pub use multipart::MultipartOptions;
pub use multipart_subscribe::{create_multipart_mixed_stream, is_multipart_mixed};
#[cfg(feature = "playground")]
pub use playground_source::{playground_source, GraphQLPlaygroundConfig};
use serde::Deserialize;
Expand Down

0 comments on commit 39af9a1

Please sign in to comment.