Subscriptions with OpenAPI-to-GraphQL

Since version 2.1.0, OpenAPI-to-GraphQL supports GraphQL subscription operations. In GraphQL, using a subscription query, clients subscribe to updates on the data defined in the query. In this scenario, when data changes, the server publishes these changes to all clients that have active subscriptions for that data.

The OpenAPI specification can define similar behavior using callbacks: a callback defines a request that the server may initiate in response to receiving another request. Callbacks can thus be used to model publish/subscribe behavior: when the server receives a request to update some data, it can itself issue callback requests (outside of the first request/response cycle) to any number of subscribed clients to inform them of the new data.
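To make this concrete, a callback declaration for the device example used throughout this document might look roughly like the fragment below. It is written as a JavaScript object for consistency with the other snippets. The callback name `DevicesEvent` and the runtime expression are taken from this document; the `Device` schema name, the response descriptions, and the `operationId` values (inferred from the GraphQL field names used in the client example) are assumptions made for illustration.

```javascript
// Sketch of an OpenAPI 3 fragment that declares a callback (illustrative only).
// `Device`, the responses and the operationIds are assumptions, not the
// project's actual definition.
const oasFragment = {
  paths: {
    '/devices': {
      post: {
        operationId: 'createDevice',
        requestBody: {
          content: {
            'application/json': {
              schema: { $ref: '#/components/schemas/Device' }
            }
          }
        },
        responses: { '200': { description: 'Device created' } },
        // The operation references the callback it may trigger
        callbacks: {
          DevicesEvent: { $ref: '#/components/callbacks/DevicesEvent' }
        }
      }
    }
  },
  components: {
    callbacks: {
      DevicesEvent: {
        // Runtime expression: resolved from the body of the triggering request
        '/api/{$request.body#/userName}/devices/{$request.body#/method}/+': {
          post: {
            operationId: 'devicesEventListener',
            requestBody: {
              content: {
                'application/json': {
                  schema: { $ref: '#/components/schemas/Device' }
                }
              }
            },
            responses: { '204': { description: 'Event received' } }
          }
        }
      }
    }
  }
}
```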

When the createSubscriptionsFromCallbacks option is enabled, OpenAPI-to-GraphQL creates subscription fields from an operation's callback objects. For each such field, OpenAPI-to-GraphQL creates a subscribe function, responsible for subscribing clients to the results of executed callbacks, and a special form of resolve function, which pushes data updates to subscribed clients.

To create these two functions, OpenAPI-to-GraphQL relies on the popular graphql-subscriptions package, which provides a unified API to support different network transports (like WebSockets, MQTT, Redis etc. - see this list of supported transports).
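The division of labor between the two functions can be sketched without any library. The snippet below is not the graphql-subscriptions API and not OpenAPI-to-GraphQL's implementation; `createTinyPubSub`, `subscribeToTopic`, and `resolvePayload` are hypothetical names, and the in-memory store only stands in for a real transport.

```javascript
// A tiny in-memory stand-in for a PubSub engine (an assumption for
// illustration, NOT the graphql-subscriptions API).
function createTinyPubSub() {
  const listeners = new Map() // topic -> Set of callbacks
  return {
    subscribe(topic, onMessage) {
      if (!listeners.has(topic)) listeners.set(topic, new Set())
      listeners.get(topic).add(onMessage)
      // Return a function that cancels this subscription
      return () => listeners.get(topic).delete(onMessage)
    },
    publish(topic, payload) {
      for (const onMessage of listeners.get(topic) || []) onMessage(payload)
    }
  }
}

// Hypothetical "resolve" step: turn the raw published payload
// (a Buffer or a JSON string) into the data sent to the client.
function resolvePayload(payload) {
  const text = Buffer.isBuffer(payload) ? payload.toString('utf8') : payload
  return typeof text === 'string' ? JSON.parse(text) : text
}

// Hypothetical "subscribe" step: register interest in a topic and
// push every resolved update to the client.
function subscribeToTopic(pubsub, topic, push) {
  return pubsub.subscribe(topic, payload => push(resolvePayload(payload)))
}

// Usage: one subscriber, one published event, then unsubscribe
const tinyPubSub = createTinyPubSub()
const received = []
const topic = '/api/johnny/devices/POST/Drone'
const unsubscribe = subscribeToTopic(tinyPubSub, topic, data => received.push(data))

const event = Buffer.from(JSON.stringify({ name: 'Drone', userName: 'johnny' }))
tinyPubSub.publish(topic, event)
unsubscribe()
tinyPubSub.publish(topic, event) // no longer delivered

console.log(received)
```

A real PubSub engine adds what this sketch leaves out, such as wildcard topics and delivery across processes.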

A typical example of using OpenAPI-to-GraphQL to create a GraphQL server supporting subscriptions may look like this:

Creating a PubSub instance

First, initialize a PubSub instance to spread events between your API and the GraphQL Server, in a pubsub.js file.

import { EventEmitter2 } from 'eventemitter2'
import { PubSub } from 'graphql-subscriptions'

const eventEmitter = new EventEmitter2({
  wildcard: true,
  delimiter: '/'
});

// Create the PubSub instance (here by wrapping an EventEmitter client)
const pubsub = new PubSub({ eventEmitter })

export default pubsub

PubSub could also wrap an MQTT client connected to a broker, like in this example API.

import { connect } from 'mqtt'
import { MQTTPubSub } from 'graphql-mqtt-subscriptions'

const MQTT_PORT = 1883

// Create a PubSub instance (here by wrapping an MQTT client)
const client = connect(`mqtt://localhost:${MQTT_PORT}`)

const pubsub = new MQTTPubSub({
  client
})

export default pubsub

GraphQL server

Create the GraphQL schema, resolvers, and endpoints.

import { createGraphQLSchema } from 'openapi-to-graphql'
import express from 'express'
import { graphqlExpress } from 'apollo-server-express'
import { execute, printSchema, subscribe } from 'graphql'
import { SubscriptionServer } from 'subscriptions-transport-ws'
import { createServer } from 'http'
// pubsub.js uses a default export
import pubsub from './pubsub'

const HTTP_PORT = 3000

const init = async () => {
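  // Let OpenAPI-to-GraphQL create the schema.
  // oasWithCallbackObjects is assumed to be an OpenAPI document (as a
  // JavaScript object) that defines callbacks, loaded elsewhere.
  // createGraphQLSchema resolves to an object containing the schema.
  const { schema } = await createGraphQLSchema(oasWithCallbackObjects, {
    createSubscriptionsFromCallbacks: true
  })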

  // Log GraphQL schema...
  const myGraphQLSchema = printSchema(schema)
  console.log(myGraphQLSchema)

  // Set up GraphQL server using Express.js
  const app = express()
  app.use('/graphql', graphqlExpress({ schema }))

  // Wrap the Express server...
  const wsServer = createServer(app)

  // ...and set up the WebSocket for handling GraphQL subscriptions
  wsServer.listen(HTTP_PORT, () => {
    new SubscriptionServer(
      {
        execute,
        subscribe,
        schema,
        onConnect: (params, socket, ctx) => {
          // Add pubsub to context to be used by GraphQL subscribe field
          return { pubsub }
        }
      },
      {
        server: wsServer,
        path: '/subscriptions'
      }
    )
  })
}

init()

API server

A simple example is the following: when an HTTP client creates a device (via the post('/api/devices') route), the PubSub instance publishes an event. If a callback such as #/components/callbacks/DevicesEvent is declared in your OpenAPI document and referenced by the post operation of the /devices path, OpenAPI-to-GraphQL generates a subscription field for it.

import express from 'express'
import bodyParser from 'body-parser'
import pubsub from './pubsub'

const HTTP_PORT = 4000

const Devices = {
  'Audio-player': {
    name: 'Audio-player',
    userName: 'johnny'
  },
  Drone: {
    name: 'Drone',
    userName: 'eric'
  }
}

const startServer = () => {
  const app = express()

  app.use(bodyParser.json())

  const httpServer = app.listen(HTTP_PORT, () => {
    app.get('/api/devices', (req, res) => {
      res.status(200).send(Object.values(Devices))
    })

    app.post('/api/devices', (req, res) => {
      if (req.body.userName && req.body.name) {
        const device = req.body
        Devices[device.name] = device
        const packet = {
          topic: `/api/${device.userName}/devices/${req.method.toUpperCase()}/${
            device.name
          }`,
          payload: Buffer.from(JSON.stringify(device))
        }

        // Use pubsub to publish the event (trigger name first, then payload)
        pubsub.publish(packet.topic, packet.payload)

        res.status(200).send(device)
      } else {
        res.status(404).send({
          message: 'Wrong device schema'
        })
      }
    })

    app.get('/api/devices/:deviceName', (req, res) => {
      if (req.params.deviceName in Devices) {
        res.status(200).send(Devices[req.params.deviceName])
      } else {
        res.status(404).send({
          message: 'Wrong device ID.'
        })
      }
    })

  })
}

startServer()

GraphQL client

If a GraphQL (WebSocket) client has subscribed to the route defined by the callback (#/components/callbacks/DevicesEvent), it receives the content transferred by PubSub.

import axios from 'axios'
import { SubscriptionClient } from 'subscriptions-transport-ws'
import pubsub from './pubsub'

const GRAPHQL_HTTP_PORT = 3000
const REST_HTTP_PORT = 4000

const device = {
  userName: 'Carlos',
  name: 'Bot'
}

const startClient = () => {
  // Generate subscription via GraphQL WS API...
  const client = new SubscriptionClient(
    `ws://localhost:${GRAPHQL_HTTP_PORT}/subscriptions`
  )

  client.request({
    query: `subscription watchDevice($topicInput: TopicInput!) {
      devicesEventListener(topicInput: $topicInput) {
        name
        userName
        status
      }
    }`,
    operationName: 'watchDevice',
    variables: {
      topicInput: {
        method: 'POST',
        userName: `${device.userName}`
      }
    }
  })
  .subscribe({
    next: ({ data }) => {
      console.log('Device created', data)
    },
  })
  
  // ...or directly via PubSub instance like OpenAPI-to-GraphQL would do
  pubsub.subscribe(`/api/${device.userName}/devices/POST/*`, (...args) => {
    console.log('Device created', args)
  })

  
  // Trigger device creation via GraphQL HTTP API...
  axios({
    url: `http://localhost:${GRAPHQL_HTTP_PORT}/graphql`,
    method: 'POST',
    data: {
      query: `mutation($deviceInput: DeviceInput!) {
        createDevice(deviceInput: $deviceInput) {
          name
          userName
        }
      }`,
      // The variable name must match $deviceInput in the query
      variables: { deviceInput: device },
    },
  })

  // ...or via REST API like OpenAPI-to-GraphQL would do
  axios({
    url: `http://localhost:${REST_HTTP_PORT}/api/devices`,
    method: 'POST',
    data: device,
  })
}

startClient()

In this example, we rely on the subscriptions-transport-ws package to create a SubscriptionServer that manages WebSocket connections between the GraphQL clients and our server. We also rely on the graphqlExpress middleware provided by the apollo-server-express package to serve GraphQL from Express.js.

Concerning callbacks, as the example shows, the path (a runtime expression) /api/{$request.body#/userName}/devices/{$request.body#/method}/+ is delimited by / and ends with +. In MQTT topics, these symbols are interpreted as the level delimiter and the single-level wildcard, respectively. The path must be adapted to the client wrapped in your PubSub instance; with eventEmitter2, for example, you can use * as the wildcard and define your own delimiter. A helper may be provided in the future to simplify this process.
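Until such a helper exists, the adaptation could be sketched as follows. Both functions are hypothetical and not part of OpenAPI-to-GraphQL: `resolveTopic` fills the `{$request.body#/...}` placeholders from a plain object, and `mqttToEventEmitter2` maps the MQTT wildcards `+` and `#` to `*` and `**`, on the assumption that the EventEmitter2 instance was created with `wildcard: true`.

```javascript
// Hypothetical helper: replace each `{$request.body#/path}` placeholder
// with the matching value from `body`. Unresolved placeholders are left as-is.
function resolveTopic(expression, body) {
  return expression.replace(/\{\$request\.body#\/([^}]+)\}/g, (match, pointer) => {
    // Walk the pointer segment by segment (e.g. "owner/name")
    const value = pointer
      .split('/')
      .reduce((obj, key) => (obj == null ? undefined : obj[key]), body)
    return value === undefined ? match : String(value)
  })
}

// Hypothetical helper: translate an MQTT topic filter into an
// EventEmitter2-style event name with an optional custom delimiter.
function mqttToEventEmitter2(topic, delimiter = '/') {
  return topic
    .split('/')
    .map(level => (level === '+' ? '*' : level === '#' ? '**' : level))
    .join(delimiter)
}

const expression =
  '/api/{$request.body#/userName}/devices/{$request.body#/method}/+'

// Topic as an MQTT-backed PubSub would expect it
const mqttTopic = resolveTopic(expression, { userName: 'Carlos', method: 'POST' })
console.log(mqttTopic) // '/api/Carlos/devices/POST/+'

// Same topic adapted for an EventEmitter2-backed PubSub
const ee2Topic = mqttToEventEmitter2(mqttTopic)
console.log(ee2Topic) // '/api/Carlos/devices/POST/*'
```

The second result has the same shape as the pattern passed to pubsub.subscribe in the client example above.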

Examples

You can also run the example provided in this project.

Start the REST API server (HTTP and MQTT):

npm run api_sub

Start the GraphQL server (HTTP and WS):

npm run start_dev_sub