question: guide on how to use smart subscriptions? #674

Closed
hyusetiawan opened this issue Nov 16, 2022 · 13 comments

@hyusetiawan

hyusetiawan commented Nov 16, 2022

I'm trying to make smart subscriptions work backed by Redis, but I am stumped on how to retrigger the refetching of the data. I am following the example, but it just says to "retrigger" the subscription. What does that mean?

I wonder if there is an example to show how to retrigger at query/object/field level as stated in the docs?

here is my setup:
builder.ts


export const builder = new SchemaBuilder<{
  Context: {
    req: NextApiRequest
    res: NextApiResponse
    session?: (Omit<Session, 'user'> & { user: User }) | null
  }
  Scalars: {
    DateTime: {
      Input: Date
      Output: Date
    }
    JSON: {
      Input: Prisma.JsonValue
      Output: Prisma.JsonValue
    }
  }
  PrismaTypes: PrismaTypes
}>({
  plugins: [
    PrismaPlugin,
    ErrorsPlugin,
    RelayPlugin,
    SimpleObjectsPlugin,
    WithInputPlugin,
    ValidationPlugin,
    SmartSubscriptionsPlugin,
  ],
  relayOptions: {
    idFieldName: 'pid',
    nodesOnConnection: true,
  },
  smartSubscriptions: {
    async subscribe(name, context, cb) {
      // should this be per user?
      console.log('NAME: ', name)
      await redisSub.subscribe(name, (...args) => {
        console.log('redisSub.subscribe', name, args)
        cb(...args)
      })
    },
    async unsubscribe(name, context) {
      console.log('redisSub.unsubscribe')
      await redisSub.unsubscribe(name)
    },
  },
  prisma: {
    client: prisma,
    // defaults to false, uses /// comments from prisma schema as descriptions
    // for object types, relations and exposed fields.
    // descriptions can be omitted by setting description to false
    exposeDescriptions: true,
    // use where clause from prismaRelatedConnection for totalCount (will be true by default in the next major version)
    filterConnectionTotalCount: true,
  },
})

and my query field:


builder.queryField('table', (t) =>
  t.prismaField({
    smartSubscription: true,
    subscribe(subscriptions, parent, args, context, info) {
      console.log('table query field: ', args.pid)
      subscriptions.register(`table/${args.pid as any}`, {
        filter: (bla) => {
          console.log('filter?', bla)
          return true
        },
      })
    },

    type: 'Table',
    args: {
      pid: t.arg.id({
        required: true,
      }),
    },
    nullable: true,
    resolve: async (_, __, { pid }, ctx) => {
      return prisma.table.findUnique({
        where: {
          pid: String(pid),
        },
      })
    },
  }),
)

and my mutation to trigger the update:


builder.mutationField('updateTable', (t) =>
  t.prismaFieldWithInput({
    errors: {
      types: [ZodError],
    },
    typeOptions: {
      name: 'UpdateTableInput',
    },
    input: {
      pid: t.input.id({
        required: true,
      }),
      name: t.input.string({
        required: false,
        validate: {
          schema: TableDBSchema.shape.name,
        },
      }),
    },

    type: 'Table',

    async resolve(query, _, { input }, ctx) {
      const res = await prisma.table.update({
        ...query,
        where: {
          pid: String(input.pid),
        },
        data: {
          // undefined is no op
          name: input.name == null ? undefined : input.name,
        },
      })
      console.log('redis publish: ', `table/${input.pid}`)
      await redisPub.publish(`table/${input.pid}`, '')
      return res
    },
  }),
)
@hayes
Owner

hayes commented Nov 17, 2022

I don't have a full guide, or stand-alone example, but there is an example used for testing. It is a bit complex because it needs to test a lot of use-cases and edge cases, but it may be helpful.

You can see where it publishes events to re-trigger queries here: https://github.com/hayes/pothos/blob/main/packages/plugin-smart-subscriptions/tests/example/schema/poll.ts#L161

@hayes
Owner

hayes commented Nov 17, 2022

Your existing solution looks pretty close (I don't immediately see what the issue is). I assume there is an issue currently that is causing things not to update?

I assume you have checked that publish and subscribe are being called for the same ID. Does the resolver for the query field ever get called after the initial query?
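
One way to rule out a channel mismatch between the publishing and subscribing sides is to build the channel name in a single place. This is only a minimal sketch: `tableChannel` is a hypothetical helper that does not appear in the thread, and the `table/<pid>` naming is taken from the snippets above.

```typescript
// Hypothetical helper: builds the channel name used by both
// subscriptions.register (query side) and redisPub.publish (mutation side).
// Normalizing the id with String() avoids "1" vs 1 mismatches.
function tableChannel(pid: string | number): string {
  return `table/${String(pid)}`;
}
```

Both the `subscribe` callback of the query field and the mutation resolver could then call `tableChannel(pid)` instead of repeating the template string.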

@hyusetiawan
Author

I managed to get it to a working state, but I want to make sure I understand the way the data flows. If I do pubsub.publish('channel', {id: 123}), where does that {id: 123} data go? In the case of a query, does it go in as args? What if it was an objectField? Does it go into the findUnique parameter for Prisma?

@hayes
Owner

hayes commented Nov 17, 2022

It will be passed to the invalidateCache and filter callbacks, but it doesn't get passed into the root resolver. The assumption is that event systems generally don't have the data granularity to properly re-resolve most GraphQL queries, so instead we assume that the content of an event tells you WHAT changed, and that you will re-load the data from its source in response to the event. If the event carries a lot of data, you could write the new value into something like a dataloader.

We also can't really replace or edit the arguments or context of a resolver, because you may need the original values to properly re-resolve the affected field.
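
As a rough illustration of that flow, the sketch below builds the options object that would be passed as the second argument to `subscriptions.register`. Only the `filter` and `invalidateCache` callback names come from this thread; the `TableEvent` payload shape, the `tableCache` map, and the `tableRegisterOptions` helper are assumptions made for the example. It also assumes the `subscribe` callback in the builder hands over an already parsed object rather than a raw Redis string.

```typescript
// Hypothetical event payload: it says WHAT changed, not the new data.
interface TableEvent {
  pid: string;
}

// Hypothetical cache of previously loaded rows, keyed by pid.
const tableCache = new Map<string, { pid: string; name: string }>();

// Hypothetical helper that builds the options for subscriptions.register.
function tableRegisterOptions(pid: string) {
  return {
    // Only re-run the field when the event concerns this table.
    filter: (event: TableEvent): boolean => event.pid === pid,
    // Drop the stale entry so the re-run resolver loads fresh data
    // from its source instead of reading the event payload.
    invalidateCache: (event: TableEvent): void => {
      tableCache.delete(event.pid);
    },
  };
}
```

Inside a field's `subscribe` callback this might be used as `subscriptions.register(`table/${pid}`, tableRegisterOptions(pid))`; the resolver itself still receives its original arguments and re-loads the row.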

@hyusetiawan
Author

hyusetiawan commented Nov 17, 2022

I see, if that is the case, given the current mechanism, could you suggest how to accomplish the following use case?

subscription {
   table(ids: [1,2,3]) {
      patches {
          op
          path
      }
   }
}

The idea is for the client to subscribe to a stream of patches. Because of the client architecture requirements, I need to do some manual syncing between the GraphQL store and a front-end state management.

In updateTable, I can publish the patch data with redisPub.publish like so:

      await redisPub.publish(`table/${input.pid}`, JSON.stringify({ op: 'replace', path: ['name'], value: 'new name' }))

The only approaches I can think of are workarounds: access this data in invalidateCache and filter, as you mentioned, and pass it to the resolver somehow?

export const TableDBSchema = z.object({
  name: z.string().min(3).max(20),
})

builder.prismaObject('Table', {
  interfaces: [Node, WithAuthor],
  // subscribe(subscriptions, parent, context, info) {
  //   console.log('object subscribe: ', `table/${parent.pid}`)
  //   subscriptions.register(`table/${parent.pid}`)
  // },
  fields: (t) => ({
    name: t.exposeString('name'),
    columns: t.relation('columns', {}),
    rows: t.relation('rows', {}),
    patches: t.field({
      type: [TablePatch],
      subscribe: (subscriptions, parent) => {
        subscriptions.register(`table/patches/${parent.pid}`)
      },
      resolve(parent, args, context, info) {
        console.log('PATCH FIELD: ', args, parent)
        return [
          {
// how to get the data from the published event? or is there a better way to accomplish this?
          },
        ]
      },
    }),
  }),
})

@hyusetiawan
Author

The more I think about it, I wonder if it's advisable to trigger the event and then, in the resolver, read from Redis data structures, such as popping from a list, so each patch is read only once. But will this work for multiple subscriptions? Or would other subscriptions for patches return an empty list because it has already been read?

@hayes
Owner

hayes commented Nov 17, 2022

I think what you are describing is better modeled as a normal subscription.

Something like

subscription {
   tablePatches(ids: [1,2,3]) {
      tableId
      op
      path
   }
}

builder.subscriptionField('tablePatches', (t) =>
  t.field({
    type: TablePatch,
    args: {
      ids: t.arg.idList(),
    },
    subscribe: (_, args, ctx) => {
      // return an async iterator that iterates over each pubsub event for each id
      return subscribeToIds(args.ids)
    },
    resolve(parent) {
      return parent
    },
  }),
)

Your case doesn't really map well to the intended use case of smart subscriptions, which is something like "live queries": I have a GraphQL query, but I want to re-run it when something relevant gets updated. Smart subscriptions require that you can re-load your data, and are meant for the use case of having queries that are always up to date, not so much for streaming a log of events. If you write your changes to your DB, the solution would be to re-query the DB in your resolver, but that doesn't really make sense here.

The above is a more traditional pattern for subscriptions, and doesn't need a plugin
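
For contrast, the "live query" idea can be modeled in a few lines. This is a conceptual sketch only, not how the plugin is implemented: `liveQuery`, `load`, and `events` are hypothetical names. The point it illustrates is that an event only signals that something changed, and the data is always re-loaded from its source.

```typescript
// Conceptual model of a "live query": emit the initial result, then
// re-run the loader every time a relevant event arrives.
async function* liveQuery<T>(
  load: () => Promise<T>,
  events: AsyncIterable<unknown>,
): AsyncGenerator<T> {
  // Initial result, like a normal query.
  yield await load();
  // The event content is ignored here; it only triggers a re-load.
  for await (const _event of events) {
    yield await load();
  }
}
```

A stream of patches does not fit this model, because each patch is the data itself rather than a signal to re-load something.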

@hyusetiawan
Copy link
Author

I see. Apologies for the use-case-specific questions, but I am not familiar with subscribeToIds. Is this the place where I would create a listener, such as pubsub.subscribe?
How do I populate the parent in resolve?

@hayes
Owner

hayes commented Nov 17, 2022

Oh, that's just a placeholder. You would need to build an async iterator that emits your patches by subscribing to the pubsub instance.
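
A minimal sketch of what such an iterator could look like is below. Everything in it is an assumption made for illustration: the `TablePatch` shape, the `InMemoryPubSub` class (a stand-in for a real client such as Redis), and the body of `subscribeToIds`. Each value the iterator yields is what arrives as `parent` in the subscription field's `resolve`.

```typescript
// Hypothetical patch shape; the thread does not show the real TablePatch type.
interface TablePatch {
  tableId: string;
  op: string;
  path: string[];
}

type Listener = (patch: TablePatch) => void;

// Minimal in-memory stand-in for a real pubsub client such as Redis.
class InMemoryPubSub {
  private listeners = new Map<string, Set<Listener>>();

  subscribe(channel: string, listener: Listener): void {
    if (!this.listeners.has(channel)) this.listeners.set(channel, new Set());
    this.listeners.get(channel)!.add(listener);
  }

  unsubscribe(channel: string, listener: Listener): void {
    this.listeners.get(channel)?.delete(listener);
  }

  publish(channel: string, patch: TablePatch): void {
    this.listeners.get(channel)?.forEach((listener) => listener(patch));
  }
}

const pubsub = new InMemoryPubSub();

// Merges the events for several table ids into one async iterator.
function subscribeToIds(ids: string[]): AsyncIterableIterator<TablePatch> {
  const channels = ids.map((id) => `table/patches/${id}`);
  const buffered: TablePatch[] = []; // patches nobody has asked for yet
  const waiting: Array<(result: IteratorResult<TablePatch>) => void> = [];
  let closed = false;

  const onPatch: Listener = (patch) => {
    const resolve = waiting.shift();
    if (resolve) resolve({ value: patch, done: false });
    else buffered.push(patch);
  };
  channels.forEach((channel) => pubsub.subscribe(channel, onPatch));

  return {
    async next(): Promise<IteratorResult<TablePatch>> {
      const patch = buffered.shift();
      if (patch) return { value: patch, done: false };
      if (closed) return { value: undefined, done: true };
      return new Promise<IteratorResult<TablePatch>>((resolve) => {
        waiting.push(resolve);
      });
    },
    // Called when the client disconnects: stop listening and release waiters.
    async return(): Promise<IteratorResult<TablePatch>> {
      closed = true;
      channels.forEach((channel) => pubsub.unsubscribe(channel, onPatch));
      waiting.splice(0).forEach((resolve) => resolve({ value: undefined, done: true }));
      return { value: undefined, done: true };
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

With a real Redis client the listener would also need to parse the message string into a patch object before handing it to the iterator.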

@hyusetiawan
Author

hmm I am not following, could you maybe provide a high level pseudo code that I can flesh out if you don't mind?

@hyusetiawan
Author

I figured it out. I am using this package: https://github.com/davidyaha/graphql-redis-subscriptions
Unfortunately, there is a TypeScript error that I am not sure how to fix (functionality-wise it works fine, but I am not sure what hidden issues might arise).


builder.subscriptionField('tablePatches', (t) =>
  t.field({
    subscribe: (parent, { pids }, ctx) => {
      return PubSub.asyncIterator(pids.map((pid) => `table/patches/${pid}`))
    },
    type: [TablePatch],
    args: {
      pids: t.arg.idList({
        required: true,
      }),
    },
    nullable: true,
    resolve: async (patch, { pids }, __, ctx) => {
      return [patch] as any
    },
  }),
)

[Screenshot 2022-11-17 at 12:35:52 AM]

@hayes
Owner

hayes commented Nov 18, 2022

nice! glad you got it figured out. That type error is related to this issue: apollographql/graphql-subscriptions#261

@hayes
Owner

hayes commented Nov 18, 2022

I guess this is actually a different package with the same issue, opened an issue here: davidyaha/graphql-redis-subscriptions#555
