
Individual consumer groupIds #36

Open

cramhead opened this issue Oct 5, 2020 · 6 comments

Comments


cramhead commented Oct 5, 2020

Question: Would there be a good way to pass a consumer groupId on each subscription? This would allow clients whose connection was broken to reconnect and get the events they missed.

Similarly, is there a good way to specify a topic to publish to and to consume from, so that we could publish to and consume from multiple topics?


wtrocki commented Dec 2, 2020

@cramhead I have actually had to rewrite this library a couple of times, with different approaches, to support this. Using a single topic seems to be an antipattern, but multiple topics bring another set of challenges.

There is an easy workaround: create a different pubSub engine for each subscription/object, as sketched below. This, however, doesn't solve the problem of ignoring client-side ids and reconnecting at the proper message offsets.
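
A minimal sketch of that workaround, assuming the KafkaPubSub constructor in this library accepts per-instance topic/connection options (the option names here are illustrative, check the actual constructor):

const { KafkaPubSub } = require('graphql-kafka-subscriptions')

// Build one engine per subscription topic instead of sharing a single engine.
// The topic/host/port option names are assumptions - verify against the constructor in this repo.
const createPubSub = (topic) =>
  new KafkaPubSub({ topic, host: 'localhost', port: '9092' })

const commentsPubSub = createPubSub('comments')
const postsPubSub = createPubSub('posts')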

When we use multiple topics, we will need to organise the data around the events we support. This might be something like:

  • a topic per database model (Debezium can create those)
  • a topic per resolver group (the app creates those)
  • an event-update topic (a mutation doing multiple updates)

All of this correlates strongly with the implementation. For example, if our subscriptions support fetching relationships and we have a topic per model, we need to do stream processing - joins etc.


wtrocki commented Dec 3, 2020

@cramhead @ancashoria if we can come up with a generic solution to this problem together, I'm happy to PR this repository or create a separate library to cover it.


cramhead commented Dec 3, 2020

@wtrocki sounds good. I'd like to contribute where I can.
I'm relatively new to the whole Kafka world, so I'm sure it will be a learning experience.

It would seem that many of the desired features, like multiple topics and reconnection for missed events, are really about managing the mappings between clients and connections. I'm under the impression that the Kafka client does much of this itself; we would really just need to make sure the inputs are appropriate so that it can do its job. For example, if we provide the consumer group id, the Kafka client will take care of finding the events that haven't been delivered.


wtrocki commented Dec 4, 2020

@cramhead I would love us to discuss which of the approaches I pointed out above is more appealing - contributing can come next.

For example, we need to provide the consumer group id and the Kafka client will take care finding the events that haven't been delivered

So each (web) client has its own consumer group, and there is some topic structure they can connect to. How about using plain SSE instead of WebSockets with GraphQL? It sounds like GraphQL would give us a number of problems: we cannot reliably handle the reconnection offset, and most of the client-side subscription libraries don't handle that nicely, so we would not only need to fix the entire server but also rewrite some bits of the graphql-subscriptions libraries.
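
For reference, a bare-bones SSE endpoint needs very little machinery. A minimal sketch using Express; the route shape and the subscribeToTopic helper are hypothetical stand-ins for the Kafka consumer wiring:

const express = require('express')

const app = express()

app.get('/events/:topic', (req, res) => {
  // Standard SSE headers; an EventSource client reconnects automatically and resends Last-Event-ID.
  res.set({ 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' })
  res.flushHeaders()

  // subscribeToTopic is a hypothetical helper that wires up a Kafka consumer and returns an unsubscribe fn.
  const unsubscribe = subscribeToTopic(req.params.topic, (event) => {
    res.write(`data: ${JSON.stringify(event)}\n\n`)
  })

  req.on('close', unsubscribe)
})

app.listen(4000)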


cramhead commented Dec 4, 2020

@wtrocki excellent idea. Better to understand the problem and the approaches clearly before implementing anything, and choosing a single problem is much simpler than trying to solve all problems at once.

Thinking about SSE vs GraphQL: I think GraphQL is agnostic to the transport mechanism. There are implementations that use WebSockets (the standard approach) and some that use SSE (newer/prototypes). The user needs to be able to subscribe whether it's delivered over SSE or WebSockets. I think SSE has a number of good advantages, and the lack of options and standard approaches is the only thing holding it back; that could be changing: https://wundergraph.com/blog/deprecate_graphql_subscriptions_over_websockets. At the end of the day, GraphQL provides a "standard" way for consumers to subscribe and builds on the expectations consumers have around strong typing and setting boundaries of what needs to be fulfilled, while allowing the backend to fulfil it in whatever way is best.

Regarding consumer groups and reconnection, I'm thinking the connect and disconnect lifecycle events can be used: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#lifecycle-events. The connect event would be used to identify the user, as they would be passing in some sort of identity token, e.g. a JWT. So by the end of the connect call you have an authorized user, but that user will have multiple subscriptions, which implies multiple consumer groups. I expect each consumer group is mapped to a specific topic. If the same user, i.e. identity, connects with independent subscriptions, e.g. from multiple devices, they would have multiple consumer groups, so some other info would be needed to identify them. It may be best to deal with that topic separately. A rough sketch of those hooks is below.
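
A rough sketch based on the linked lifecycle-events docs (Apollo Server 2.x style); verifyToken is a hypothetical helper, and typeDefs/resolvers are assumed to be defined elsewhere:

const { ApolloServer } = require('apollo-server')

const server = new ApolloServer({
  typeDefs,
  resolvers,
  subscriptions: {
    // connectionParams carries whatever the client sends on connect, e.g. an auth token.
    onConnect: async (connectionParams) => {
      const user = await verifyToken(connectionParams.authToken) // hypothetical JWT verification
      if (!user) throw new Error('Unauthorized')
      // The returned object becomes the connection context available to subscription resolvers.
      return { user }
    },
    onDisconnect: () => {
      // e.g. tear down this client's consumers / consumer groups here
    },
  },
})

server.listen()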

I expect that after the user is identified, the client request continues on to call the subscribe function. There could be many steps here that I'm not aware of. This function needs to be able to pass in consumer group information as well as topic information. I expect this is where graphql-kafka-subscriptions plays the prime role: it would take care of passing that info to the Kafka client so it could do something like the below.

For example (sorry, I don't know node-rdkafka well, but I expect it can do everything https://github.com/tulios/kafkajs can do):

// kafka is a configured kafkajs client, e.g. new Kafka({ brokers: [...] })
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic' })

I'm not entirely sure how that data would be passed in while still adhering to the interface provided by https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts.
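
One possible shape, a rough sketch only: the third options argument of PubSubEngine.subscribe could carry the groupId/topic (the option names below are assumptions), though the stock asyncIterator in graphql-subscriptions does not forward such options, so extra plumbing would still be needed:

const { PubSubEngine } = require('graphql-subscriptions')
const { Kafka } = require('kafkajs')

class PerGroupKafkaPubSub extends PubSubEngine {
  constructor({ brokers }) {
    super()
    this.kafka = new Kafka({ clientId: 'graphql-subscriptions', brokers })
    this.consumers = new Map()
    this.nextId = 0
  }

  async publish(triggerName, payload) {
    // producer wiring omitted from this sketch
  }

  // options is where a per-subscription groupId/topic could travel (assumed names).
  async subscribe(triggerName, onMessage, options = {}) {
    const { groupId, topic = triggerName } = options
    const consumer = this.kafka.consumer({ groupId })
    await consumer.connect()
    await consumer.subscribe({ topic })
    await consumer.run({
      eachMessage: async ({ message }) => onMessage(JSON.parse(message.value.toString())),
    })
    const subId = ++this.nextId
    this.consumers.set(subId, consumer)
    return subId
  }

  async unsubscribe(subId) {
    await this.consumers.get(subId).disconnect()
    this.consumers.delete(subId)
  }
}

With something along these lines, a resolver could derive the groupId from the authenticated connection context and pass it through when subscribing.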


wtrocki commented Dec 4, 2020

Amazing response. I'm going to take a look into this.
I would start with some diagrams and see if there is an easy and reliable way to do it.

thinking that there is a connect and disconnect event can be used

In my experience those do not work very well with WebSockets. I would need to see how this API can be used with SSE.

consumer groups so some other info would be needed to identify their consumer groups.

Each subscription client generates a unique id on connection for multiplexing purposes. This could be used, but I'm not sure whether using consumer groups this way would be a misuse of them - what is the overhead of consumer group creation? In this setup there will be a lot of them - basically one per user and topic.

A drawback of this approach: I do not see how to enable subscriptions that also fetch relationships.

There is also another architecture where an abstraction of the subscribed users, their ids, and their filters is kept on the server, and then results for those users are materialised in the form of a live query.

This will technically work better when subscriptions have generic filtering, but it ends up the same if subscription filters are scoped to the current user.
Hasura has been doing this:

https://hasura.io/blog/1-million-active-graphql-subscriptions/
