This repository was archived by the owner on May 31, 2019. It is now read-only.

feat: initial subscriptions implementation#32

Merged
darahayes merged 33 commits into
masterfrom
subscriptions
Jul 26, 2018

Conversation

@darahayes

@darahayes darahayes commented Jul 23, 2018

To do

  • Tests
  • Cleanup???

This is runnable right now though. You can try it out as follows:

  • npm run db:init:memeo
  • npm run dev
  • Open http://localhost:8000/graphiql in two different tabs
  • Leave one tab as it is with the sample queries
  • In the other tab remove the sample queries and paste the following:
subscription memeAdded{
  memeAdded{
    id
    photoUrl
  }
}
  • Run the subscription query and you should see the following message: "Your subscription data will appear here after server publication!"
  • Run the createMeme query in the other window.

How does the subscription piece work?

On the server side we define a new memeAdded subscription.

type Subscription {
    memeAdded(photoUrl: String):Meme!
}

Clients will be able to use this query to listen for updates. The photoUrl variable is optional (more on that later).

The next thing we need to be able to do is define a way to publish an event during some query or mutation. GraphQL uses a Pub/Sub model to do this. So when a query/mutation occurs, we can publish some events+data to a Pub/Sub mechanism and we can react to these events elsewhere. Below shows how a user could configure a createMeme resolver to also publish an event.

{
    type: 'Mutation',
    field: 'createMeme',
    requestMapping: `{
      "operation": "insert",
      "doc": {
        "_type":"meme",
        "photoUrl": "{{context.arguments.photoUrl}}"
      }
    }`,
    responseMapping: '{{ toJSON (convertNeDBIds context.result) }}',
    publish: JSON.stringify({
      topic: 'memeCreated',
      payload: `{
        "memeAdded": {{ toJSON context.result }},
      }`
    }),
  }

The last piece of the puzzle is to be able to listen for those events and push them back down to a client. We can do this with the new Subscription config object. Take a look at the example for the memeolist (included in this PR):

{
    type: 'Subscription',
    field: 'memeAdded',
    topic: 'memeCreated',
}

This config tells our server to listen for messages published to the 'memeCreated' topic on the internal pubsub mechanism and to fire the memeAdded resolver as a result.
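To make the flow concrete, here is a minimal sketch of how a mutation, a topic and a subscription field fit together. It uses Node's built-in EventEmitter as a stand-in for the internal pubsub mechanism; the `createMeme` function and the `delivered` array are assumptions for illustration and are not the code in this PR.

```javascript
const { EventEmitter } = require('events')

// Stand-in for the server's internal pubsub mechanism (assumption for this sketch)
const pubsub = new EventEmitter()

// The subscription config from the description: listen on a topic, feed a field
const subscriptionConfig = { type: 'Subscription', field: 'memeAdded', topic: 'memeCreated' }

// Collects what a subscribed client would be sent
const delivered = []
pubsub.on(subscriptionConfig.topic, (payload) => {
  delivered.push(payload[subscriptionConfig.field])
})

// Hypothetical createMeme resolver: build the record, then publish the event
function createMeme (photoUrl) {
  const result = { id: '1', photoUrl }
  pubsub.emit('memeCreated', { memeAdded: result })
  return result
}

createMeme('https://example.com/cat.png')
console.log(delivered)
```

The payload is keyed by the subscription field name (`memeAdded`), which matches the shape of the `publish.payload` template in the createMeme resolver config.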

Trying out Filtering

Filtering is an important aspect of GraphQL subscriptions. In most real-world apps, it doesn't make sense for a client to listen for notifications about every single record added to a collection. It would be like getting a notification in Slack every time anybody in the world sent a message.

Taking the Slack example further, on the client side we might subscribe to a messageAdded query as follows:

subscription messageAdded{
  messageAdded(channelId: "#random"){
    id
    channelId
    text
    userId
  }
}

The idea is we only want to listen for notifications in a particular Slack channel: #random.

How would this look on the server side?

How can we enable an end user of our service to define some kind of filtering that's relatively good?

I have come up with a small DSL that allows the user to define some conditions in a declarative way using JSON. We add this filter to the subscription config object.

Here's what it might look like in the Slack example:

{
    type: 'Subscription',
    field: 'messageAdded',
    topic: 'messageCreated',
	filter: JSON.stringify({
      'eq': ['$payload.messageAdded.channelId', '$variables.channelId']
    })
  }

This filter checks that the channelId in the newly created message is equal to the channelId that the client passed as a variable when they subscribed. In other words, the user can say "give me real-time updates about messages for this particular channel".

The filter expression has access to a context object that looks like this: { payload: {...}, variables: {}} where payload is the message received from the pubsub layer and variables contains whatever variables were passed by the client to initialize the subscription. Using the dollar syntax like $payload.messageAdded.id we can reference the context object.
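As an illustration of the dollar syntax, the sketch below resolves a `$a.b.c` reference against a context object and treats anything else as a literal. The function name `resolveRef` is hypothetical and this is not necessarily how filterEvaluator does it.

```javascript
// Resolve a "$a.b.c" reference against the filter context.
// Any value that is not a "$" reference is returned unchanged as a literal.
function resolveRef (value, context) {
  if (typeof value !== 'string' || !value.startsWith('$')) {
    return value
  }
  return value
    .slice(1) // drop the leading "$"
    .split('.')
    .reduce((obj, key) => (obj == null ? undefined : obj[key]), context)
}

const context = {
  payload: { messageAdded: { id: '42', channelId: '#random' } },
  variables: { channelId: '#random' }
}

console.log(resolveRef('$payload.messageAdded.id', context))
console.log(resolveRef('$variables.channelId', context))
console.log(resolveRef('https://google.com', context))
```

A missing path resolves to undefined rather than throwing, so a filter that references a variable the client did not send simply fails the comparison.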

I figured that 99% of the time, the basic use case will be like the one above, where we check that two things are equal. I made a more concise syntax for this particular case. The same example above can be expressed as follows:

{
    type: 'Subscription',
    field: 'messageAdded',
    topic: 'messageCreated',
	filter: JSON.stringify({
      '$payload.messageAdded.channelId': '$variables.channelId'
    })
  }
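One possible way to support the concise form is to expand it into the explicit `eq` form before evaluation. The sketch below assumes the operator names that appear in this description (`eq`, `!eq`, `match`, `or`, `and`) and a hypothetical helper called `expandShorthand`.

```javascript
// Operator names taken from the examples in this description (assumed to be the full set)
const OPERATORS = ['eq', '!eq', 'match', 'or', 'and']

// Expand { '$a': '$b' } into { eq: ['$a', '$b'] }; explicit operators pass through untouched
function expandShorthand (filter) {
  const keys = Object.keys(filter)
  if (keys.length === 1 && !OPERATORS.includes(keys[0])) {
    return { eq: [keys[0], filter[keys[0]]] }
  }
  return filter
}

const concise = { '$payload.messageAdded.channelId': '$variables.channelId' }
console.log(JSON.stringify(expandShorthand(concise)))
```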

This is the most basic usage of the filter language. You can do many more things. Try some of these out with the memeolist schema:

(you can modify the subscription object in the file located under seeders, then run the db init and start commands again)

// subscribe only to new memes where payload.photoUrl === https://google.com
{
    type: 'Subscription',
    field: 'memeAdded',
    GraphQLSchemaId: 2,
    topic: 'memeCreated',
    createdAt: time,
    updatedAt: time,
	filter: JSON.stringify({
      'eq': ['$payload.memeAdded.photoUrl', 'https://google.com']
    })
  }
// subscribe only to new memes where payload.photoUrl !== https://google.com
{
    type: 'Subscription',
    field: 'memeAdded',
    GraphQLSchemaId: 2,
    topic: 'memeCreated',
    createdAt: time,
    updatedAt: time,
	filter: JSON.stringify({
      '!eq': ['$payload.memeAdded.photoUrl', 'https://google.com']
    })
  }
// subscribe only to new memes where payload.photoUrl matches a regex
{
    type: 'Subscription',
    field: 'memeAdded',
    GraphQLSchemaId: 2,
    topic: 'memeCreated',
    createdAt: time,
    updatedAt: time,
	filter: JSON.stringify({
      'match': ['$payload.memeAdded.photoUrl', 'https:\/\/.*']
    })
  }
// subscribe only to new memes where payload.photoUrl === https://google.com || payload.photoUrl == $variables.photoUrl
{
    type: 'Subscription',
    field: 'memeAdded',
    GraphQLSchemaId: 2,
    topic: 'memeCreated',
    createdAt: time,
    updatedAt: time,
	filter: JSON.stringify({
      'or': [
        { 'eq': ['$payload.memeAdded.photoUrl', 'https://google.com'] },
        { 'eq': ['$payload.memeAdded.photoUrl', '$variables.photoUrl']}
       ]
    })
  }
// subscribe to new memes where payload.photoUrl === variables.photoUrl || (payload.photoUrl === https://facebook.com && payload.owner === Dara)
{
    type: 'Subscription',
    field: 'memeAdded',
    GraphQLSchemaId: 2,
    topic: 'memeCreated',
    createdAt: time,
    updatedAt: time,
	filter: JSON.stringify({
      'or': [
        { 'eq': ['$payload.memeAdded.photoUrl', '$variables.photoUrl']},
        { 'and': [
                  { 'eq': ['$payload.memeAdded.photoUrl', 'https://facebook.com']},
                  { 'eq': ['$payload.memeAdded.owner', 'Dara'] }
                ]
        }
       ]
    })
  }
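For readers who want to see how expressions like these could be evaluated, here is a small recursive evaluator covering the five operators used above. It is a sketch under assumptions: the names `resolveRef` and `evaluate` are hypothetical, and the real filterEvaluator in this PR may behave differently (for example in how it treats missing values).

```javascript
// Resolve "$a.b.c" against the context; anything else is a literal
function resolveRef (value, context) {
  if (typeof value !== 'string' || !value.startsWith('$')) return value
  return value.slice(1).split('.')
    .reduce((obj, key) => (obj == null ? undefined : obj[key]), context)
}

// One handler per operator; 'or' and 'and' recurse into nested clauses
const operators = {
  eq: ([a, b], ctx) => resolveRef(a, ctx) === resolveRef(b, ctx),
  '!eq': ([a, b], ctx) => resolveRef(a, ctx) !== resolveRef(b, ctx),
  match: ([a, pattern], ctx) => new RegExp(pattern).test(String(resolveRef(a, ctx))),
  or: (clauses, ctx) => clauses.some((clause) => evaluate(clause, ctx)),
  and: (clauses, ctx) => clauses.every((clause) => evaluate(clause, ctx))
}

// Evaluate a single-key filter object to true or false
function evaluate (filter, context) {
  const [op] = Object.keys(filter)
  const handler = operators[op]
  if (!handler) {
    throw new Error(`Unknown filter operator: ${op}`)
  }
  return handler(filter[op], context)
}

// The last example above, after a round trip through JSON as in the config
const filter = JSON.parse(JSON.stringify({
  or: [
    { eq: ['$payload.memeAdded.photoUrl', '$variables.photoUrl'] },
    { and: [
      { eq: ['$payload.memeAdded.photoUrl', 'https://facebook.com'] },
      { eq: ['$payload.memeAdded.owner', 'Dara'] }
    ] }
  ]
}))

const context = {
  payload: { memeAdded: { photoUrl: 'https://facebook.com', owner: 'Dara' } },
  variables: { photoUrl: 'https://google.com' }
}

console.log(evaluate(filter, context))
```

Keeping each operator as an entry in a lookup table is one way to get the extensibility mentioned in this description: a new operator is a new key.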

Why Did I do it this way?

  • JSON format is super portable, for example - instead of defining the filter expression in the server config, a client app could define whatever filter they want and use it to listen for updates, and it would be completely safe because there's no way for them to inject code. This has the potential for some very useful applications IMO.
  • I considered using handlebars to do something similar but it's very hacky because it doesn't actually evaluate things to true or false. It would also mean users would have to write a horrible syntax. In my opinion, this syntax is pretty intuitive.
  • Inspired by MongoDB query language which I thought was very intuitive
  • Flexibility, the code that evaluates these expressions could be extended in the future very easily.
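To illustrate the portability point, here is a rough sketch of how a filter string supplied by a client could be accepted. It assumes the same operator names as the examples above; `parseClientFilter` and `validate` are hypothetical helpers, not part of this PR. The string is only ever parsed as data, and unknown operators are rejected before anything is evaluated.

```javascript
// Operator names assumed from the examples in this description
const ALLOWED = new Set(['eq', '!eq', 'match', 'or', 'and'])

// Check that every clause is an object using known operators with array arguments
function validate (node) {
  if (node === null || typeof node !== 'object' || Array.isArray(node)) {
    throw new Error('Filter clause must be an object')
  }
  for (const [op, args] of Object.entries(node)) {
    if (op.startsWith('$')) continue // concise equality form: { '$a': '$b' }
    if (!ALLOWED.has(op)) throw new Error(`Operator not allowed: ${op}`)
    if (!Array.isArray(args)) throw new Error(`Operator ${op} expects an array`)
    if (op === 'or' || op === 'and') args.forEach(validate)
  }
}

// Parse a client-supplied filter string; JSON.parse treats it as data only
function parseClientFilter (json) {
  const filter = JSON.parse(json)
  validate(filter)
  return filter
}

const accepted = parseClientFilter('{"eq":["$payload.memeAdded.photoUrl","$variables.photoUrl"]}')
console.log(accepted)
```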

@darahayes darahayes requested a review from aliok as a code owner July 23, 2018 22:32
@@ -0,0 +1,91 @@
const _ = require('lodash')

module.exports = function ExpressionParser () {
Author

This is actually really generic and could be published as its own npm module with docs that demonstrate usage.

Contributor

I think it might be a better idea to extract this.

BTW, have you searched for a lib that does the same thing?

Author

I tried to search for something like this but I couldn't find anything. What would you even call this? I don't even know how to describe it in 2-3 words so I found it very hard to search for.

I think it would be great to extract. We should be constantly looking for opportunities to extract and open source internal libs like this. Where should the repo go? under github.com/aerogear?

@darahayes darahayes requested a review from wtrocki July 23, 2018 23:56

type Subscription {
_: Boolean
memeAdded(photoUrl: String):Meme!
Contributor

[Outside scope of this PR] I think it would be more beneficial to add more fields to the meme, like votes, as this will help us test various use cases.

Comment thread server/server.js Outdated

// Set up the WebSocket for handling GraphQL subscriptions
/* eslint-disable no-new */
new SubscriptionServer({
Contributor

this won't pick up new schema with hot reload

Author

👍 This completely slipped my mind
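One way to address the hot reload concern is to keep a handle on the current subscription server and swap it whenever the schema is rebuilt. The sketch below shows only the swap pattern; `FakeSubscriptionServer` and `createSubscriptionManager` are stand-ins invented for illustration and do not use the real SubscriptionServer API.

```javascript
// Stand-in for a WebSocket subscription server bound to one schema (hypothetical class)
class FakeSubscriptionServer {
  constructor (schema) {
    this.schema = schema
    this.closed = false
  }

  close () {
    this.closed = true
  }
}

// Holds the current server and replaces it when a new schema arrives
function createSubscriptionManager (initialSchema) {
  let current = new FakeSubscriptionServer(initialSchema)
  return {
    get server () {
      return current
    },
    // Intended to be called from the schema change notification handler
    reload (newSchema) {
      const previous = current
      current = new FakeSubscriptionServer(newSchema)
      previous.close()
      return current
    }
  }
}

const manager = createSubscriptionManager('schema v1')
const first = manager.server
manager.reload('schema v2')
console.log(first.closed, manager.server.schema)
```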

Comment thread server/server.js
let dataSourcesJson = dataSources.map((dataSource) => {
return dataSource.toJSON()
})
let dataSourcesJson = await models.DataSource.findAll({raw: true})
Contributor

nice!

Comment thread server/lib/schemaParser.js Outdated
const PubSub = require('./pubsubNotifiers').InMemory

module.exports = function (schemaString, dataSourcesJson, resolverMappingsJson, subscriptionMappingsJson) {
const pubsub = PubSub()
Contributor

what's the plan to abstract this away?

Comment thread server/lib/util/filterEvaluator.js Outdated
if (typeof str !== 'string' || str.length <= 1) {
return str
}
var c1 = str[0]
Contributor

I see some vars in the file

Comment thread server/lib/resolvers/resolverMapper.js Outdated
return new Promise(async (resolve, reject) => {
try {
const result = await resolverFn(obj, args, context, info)
resolve(result)
Contributor

+1 on resolving and then publishing

Comment thread server/lib/resolvers/resolverMapper.js Outdated
let payload = compiledPayload(compileOpts)

// The InMemory pubsub implementation wants an object
// Whereas the postgres one would expect a string
Contributor

as you noted, we need abstraction for this. let's not forget about that.
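A possible shape for that abstraction, sketched under assumptions: each pubsub wrapper accepts a plain object and deals with its own wire format internally, so the resolver code never needs to know whether the transport wants an object or a string. The class names and the `transport` interface (`send` / `onMessage`) are hypothetical; the fake transport only mimics a string-based channel such as the Postgres one.

```javascript
const { EventEmitter } = require('events')

// In-memory wrapper: objects go through as they are
class InMemoryPubSub {
  constructor () {
    this.emitter = new EventEmitter()
  }

  publish (topic, payload) {
    this.emitter.emit(topic, payload)
  }

  subscribe (topic, handler) {
    this.emitter.on(topic, handler)
  }
}

// String-based wrapper: serializes on publish, parses on receive
class StringTransportPubSub {
  constructor (transport) {
    this.transport = transport
  }

  publish (topic, payload) {
    this.transport.send(topic, JSON.stringify(payload))
  }

  subscribe (topic, handler) {
    this.transport.onMessage(topic, (raw) => handler(JSON.parse(raw)))
  }
}

// Fake string-only transport used to exercise the second wrapper
function createFakeTransport () {
  const emitter = new EventEmitter()
  return {
    send: (topic, str) => emitter.emit(topic, str),
    onMessage: (topic, fn) => emitter.on(topic, fn)
  }
}

// The calling code is identical for both implementations
function roundTrip (pubsub) {
  const received = []
  pubsub.subscribe('memeCreated', (payload) => received.push(payload))
  pubsub.publish('memeCreated', { memeAdded: { id: '1' } })
  return received
}

console.log(roundTrip(new InMemoryPubSub()))
console.log(roundTrip(new StringTransportPubSub(createFakeTransport())))
```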

payload: `{
"memeAdded": {{ toJSON context.result }},
}`
}),
Contributor

can you also add sample into memeolist.query.graphql file so that we don't have to type anything when we open Graphiql?

@aliok
Contributor

aliok commented Jul 24, 2018

@darahayes really great work! and, thanks for the long PR description which explains everything!
left some comments, but won't affect the general structure.

I haven't done any verification yet. I will do it after the requested changes are addressed.

I think we would need some integration tests where we create 2 clients and mutate on one and listen on other one. Feel free to implement it as part of this pr OR we can implement it in a separate ticket/pr.
UPDATE: on second thought, better to do it in another ticket to unblock merging of this PR. Created https://issues.jboss.org/browse/AEROGEAR-7691

Another thing: we need to discuss with David and the team about syncing the sync server instances. I know we talked about it and it seems necessary, but maybe it is not in the scope right now.

const { evalWithContext } = require('../util/filterEvaluator')()
const { log } = require('../util/logger')

module.exports = function mapSubscriptions (subsciptionMappings, pubsub) {
Contributor

I would add some comments on what this module is doing.

Author

Great suggestion

Comment thread server/lib/resolvers/resolverMapper.js Outdated
const result = await resolverFn(obj, args, context, info)
resolve(result)

const { pubsub, topic, compiledPayload } = publishOpts
Contributor

I would wrap the code from here into separate function like notifySubscriptions() that could be used in different context.
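Roughly what that extraction could look like, as a sketch rather than the final code. It reuses the `publishOpts` shape from the diff (`pubsub`, `topic`, `compiledPayload`); the `withPublish` wrapper, the stub pubsub and the stub template function are assumptions for illustration.

```javascript
// Publish the outcome of a resolver. A publish failure is logged, not thrown,
// so it cannot turn a successful mutation into an error for the caller.
function notifySubscriptions (publishOpts, templateData) {
  const { pubsub, topic, compiledPayload } = publishOpts
  try {
    // compiledPayload stands in for a compiled template that renders a JSON string
    const payload = JSON.parse(compiledPayload(templateData))
    pubsub.publish(topic, payload)
    return true
  } catch (err) {
    console.error(`Could not publish to ${topic}: ${err.message}`)
    return false
  }
}

// Wrap a resolver so that it publishes only after it has resolved successfully
function withPublish (resolverFn, publishOpts) {
  return async function (obj, args, context, info) {
    const result = await resolverFn(obj, args, context, info)
    notifySubscriptions(publishOpts, { context: { arguments: args, result } })
    return result
  }
}

// Hypothetical wiring for the createMeme example
const published = []
const publishOpts = {
  pubsub: { publish: (topic, payload) => published.push({ topic, payload }) },
  topic: 'memeCreated',
  compiledPayload: (data) => JSON.stringify({ memeAdded: data.context.result })
}

notifySubscriptions(publishOpts, { context: { result: { id: '1' } } })
console.log(published)
```

Because `notifySubscriptions` takes everything it needs as arguments, it could also be called from places other than the resolver mapper.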

@wtrocki
Contributor

wtrocki commented Jul 24, 2018

@darahayes Performing verification and adjusting memelist app

@darahayes darahayes requested a review from david-martin July 24, 2018 09:22
@wtrocki
Contributor

wtrocki commented Jul 24, 2018

@darahayes Adjusting now the memolist app for testing. Initial subscription looked like this. Do we support action filter from mobile? Something like here: https://github.com/aerogear/aerogear-android-example-apps/blob/07fcffdb17b4a7bd2543097b1b5e8a6aef159dc5/memeolist/app/src/main/graphql/org/aerogear/android/app/memeolist/graphql/memes.graphql#L17-L19

@darahayes
Author

@wtrocki I have never seen this kind of syntax before. We definitely don't support it right now. Can you link to some Apollo/GraphQL documentation that shows this off in more detail? I'd like to learn more.

@aliok
Contributor

aliok commented Jul 24, 2018

@darahayes @wtrocki that I think is from the schema generated by Graphcool. When you define a type it generates queries/mutations/subscriptions.
Example type:

type Note {
  id: ID! @isUnique
  text: String
}

Portion of generated stuff:

allNotes(filter: NoteFilter, orderBy: NoteOrderBy, skip: Int, after: String, before: String, first: Int, last: Int): NoteConnection!
...
input NoteFilter {
  # Logical AND on all given filters.
  AND: [NoteFilter!]

  # Logical OR on all given filters.
  OR: [NoteFilter!]
  id: ID

  # All values that are not equal to given value.
  id_not: ID

  # All values that are contained in given list.
  id_in: [ID!]

  # All values that are not contained in given list.
  id_not_in: [ID!]
...
}

That's still valid GraphQL.

@wtrocki
Contributor

wtrocki commented Jul 24, 2018

Verified with graphql. Verified on the Android application.
Rebase needed! It would be nice to put some of the description from this PR into some rough docs in the repository so we have a documentation draft.

Contributor

@david-martin david-martin left a comment

Verified from a functionality point of view.
Structure of code looks very reasonable too. Good to have the wrapper around postgres pubsub
Approved.

@darahayes
Author

@aliok Thanks for the great feedback. I believe I have incorporated all of it except the thing about extracting the filterEvaluator library. That will require a lot of extra time.

Let me know what you think!

Comment thread server/server.js Outdated
log.info('Received schema change notification. Rebuilding it')
let newSchema
try {
newSubScriptionServer(server, schema)
Contributor

@wtrocki wtrocki Jul 24, 2018

Should this be the newSchema that is built below?

throw new Error(`PubSub implementation for ${pubsubConfig.type} is missing a constructor`)
}

const pubsub = new PubSubClass(pubsubConfig.config)
Contributor

Wouldn't it be better to declare all the consts together at the top?

Author

@camilamacedo86 not sure what you mean, it wouldn't be possible to declare this at an earlier point in the code.

Contributor

I think it should be inside of the func getPublish (PubSubClass); as described in the next comment.

module.exports = function NewPubSub (pubsubConfig) {
const PubSubClass = notifiers[pubsubConfig.type]

if (!PubSubClass) {
Contributor

@camilamacedo86 camilamacedo86 Jul 24, 2018

Wouldn't it be better to encapsulate these ifs in functions and then call them? Something like:

module.exports = function NewPubSub (pubsubConfig) {
   const PubSubClass = notifiers[pubsubConfig.type]
   try {
       checkObject(PubSubClass);
       return getPublish(PubSubClass);
   } catch (error) {
       throw error;
   }
}

Author

Hi @camilamacedo86 thanks for reviewing! Your suggestion is very good and fair, but I do not see the need to move any of this stuff into separate functions right now because this is the only place we will ever run these checks.

Contributor

@camilamacedo86 camilamacedo86 Jul 25, 2018

@darahayes the problem is that if we don't structure the code properly from the start, it will not grow the way we want. This code has many ifs, and anyone who extends it will add more. If we encapsulate the logic, comments are not required, because what it is doing will be clear and improvements can be made in a healthy way. Does that make sense to you?

Comment thread server/config/index.js
}
}

const port = process.env.HTTP_PORT || '8000'
Contributor

@camilamacedo86 camilamacedo86 Jul 24, 2018

Wouldn't it be better to add all const/var declarations at the top, without spacing between them?

Author

Variable declarations at the top are typically used for declaring imports.

Contributor

@darahayes everything that is global to the program is typically declared at the top, in all languages. By default, it follows this standard/convention:

imports ...
global vars ...
functions/methods ...

This is important for keeping the project maintainable: global vars in many places make the code harder to read and understand and can cause mistakes, for example declaring the same value again in another global var.

Note that this standard is valid for any language. global variables are often available by declaring a variable at the top level of the program. See here

Also, it is a good practice to avoid global variables in JS and try to use always local variable instead of it. Following some references.

Comment thread server/config/index.js
subscriptionsEndpoint: `ws://${hostname()}:${port}/subscriptions`
},
postgresConfig: {
database: process.env.POSTGRES_DATABASE || 'aerogear_data_sync_db',
Contributor

IMHO the fixed values such as 'aerogear_data_sync_db' could be declared as consts at the top.

Contributor

@aliok aliok Jul 26, 2018

+1 on that. @camilamacedo86 great suggestion. I will create a GH issue for that.
I don't want this PR to get any bigger. I am too lazy to create a JIRA for something minor though.

Contributor

#33

@aliok
Contributor

aliok commented Jul 26, 2018

@darahayes verified functionality locally but only the case in memeo list schema. unit tests look good as well.
Could you please review the test cases at the integration tests JIRA? https://issues.jboss.org/browse/AEROGEAR-7691

Approving this PR.

@aliok
Contributor

aliok commented Jul 26, 2018

@david-martin

Another thing: we need to discuss with David and the team about syncing the sync server instances. I know we talked about it and it seems necessary, but maybe it is not in the scope right now.

What do you think about this? cc @darahayes

This is about syncing the sync server states using the same mechanisms we have. @darahayes showed some great thinking and brought this, but we're not sure if this is something in scope.

Imagine the scenario:

  • User1 is connected to server1 (websocket connection subscriptions)
  • User2 is connected to server2 (websocket connection subscriptions)
  • User1 creates a document which should result in publishing of a subscription event
  • User2 won't get the event

@darahayes
Author

@aliok This will work when using the postgres pubsub implementation.

  • Client 1 is connected to server 1
  • Client 2 is connected to server 2
  • Client 1 performs an action (e.g. create a new resource)
  • Server 1 publishes this to the Postgres PubSub mechanism
  • Server 1 and Server 2 receive the message and notify clients as necessary.

It's basically implemented although the postgres Pubsub implementation in this PR is really a skeleton and I haven't tried it at all.
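The difference between the two setups can be simulated in a few lines. In this sketch a shared EventEmitter stands in for the Postgres pubsub channel (assumption: every server instance receives every published message), while a private EventEmitter per server stands in for the in-memory implementation. The `createServer` helper is hypothetical.

```javascript
const { EventEmitter } = require('events')

// A server instance forwards every message it hears on the bus to its own clients
function createServer (name, bus) {
  const clients = []
  bus.on('memeCreated', (payload) => {
    clients.forEach((client) => client.received.push({ via: name, payload }))
  })
  return {
    connect (client) {
      clients.push(client)
    },
    createMeme (photoUrl) {
      bus.emit('memeCreated', { memeAdded: { photoUrl } })
    }
  }
}

// Shared bus: stand-in for a pubsub channel that all server instances listen on
const sharedBus = new EventEmitter()
const server1 = createServer('server1', sharedBus)
const server2 = createServer('server2', sharedBus)
const client1 = { received: [] }
const client2 = { received: [] }
server1.connect(client1)
server2.connect(client2)
server1.createMeme('https://example.com/a.png')

// Private buses: stand-in for one in-memory pubsub per server instance
const isolated1 = createServer('isolated1', new EventEmitter())
const isolated2 = createServer('isolated2', new EventEmitter())
const client3 = { received: [] }
isolated2.connect(client3)
isolated1.createMeme('https://example.com/b.png')

console.log(client2.received.length, client3.received.length)
```

With the shared bus, the client on server 2 hears about a meme created through server 1; with private buses it does not, which is the scenario described earlier in this thread.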

@darahayes
Author

^^ I would suggest adding the following steps to the integration testing ticket:

  • Get data sync server working with Postgres Pubsub implementation
  • Integration tests for Postgres Pubsub implementation

I feel that we should focus the integration tests much more on the Postgres implementation because the In Memory one is really not for production use.

@aliok
Contributor

aliok commented Jul 26, 2018

@darahayes

This will work when using the postgres pubsub implementation.

You're right. I think I got confused with the in memory pubsub.

I would suggest adding the following steps to the integration testing ticket:

added to the ticket.

I feel that we should focus the integration tests much more on the Postgres implementation because the In Memory one is really not for production use.

We can target having Postgres pubsub for integration tests initially and can see how easy it is to create separate tests for pubsub later.

@darahayes
Author

@aliok Great! I have rebased now so ready to merge as long as you're happy!

@darahayes darahayes changed the title [WIP] feat: initial subscriptions implementation feat: initial subscriptions implementation Jul 26, 2018
@darahayes darahayes merged commit c1eb87c into master Jul 26, 2018
@darahayes darahayes deleted the subscriptions branch July 26, 2018 10:40

5 participants