New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement FanIn #221
Implement FanIn #221
Conversation
pubsub/gochannel/fanin.go
Outdated
} | ||
|
||
// Subscribe implements the standard Subscriber interface. | ||
func (f *FanIn) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this method still make sense for FanIn
? It seems unused now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this also came to my mind, how it should be used 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah oh, I got it.
Maybe it would be better to not expose subscriber here, but just provide higher level API that is doing that under the hood? In other words - to be able to use FanIn without creating the router. So in that case I could just do
fanin, err := gochannel.NewFanIn(upstreamPubSub, config, logger)
// ...
fanin.Run()
In that case API may be much simpler to use. But the question is if in that case it fits to your use case ;-)
pubsub/gochannel/fanin.go
Outdated
@@ -0,0 +1,139 @@ | |||
package gochannel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As long as it's not specific for a gochannel, I think that it may be worth to move to https://github.com/ThreeDotsLabs/watermill/tree/master/components 😉
pubsub/gochannel/fanin.go
Outdated
// AddSubscription adds an internal subscriptions. | ||
// You need to call this method with slice of `fromTopics` and `toTopic` before the FanIn is started. | ||
// AddSubscription is idempotent. | ||
func (f *FanIn) AddSubscription(fromTopics []string, toTopic string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to call this method with slice of
fromTopics
andtoTopic
before the FanIn is started.
so maybe it's better to add it to the constructor (and add some kind of config). In that case design would be simpler 😉
As it's still in progress I converted it to draft. |
Hey @0michalsokolowski0, sorry for a bit of delay with the review 😅 All good, thanks for the contribution 🍻 |
Create component multiplexing messages from many topics on one