-
Notifications
You must be signed in to change notification settings - Fork 139
[DRAFT] feature: channel broker handler #743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Nikita B <n2h9z4@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @n2h9, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request lays the groundwork for a new broker.Handler
implementation, TMPChannelBrokerHandler
, designed to facilitate in-process message communication using Go channels. This is a strategic move to support the "meshsync as a library initiative," providing a lightweight and efficient internal messaging mechanism. The changes encompass the core handler logic, robust error reporting for message publishing, and a flexible configuration system.
Highlights
- New Broker Implementation: This PR introduces a new
TMPChannelBrokerHandler
that implements thebroker.Handler
interface, utilizing Go channels for in-process message brokering. - Channel-based Messaging: The handler provides
Publish
andSubscribeWithChannel
methods, enabling messages to be sent to and received from specific subjects and queues via Go channels, supporting a publish-subscribe pattern within the application. - Custom Error Handling: Custom error types and functions have been implemented to provide detailed feedback on publish operations, including lists of successfully delivered and failed queues.
- Configurable Options: The channel broker's behavior can be configured through options such as
SingleChannelBufferSize
andPublishToChannelDelay
, using a functional options pattern for flexibility.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This PR introduces a broker.Handler
implementation using Go channels. Key areas for improvement:
- Concurrency: The
storage
map is accessed without synchronization. - Interface Compliance:
DeepCopyInto
,DeepCopy
, andDeepCopyObject
are not properly implemented. - Error Handling: Improve error handling in
PublishWithChannel
andSubscribe
. - Naming and Typos: Address the "TMP" prefix and typos.
|
||
// SubscribeWithChannel will publish all the messages received to the given channel | ||
func (h *TMPChannelBrokerHandler) SubscribeWithChannel(subject, queue string, msgch chan *broker.Message) error { | ||
if h.storage[subject] == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a great point, thank you gemi! 🤗
go func() { | ||
// as soon as this channel will be closed, for loop will end | ||
for msg := range msgch { | ||
h.Publish(subject, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The h.Publish(subject, msg)
call can return an error, which is currently ignored. Handle this error, at least by logging it.
if err := h.Publish(subject, msg); err != nil {
// TODO: Implement proper error handling. For now, logging the error is a good first step.
// log.Printf("Error publishing message via PublishWithChannel to subject %s: %v", subject, err)
}
broker/channel/channel.go
Outdated
} | ||
|
||
// DeepCopyInto is a deepcopy function, copying the receiver, writing into out. in must be non-nil. | ||
func (h *TMPChannelBrokerHandler) DeepCopyInto(out broker.Handler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DeepCopyInto
method is marked as "Not supported" and performs no operations. This method should panic to clearly signal that the operation is unimplemented.
// Not supported
// TODO
// it is used in meshery server in operator_helper, check if this is code base which is in use
panic("DeepCopyInto is not implemented for TMPChannelBrokerHandler")
// DeepCopy is a deepcopy function, copying the receiver, creating a new Nats. | ||
func (h *TMPChannelBrokerHandler) DeepCopy() broker.Handler { | ||
// Not supported | ||
return h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// DeepCopyObject is a deepcopy function, copying the receiver, creating a new broker.Handler. | ||
func (h *TMPChannelBrokerHandler) DeepCopyObject() broker.Handler { | ||
// Not supported | ||
return h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broker/channel/channel.go
Outdated
} | ||
|
||
func NewTMPChannelBrokerHandler(optsSetters ...OptionsSetter) *TMPChannelBrokerHandler { | ||
options := DefautOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated ✅
|
||
// Not supported | ||
|
||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for message := range h.storage[subject][queue] { | ||
// this flow is correct as if we have more than one consumer for one queue | ||
// only one will receive the message | ||
msgch <- message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broker/channel/options.go
Outdated
PublishToChannelDelay time.Duration | ||
} | ||
|
||
var DefautOptions = Options{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -1,5 +1,5 @@ | |||
{ | |||
"name": "meshkit", | |||
"type": "library", | |||
"next_error_code": 11319 | |||
"next_error_code": 11320 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…MPChannelBrokerHandler to ChannelBrokerHandler Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Description
This PR implements broker.Handler interface implementation based on golang channels.
Which is useful as a part of "meshsync as a library initiative";
Notes for Reviewers
Signed commits