Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/book/content/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- [Objects and generics](advanced/objects_and_generics.md)
- [Multiple operations per request](advanced/multiple_ops_per_request.md)
- [Dataloaders](advanced/dataloaders.md)
- [Subscriptions](advanced/subscriptions.md)

# - [Context switching]

Expand Down
1 change: 1 addition & 0 deletions docs/book/content/advanced/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ The chapters below cover some more advanced scenarios.
- [Objects and generics](objects_and_generics.md)
- [Multiple operations per request](multiple_ops_per_request.md)
- [Dataloaders](dataloaders.md)
- [Subscriptions](subscriptions.md)
176 changes: 176 additions & 0 deletions docs/book/content/advanced/subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Subscriptions
### How to achieve realtime data with GraphQL subscriptions

GraphQL subscriptions are a way to push data from the server to clients requesting real-time messages
from the server. Subscriptions are similar to queries in that they specify a set of fields to be delivered to the client,
but instead of immediately returning a single answer, a result is sent every time a particular event happens on the
server.

In order to execute subscriptions you need a coordinator (that spawns connections)
and a GraphQL object that can be resolved into a stream--elements of which will then
be returned to the end user. The [juniper_subscriptions][juniper_subscriptions] crate
provides a default connection implementation. Currently subscriptions are only supported on the `master` branch. Add the following to your `Cargo.toml`:
```toml
[dependencies]
juniper = { git = "https://github.com/graphql-rust/juniper", branch = "master" }
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper", branch = "master" }
```

### Schema Definition

The Subscription is just a GraphQL object, similar to the Query root and Mutations object that you defined for the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The Subscription is just a GraphQL object, similar to the Query root and Mutations object that you defined for the
The top-level GraphQL Subscription is just a GraphQL object, similar to the Query object and Mutation object defined

operations in your [Schema][Schema], the difference is that all the operations defined there should be async and the return of it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
operations in your [Schema][Schema], the difference is that all the operations defined there should be async and the return of it
in your [Schema][Schema], the difference is that all the operations defined in the GraphQL Subscription object should be async and the return value

should be a [Stream][Stream].

This example shows a subscription operation that returns two events, the strings `Hello` and `World!`
sequentially:

```rust
# use juniper::http::GraphQLRequest;
# use juniper::{DefaultScalarValue, FieldError, SubscriptionCoordinator};
# use juniper_subscriptions::Coordinator;
# use futures::{Stream, StreamExt};
# use std::pin::Pin;
# #[derive(Clone)]
# pub struct Database;
# impl juniper::Context for Database {}
# impl Database {
# fn new() -> Self {
# Self {}
# }
# }
# pub struct Query;
# #[juniper::graphql_object(Context = Database)]
# impl Query {
# fn hello_world() -> &str {
# "Hello World!"
# }
# }
pub struct Subscription;

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Database)]
impl Subscription {
async fn hello_world() -> StringStream {
let stream = tokio::stream::iter(vec![
Ok(String::from("Hello")),
Ok(String::from("World!"))
]);
Box::pin(stream)
}
}
# fn main () {}
```



### Coordinator

Subscriptions require a bit more resources than regular queries, since they can provide a great vector
for DOS attacks and can bring down a server easily if not handled right. [SubscriptionCoordinator][SubscriptionCoordinator] trait provides the coordination logic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for DOS attacks and can bring down a server easily if not handled right. [SubscriptionCoordinator][SubscriptionCoordinator] trait provides the coordination logic.
for DOS attacks and can bring down a server easily if not handled right. The [SubscriptionCoordinator][SubscriptionCoordinator] trait provides the coordination logic.

It contains the schema and can keep track of opened connections, handle subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
It contains the schema and can keep track of opened connections, handle subscription
The [SubscriptionCoordinator][SubscriptionCoordinator] contains the schema and can keep track of opened connections, handle subscription

start and maintains a global subscription id. Once connection is established, subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
start and maintains a global subscription id. Once connection is established, subscription
creation, and maintains a global subscription id.
Once a connection is established, the [SubscriptionCoordinator][SubscriptionCoordinator]

coordinator spawns a [SubscriptionConnection][SubscriptionConnection], which handles a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
coordinator spawns a [SubscriptionConnection][SubscriptionConnection], which handles a
spawns a [SubscriptionConnection][SubscriptionConnection], which handles a

single connection, provides resolver logic for a client stream and can provide re-connection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
single connection, provides resolver logic for a client stream and can provide re-connection
single connection, provides resolver logic for a client stream, and can provide re-connection

and shutdown logic.


The [Coordinator][Coordinator] struct is a simple implementation of the trait [SubscriptionCoordinator][SubscriptionCoordinator]
that is responsible for handling the execution of subscription operation into your schema. The execution of the `subscribe`
operation returns a [Future][Future] with a Item value of a Result<[Connection][Connection], [GraphQLError][GraphQLError]>,
where the connection is the Stream of values returned by the operation and the GraphQLError is the error that occurred in the
resolution of this connection, which means that the subscription failed.

```rust
# use juniper::http::GraphQLRequest;
# use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator};
# use juniper_subscriptions::Coordinator;
# use futures::{Stream, StreamExt};
# use std::pin::Pin;
# use tokio::runtime::Runtime;
# use tokio::task;
#
# #[derive(Clone)]
# pub struct Database;
#
# impl juniper::Context for Database {}
#
# impl Database {
# fn new() -> Self {
# Self {}
# }
# }
#
# pub struct Query;
#
# #[juniper::graphql_object(Context = Database)]
# impl Query {
# fn hello_world() -> &str {
# "Hello World!"
# }
# }
#
# pub struct Subscription;
#
# type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;
#
# #[juniper::graphql_subscription(Context = Database)]
# impl Subscription {
# async fn hello_world() -> StringStream {
# let stream =
# tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
# Box::pin(stream)
# }
# }
type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;

fn schema() -> Schema {
Schema::new(Query {}, EmptyMutation::new(), Subscription {})
}

async fn run_subscription() {
let schema = schema();
let coordinator = Coordinator::new(schema);
let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
r#"
{
"query": "subscription { helloWorld }"
}
"#,
)
.unwrap();
let ctx = Database::new();
let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap();
while let Some(result) = conn.next().await {
println!("{}", serde_json::to_string(&result).unwrap());
}
}

# fn main() { }
```

### Web Integration and Examples

Currently there is an example of subscriptions with [warp][warp], but it still in an alpha state.
GraphQL over [WS][WS] is not fully supported yet and is non-standard.

- [Warp Subscription Example](https://github.com/graphql-rust/juniper/tree/master/examples/warp_subscriptions)
- [Small Example](https://github.com/graphql-rust/juniper/tree/master/examples/basic_subscriptions)




[juniper_subscriptions]: https://github.com/graphql-rust/juniper/tree/master/juniper_subscriptions
[Stream]: https://docs.rs/futures/0.3.4/futures/stream/trait.Stream.html
<!-- TODO: Fix these links when the documentation for the `juniper_subscriptions` are defined in the docs. --->
[Coordinator]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Coordinator.html
[SubscriptionCoordinator]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionCoordinator.html
[Connection]: https://docs.rs/juniper_subscriptions/0.15.0/struct.Connection.html
[SubscriptionConnection]: https://docs.rs/juniper_subscriptions/0.15.0/trait.SubscriptionConnection.html
<!--- --->
[Future]: https://docs.rs/futures/0.3.4/futures/future/trait.Future.html
[warp]: https://github.com/graphql-rust/juniper/tree/master/juniper_warp
[WS]: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
[GraphQLError]: https://docs.rs/juniper/0.14.2/juniper/enum.GraphQLError.html
[Schema]: ../schema/schemas_and_mutations.md
16 changes: 11 additions & 5 deletions docs/book/content/schema/schemas_and_mutations.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Schemas

A schema consists of two types: a query object and a mutation object (Juniper
does not support subscriptions yet). These two define the root query fields
and mutations of the schema, respectively.
A schema consists of three types: a query object, a mutation object, and a subscription object.
These three define the root query fields, mutations and subscriptions of the schema, respectively.

The usage of subscriptions is a little different from the mutation and query objects, so there is a specific [section][section] that discusses them.

Both query and mutation objects are regular GraphQL objects, defined like any
other object in Juniper. The mutation object, however, is optional since schemas
can be read-only.
other object in Juniper. The mutation and subscription object, however, is optional since schemas
can be read-only and without subscriptions as well. If mutations/subscriptions functionality is not needed, consider using [EmptyMutation][EmptyMutation]/[EmptySubscription][EmptySubscription].

In Juniper, the `RootNode` type represents a schema. You usually don't have to
create this object yourself: see the framework integrations for [Iron](../servers/iron.md)
Expand Down Expand Up @@ -58,3 +59,8 @@ impl Mutations {

# fn main() { }
```

[section]: ../advanced/subscriptions.md
[EmptyMutation]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptyMutation.html
<!--TODO: Fix This URL when the EmptySubscription become available in the Documentation -->
[EmptySubscription]: https://docs.rs/juniper/0.14.2/juniper/struct.EmptySubscription.html
5 changes: 3 additions & 2 deletions docs/book/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ build = "build.rs"
[dependencies]
juniper = { path = "../../../juniper" }
juniper_iron = { path = "../../../juniper_iron" }
futures = "0.3.1"

juniper_subscriptions = { path = "../../../juniper_subscriptions" }
futures = "0.3"
tokio = { version = "0.2", features = ["rt-core", "blocking", "stream", "rt-util"] }
iron = "0.5.0"
mount = "0.4.0"

Expand Down
1 change: 1 addition & 0 deletions examples/basic_subscriptions/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target
16 changes: 16 additions & 0 deletions examples/basic_subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "basic_subscriptions"
version = "0.1.0"
edition = "2018"
authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", features = ["rt-core", "macros", "stream"] }

juniper = { git = "https://github.com/graphql-rust/juniper" }
juniper_subscriptions = { git = "https://github.com/graphql-rust/juniper" }
65 changes: 65 additions & 0 deletions examples/basic_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#![deny(warnings)]

use futures::{Stream, StreamExt};
use juniper::http::GraphQLRequest;
use juniper::{DefaultScalarValue, EmptyMutation, FieldError, RootNode, SubscriptionCoordinator};
use juniper_subscriptions::Coordinator;
use std::pin::Pin;

#[derive(Clone)]
pub struct Database;

impl juniper::Context for Database {}

impl Database {
fn new() -> Self {
Self {}
}
}

pub struct Query;

#[juniper::graphql_object(Context = Database)]
impl Query {
fn hello_world() -> &str {
"Hello World!"
}
}

pub struct Subscription;

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Database)]
impl Subscription {
async fn hello_world() -> StringStream {
let stream =
tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
Box::pin(stream)
}
}

type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;

fn schema() -> Schema {
Schema::new(Query {}, EmptyMutation::new(), Subscription {})
}

#[tokio::main]
async fn main() {
let schema = schema();
let coordinator = Coordinator::new(schema);
let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
r#"
{
"query": "subscription { helloWorld }"
}
"#,
)
.unwrap();
let ctx = Database::new();
let mut conn = coordinator.subscribe(&req, &ctx).await.unwrap();
while let Some(result) = conn.next().await {
println!("{}", serde_json::to_string(&result).unwrap());
}
}