Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Modern] Subscriptions and live queries docs and examples #1655

Closed
ivosabev opened this issue Apr 20, 2017 · 42 comments
Closed

[Modern] Subscriptions and live queries docs and examples #1655

ivosabev opened this issue Apr 20, 2017 · 42 comments

Comments

@ivosabev
Copy link

The docs claim that Relay supports subscriptions and live queries, but there are no examples provided or documentation.

GraphQL Subscriptions & Live Queries
Relay Modern supports GraphQL Subscriptions, using the imperative update API to allow modifications to the store whenever a payload is received. It also features experimental support for GraphQL Live Queries via polling.

https://facebook.github.io/relay/docs/new-in-relay-modern.html#graphql-subscriptions-live-queries

@irrigator
Copy link
Contributor

The todo-modern example helps me a lot when I am learning the modern relay API. I think it's a perfect candidate for showing GraphQL subscription capabilities.

@JenniferWang
Copy link

Subscription and live query require a different server side support from normal GraphQL query/mutation. Relay supports the semantics/API but it relies you to give the actual server side support.

subscription TestSubscription($input: FeedbackLikeInput) {
feedbackLikeSubscribe(input: $input) {
feedback {
id
}
}
}

export type GraphQLSubscriptionConfig = {|
subscription: GraphQLTaggedNode,
variables: Variables,
onCompleted?: ?() => void,
onError?: ?(error: Error) => void,
onNext?: ?(response: ?Object) => void,
updater?: ?(store: RecordSourceSelectorProxy) => void,
|};

If your server supports subscription, you could pass the subscription function here

subscribe?: SubscribeFunction,
.

// subscription API looks similar to Mutation
// use ` const requestSubscription = require('RelayModern')`
export type GraphQLSubscriptionConfig = {|
  subscription: GraphQLTaggedNode,
  variables: Variables,
  onCompleted?: ?() => void,
  onError?: ?(error: Error) => void,
  onNext?: ?(response: ?Object) => void,
  updater?: ?(store: RecordSourceSelectorProxy) => void,
|};

For live query, Relay currently supports polling-based 'live' query. You could specify a poll interval in the cache configure

* - `poll`: causes a query to live update by polling at the specified interval

@jamesone
Copy link

jamesone commented May 17, 2017

Is there an example we could find somewhere that implements subscriptions using relay modern? Also note that it's exported as requestSubscription not requestRelaySubscription

@jamesone
Copy link

Any updates on finding an example that uses relay modern subscriptions? Would be super super handy to have an example we could all checkout 😃

@mattkrick
Copy link

https://facebook.github.io/relay/docs/subscriptions.html

@ivosabev
Copy link
Author

The same thing for Live Queries would be appreciated.

@jamesone
Copy link

@JenniferWang Is there any chance you could track down the Environment object displayed here: https://facebook.github.io/relay/docs/subscriptions.html That'd be super awesome :)

@macolsson
Copy link

Question on SO

I'm wondering the same. I'm interested in seeing a working example/snippet of Relay Modern Subscriptions using a websocket.

As seen in the link, I've worked up an example that can accept a subscription through the requestSubscription that operates with my custom NetworkLayer. The NetworkLayer successfully sends a request to the server (implemented in Graphene-python) and is able to generate responses:

// Create a websocket instance.
const subscriptionWebSocket = new ReconnectingWebSocket('ws://example.com/ws/subscriptions/', null, options);

// Define subscribe function to be used in the NetworkLayer.
function subscriptionHandler(subscriptionConfig, variables, cacheConfig, observer) {
  subscriptionWebSocket.send(JSON.stringify(subscriptionConfig.text));
  return { dispose: () => null };
}

// Create a new NetworkLayer.
const network = Network.create(fetchQuery, subscriptionHandler);

// Create subscription query.
const subscription = graphql`
  subscription mainSubscription {
    testData {
      anotherNode {
        data
      }
    }
  }
`;

// Create subscription request with config and above query.
requestSubscription(
  environment,
  {
    subscription,
    variables: {},
    onComplete: () => { /* What goes here? Never called. */ },
    onNext: () => { /* What goes here? Never called. */ },
    onError: () => { /* What goes here? Never called. */ },
  },
);

The issue is that I have no idea how to connect a websocket to receive the servers response, and later parsing the response into the Relay Modern Store.

@JenniferWang
Copy link

@jamesone the Environment is usually the RelayModernEnvironment or you customized environment.

@JenniferWang
Copy link

@macolsson Just like fetch method, it is the subscriptionHandlers responsibility to call onNext, etc. It needs a subscribe-publish mechanism so I don't know if ReconnectingWebSocket could fulfill this role.

@macolsson
Copy link

@JenniferWang Thanks for the pointer. I'll look to revise the websocket logic.

I've been looking at using the OnNext method from the observer, however I'm not able to figure out exactly what I'm supposed to call it with;
I've tried adding a new source (RecordSource); this threw an error which I was unable to decode, is it supposed to use the same source as the NetworkLayer?

Are there are example structures that I could follow? Not sure I can fully recreate based on Flow.

@JenniferWang
Copy link

@macolsson You should definitely look at the Flow type. I don't know if we have example implementation of a subscribe function. But you should look at RelayNetwork#requestStream.

@josephsavona
Copy link
Contributor

iirc the subscription handler should call onNext with the payload received from the server, the object with {data, errors?}

@macolsson
Copy link

@JenniferWang I've followed the environment#SendSubscription function called from the requestSubscription into Network#requestStream. Seems to be the way to go.

@josephsavona Seems like that would be something to try; source I'll give it a go!

@macolsson
Copy link

@josephsavona Your recollection was indeed accurate. I have the below code which gets validated and accepted in the onNext call-chain:

function subscriptionHandler(subscriptionConfig, variables, cacheConfig, observer) {
  observer.onNext({
    data: { testData: { id: 'y', anotherNode: { id: 'z', data: 'x' } } },
    errors: [],
  });
  return {
    dispose: () => null,
  };
}

So that part seems to work great! However, the data does not get updated; am I required to build my own requestSubscription#updater, or am I missing something here?

@josephsavona
Copy link
Contributor

@macolsson Hmm, the first thing I would check is that the data matches up with the subscription itself. For mutations ansnsubscriptions, records with an id should update automatically - an updated is generally only needed for things like connections or records that don't have an id.

@macolsson
Copy link

@josephsavona Of course! I had some silly test data in the response. I've updated the faked server response to match the initial response from the GraphQL server. It seems to work great.

Thank you for all your help!

@jamesone
Copy link

jamesone commented Jun 2, 2017

@macolsson Hey when you've got a working copy of your subscriptions network layer? Would you mind sharing it?

@josephsavona
Copy link
Contributor

Share as a PR to the docs! ;-)

@macolsson
Copy link

@jamesone Will do.

@josephsavona I'll see what we can do!

@macolsson
Copy link

ping @jamesone @josephsavona

I’ll just write down how I’ve approached this issue after the assistance found in this thread. It might be usable for someone else. If anyone has any comments, suggestions or anything on the below code or reasoning, do let me know. I tried adding some comments when writing the code, hope some of it makes sense.

Firstly I built a SubscriptionHandler that will be referenced from the requestStream#subscribeFunction (SubscriptionHandler#setupSubscription).

The SubscriptionHandler instantiates a WebSocket (using a custom version of ReconnectingWebSockets) and attaches the onmessage event to an internal method (SubscriptionHandler#receiveSubscriptionPayload) which will add the payload to the corresponding subscription.

We create new subscriptions through SubscriptionHandler#newSubscription which will use the internal attribute SubscriptionHandler.subscriptions to add a keyed entry of this subscription (we use an MD5-hash util over the query and variables); meaning the object will come out as:

SubscriptionHandler.subscriptions = {
  [md5hash]: {
    query: QueryObject,
    variables: SubscriptionVariables,
    observer: Observer (contains OnNext method)
}

Whenever the server sends a subscription response the SubscriptionHandler#receiveSubscriptionPayload method will be called and it will identify what subscription the payload belongs to by using the query/variables md5 hash, then use the SubscriptionHandler.subscriptions observer onNext method.

This approach requires the server to return a message such as:

export type ServerResponseMessageParsed = {
  payload: QueryPayload,
  request: {
    query: string,
    variables: Object,
  }
}

I do not know if this is a great way of handling subscriptions, but it works for now with my current setup.

This is doable because the method passed into the network subscription function will handle all interactions through the SubscriptionHandler.

Some code:

NetworkLayer

function fetchQuery(operation, variables) {
  return fetch('/graphql', {
    method: 'POST',
    headers: {
      credentials: 'same-origin',
      'content-type': 'application/json',
    },
    body: JSON.stringify({
      query: operation.text,
      variables,
    }),
  }).then((response) => response.json());
}

const network = Network.create(
  fetchQuery,
  subscriptionHandler.setupSubscription,
);
const source = new RecordSource();
const store = new Store(source);
const environment = new Environment({ network, store });

// Attach the newly created environment to the subscription handler.
subscriptionHandler.attachEnvironment(environment);

SubscriptionHandler.js

class SubscriptionHandler {
  subscriptions: Object;
  subscriptionEnvironment: RelayModernEnvironment;
  websocket: Object;

  /**
   * The SubscriptionHandler constructor. Will setup a new websocket and bind
   * it to internal method to handle receving messages from the ws server.
   *
   * @param  {string} websocketUrl      - The WebSocket URL to listen to.
   * @param  {Object} webSocketSettings - The options object.
   *                                      See ReconnectingWebSocket.
   */
  constructor(websocketUrl: string, webSocketSettings: WebSocketSettings) {
    // All subscription hashes and objects will be stored in the
    // this.subscriptions attribute on the subscription handler.
    this.subscriptions = {};

    // Store the given environment internally to be reused when registering new
    // subscriptions. This is required as per the requestRelaySubscription spec
    // (method requestSubscription).
    this.subscriptionEnvironment = null;

    // Create a new WebSocket instance to be able to receive messages on the
    // given URL. Always opt for default protocol for the RWS, second arg.
    this.websocket = new ReconnectingWebSocket(
      websocketUrl,
      null,  // Protocol.
      webSocketSettings,
    );

    // Bind an internal method to handle incoming messages from the websocket.
    this.websocket.onmessage = this.receiveSubscriptionPayload;
  }

  /**
   * Method to attach the Relay Environment to the subscription handler.
   * This is required as the Network needs to be instantiated with the
   * SubscriptionHandler's methods, and the Environment needs the Network Layer.
   *
   * @param  {Object} environment - The apps environment.
   */
  attachEnvironment = (environment: RelayModernEnvironment) => {
    this.subscriptionEnvironment = environment;
  }

  /**
   * Generates a hash from a given query and variable pair. The method
   * used is a recreatable MD5 hash, which is used as a 'key' for the given
   * subscription. Using the MD5 hash we can identify what subscription is valid
   * based on the query/variable given from the server.
   *
   * @param  {string} query     - A string representation of the subscription.
   * @param  {Object} variables - An object containing all variables used
   *                              in the query.
   * @return {string}             The MD5 hash of the query and variables.
   */
  getHash = (query: string, variables: HashVariables) => {
    const queryString = query.replace(/\s+/gm, '');
    const variablesString = JSON.stringify(variables);
    const hash = md5(queryString + variablesString).toString();
    return hash;
  }

  /**
   * Method to be bound to the class websocket instance. The method will be
   * called each time the WebSocket receives a message on the subscribed URL
   * (see this.websocket options).
   *
   * @param  {string} message - The message received from the websocket.
   */
  receiveSubscriptionPayload = (message: ServerResponseMessage) => {
    const response: ServerResponseMessageParsed = JSON.parse(message.data);
    const { query, variables } = response.request;
    const hash = this.getHash(query, variables);

    // Fetch the subscription instance from the subscription handlers stored
    // subscriptions.
    const subscription = this.subscriptions[hash];

    if (subscription) {
      // Execute the onNext method with the received payload after validating
      // that the received hash is currently stored. If a diff occurs, meaning
      // no hash is stored for the received response, ignore the execution.
      subscription.observer.onNext(response.payload);
    } else {
      console.warn(`Received payload for unregistered hash: ${hash}`);
    }
  }

  /**
   * Method to generate new subscriptions that will be bound to the
   * SubscriptionHandler's environment and will be stored internally in the
   * instantiated handler object.
   *
   * @param {string} subscriptionQuery - The query to subscribe to. Needs to
   *                                     be a validated subscription type.
   * @param {Object} variables         - The variables for the passed query.
   * @param {Object} configs           - A subscription configuration. If
   *                                     override is required.
   */
  newSubscription = (
      subscriptionQuery: GraphQLTaggedNode,
      variables: Variables,
      configs: GraphQLSubscriptionConfig,
  ) => {
    const config = configs || DEFAULT_CONFIG;

    requestSubscription(
      this.subscriptionEnvironment,
      {
        subscription: subscriptionQuery,
        variables: {},
        ...config,
      },
    );
  }

  setupSubscription = (
    config: ConcreteBatch,
    variables: Variables,
    cacheConfig: ?CacheConfig,
    observer: Observer,
  ) => {
    const query = config.text;

    // Get the hash from the given subscriptionQuery and variables. Used to
    // identify this specific subscription.
    const hash = this.getHash(query, variables);

    // Store the newly created subscription request internally to be re-used
    // upon message receival or local data updates.
    this.subscriptions[hash] = { query, variables };

    const subscription = this.subscriptions[hash];
    subscription.observer = observer;

    // Temp fix to avoid WS Connection state.
    setTimeout(() => {
      this.websocket.send(JSON.stringify({ query, variables }));
    }, 100);
  }
}

const subscriptionHandler = new SubscriptionHandler(WS_URL, WS_OPTIONS);

export default subscriptionHandler;

@josephsavona
Copy link
Contributor

josephsavona commented Jun 5, 2017

Cool! Thanks for sharing, that high-level approach makes sense.

The main thing that stands out is that the server shouldn't have to send the query/variables back in every response. Instead, you can send a unique identifier for the query as part of the initial message to the server, then have the server just send back GraphQL payload + that identifier. A simple incrementing integer would work for that. It would also avoid the overhead of getHash().

@jamesone
Copy link

jamesone commented Jun 5, 2017

Awesome, I'll have a play around with it soon! @macolsson Just out of curiosity, what are you using subscriptions for?

@macolsson
Copy link

@josephsavona We're currently using the hash to store the query/variables in the backend, so that we can at any time run a query and return the results of this query to every single client that is listening to the query/variable pairs. The has is currently used to key the cache-store in the back-end as well, as was initially thought as an extra layer of validation; send q/v and hash, compare, receive q/v and hash, compare. But it might be redundant. We're still looking into different approaches.

@jamesone We'll be using it to update multiple clients on data mutation (not just the client who performs the mutation) and we're using it to stream real-time data from hardware to mobile app to backend.

@sibelius
Copy link
Contributor

@bochen2014 are you gonna open source your work any time soon?

@nikolasburk
Copy link
Contributor

nikolasburk commented Jun 22, 2017

I just managed to make subscriptions work with Relay Modern as well. Note that I'm not using WebSocket but the SubscriptionClient that can be found in subscriptions-transport-ws to manage the connection to the server.

Here's my minimal setup code:

Environment.js

import { SubscriptionClient } from 'subscriptions-transport-ws'
const {
  Environment,
  Network,
  RecordSource,
  Store,
} = require('relay-runtime')
const store = new Store(new RecordSource())


const fetchQuery = (operation, variables) => {
  return fetch('https://api.graph.cool/relay/v1/__GRAPHCOOL_PROJECT_ID__', {
    method: 'POST',
    headers: {
      'Accept': 'application/json',
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      query: operation.text,
      variables,
    }),
  }).then(response => {
    return response.json()
  })
}

const websocketURL = 'wss://subscriptions.graph.cool/v1/__GRAPHCOOL_PROJECT_ID__'

function setupSubscription(
  config,
  variables,
  cacheConfig,
  observer,
) {
  const query = config.text

  const subscriptionClient = new SubscriptionClient(websocketURL, {reconnect: true})
  const id = subscriptionClient.subscribe({query, variables}, (error, result) => {
    observer.onNext({data: result})
  })
}

const network = Network.create(fetchQuery, setupSubscription)
const environment = new Environment({
  network,
  store,
})

export default environment

NewLinkSubscription.js

import {
  graphql,
  requestSubscription
} from 'react-relay'
import environment from '../Environment'

const newLinkSubscription = graphql`
  subscription NewLinkSubscription {
    Link {
      mutation
      node {
        id
        description
        url
        createdAt
        postedBy {
          id
          name
        }
      }
    }
  }
`

export default (onNext, onError, onCompleted, updater) => {

  const subscriptionConfig = {
    subscription: newLinkSubscription,
    variables: {},
    onError,
    onNext,
    onCompleted,
    updater
  }

  requestSubscription(
    environment,
    subscriptionConfig
  )

}

Now you can simply use the exported function to subscribe. For example, in one of my React components in componentDidMount I can now do the following:

componentDidMount() {
  NewLinkSubscription(
    response => console.log(`Received data: `, response),
    error => console.log(`An error occurred:`, error),
    () => console.log(`Completed`)
  )
}

Note that the SubscriptionClient can only be used if your server implements this protocol!

@hkjorgensen
Copy link

@nikolasburk thanks, this is great!
How does your local schema.graphql file look like? I can see that you use graph.cool and the Relay endpoint schema does not include any subscriptions definitions. The wss one does come with subscriptions definitions, but it's not Relay compatible.

@josephsavona
Copy link
Contributor

@nikolasburk Care to submit a PR to document that approach?

@nikolasburk
Copy link
Contributor

@hkjorgensen Great point about the schema file! You're right that our Relay API currently doesn't support subscriptions. What I did to satisfy the Relay Compiler was actually manually copying over the Subscription type from the Simple API schema to the Relay API schema. This is still not ideal since the types of the two different APIs don't match up so you have to do some more manual work inside the app to convert the types you get from the subscription (which are from the Simple API) to the types you're using in your app (the ones from the Relay API). We're actually going to add the Subscriptions chapter for How to GraphQL very soon where this approach will be explained in more detail.

@josephsavona Will do that soon!

@idris
Copy link

idris commented Aug 15, 2017

Perhaps this should be split into two tickets: one for live queries, one for subscriptions.

I have a live query question though:
The poll attribute of cacheConfig is only used in streamQuery, but not in sendQuery. How do we configure Relay Modern to use streamQuery instead of sendQuery?

@josephsavona
Copy link
Contributor

@idris Relay Modern uses streamQuery by default (e.g. for QueryRenderer, PaginationContainer, etc).

@Tocknicsu
Copy link

@nikolasburk
Do you have a good idea unsubscribe?
I have multiple component subscribe different things. But I want to unsubscribe when they are unmounted. I have no idea return the id to the component.

@leebyron
Copy link
Contributor

Check out https://facebook.github.io/relay/docs/subscriptions.html which explains requestSubscription. That function returns a Disposable which can dispose() to unsubscribe.

Also as an update, all networks now accept Observable as a result, which means any network can provide a "live query" style behavior.

I'm closing this issue since it's quite old, but if anyone would like to contribute to the documentation, we're always interested in reviewing those PRs.

@swyxio
Copy link

swyxio commented Oct 12, 2017

the guys at graphcool also have this tutorial up: https://www.howtographql.com/react-apollo/8-subscriptions/ just sharing for future people who find this

@jalalat
Copy link

jalalat commented Nov 18, 2017

@nikolasburk subscriptionClient.subscribe is no longer a function: apollographql/subscriptions-transport-ws#261

Do you know how to fix that?

@nikolasburk
Copy link
Contributor

I havent' tried it myself but I believe it should be possible to replace the SubscriptionClient with a WebSocketLink to get it to work. @jalalat

@slaviczavik
Copy link

slaviczavik commented Nov 21, 2017

According to docs we can use the SubscriptionClient like this.

const subscriptionClient = new SubscriptionClient(websocketUrl, {reconnect: true})

const onNext = (result) => {
  observer.onNext(result)
}

const onError = (error) => {
  observer.onError(error)
}

const onComplete = () => {
  observer.onCompleted()
}

const client = subscriptionClient
  .request({ query, variables })
  .subscribe(onNext, onError, onComplete)

@perrosnk
Copy link

perrosnk commented Dec 7, 2017

@slaviczavik Where are you pulling the observer from?

@slaviczavik
Copy link

@perrosnk It was response on @nikolasburk code.

This is full code:

import { Environment, Network, RecordSource, Store } from 'relay-runtime'
import { SubscriptionClient } from 'subscriptions-transport-ws'

const endpoint = 'http://your-endpoint:3000/graphql'
const websocketUrl = 'ws://your-endpoint:3000/subscriptions'

const setupSubscription = (config, variables, cacheConfig, observer) => {
  const query = config.text

  const subscriptionClient = new SubscriptionClient(websocketUrl, {reconnect: true})

  const onNext = (result) => {
    observer.onNext(result)
  }

  const onError = (error) => {
    observer.onError(error)
  }

  const onComplete = () => {
    observer.onCompleted()
  }

  const client = subscriptionClient.request({ query, variables }).subscribe(
    onNext,
    onError,
    onComplete
  )

  // client.unsubscribe()
}

const fetchQuery = (operation, variables, cacheConfig, uploadebles) => {
  return fetch(endpoint, {
    method: 'POST',
    credentials: 'include',
    headers: {
      'content-type': 'application/json'
    },
    body: JSON.stringify({
      query: operation.text,
      variables
    })
  }).then(response => {
    return response.json()
  })
}

const network = Network.create(
  fetchQuery,
  setupSubscription
)
const source = new RecordSource()
const store = new Store(source)

const environment = new Environment({
  network,
  store
})

export default environment

@jeremy-colin
Copy link

jeremy-colin commented Jan 23, 2018

SubscriptionClient has been removed from the latest versions 'subscriptions-transport-ws'.
If anyone is also stumbling accross this issue, I have a working example from a fork of relay-examples using graphql-subscriptions and apollo-link here: https://github.com/jeremy-colin/relay-examples-subscription
It is still imperfect but it shows how to glue everything together

@taion
Copy link
Contributor

taion commented Jan 23, 2018

I’m pretty sure it’s still there.

@jeremy-colin
Copy link

Yes excuse my shortcut, SubscriptionManager support has been removed which broke the previous implementations proposed in this thread.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests