Skip to content

[DRAFT] feature: channel broker handler #743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

n2h9
Copy link

@n2h9 n2h9 commented Jun 16, 2025

Description

This PR implements broker.Handler interface implementation based on golang channels.
Which is useful as a part of "meshsync as a library initiative";

Notes for Reviewers

Signed commits

  • Yes, I signed my commits.

Signed-off-by: Nikita B <n2h9z4@gmail.com>
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @n2h9, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the groundwork for a new broker.Handler implementation, TMPChannelBrokerHandler, designed to facilitate in-process message communication using Go channels. This is a strategic move to support the "meshsync as a library initiative," providing a lightweight and efficient internal messaging mechanism. The changes encompass the core handler logic, robust error reporting for message publishing, and a flexible configuration system.

Highlights

  • New Broker Implementation: This PR introduces a new TMPChannelBrokerHandler that implements the broker.Handler interface, utilizing Go channels for in-process message brokering.
  • Channel-based Messaging: The handler provides Publish and SubscribeWithChannel methods, enabling messages to be sent to and received from specific subjects and queues via Go channels, supporting a publish-subscribe pattern within the application.
  • Custom Error Handling: Custom error types and functions have been implemented to provide detailed feedback on publish operations, including lists of successfully delivered and failed queues.
  • Configurable Options: The channel broker's behavior can be configured through options such as SingleChannelBufferSize and PublishToChannelDelay, using a functional options pattern for flexibility.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This PR introduces a broker.Handler implementation using Go channels. Key areas for improvement:

  1. Concurrency: The storage map is accessed without synchronization.
  2. Interface Compliance: DeepCopyInto, DeepCopy, and DeepCopyObject are not properly implemented.
  3. Error Handling: Improve error handling in PublishWithChannel and Subscribe.
  4. Naming and Typos: Address the "TMP" prefix and typos.


// SubscribeWithChannel will publish all the messages received to the given channel
func (h *TMPChannelBrokerHandler) SubscribeWithChannel(subject, queue string, msgch chan *broker.Message) error {
if h.storage[subject] == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The h.storage map is accessed and modified in this method (and others like Publish, CloseConnection, ConnectedEndpoints) without any synchronization. Protect all accesses to h.storage using a sync.RWMutex.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a great point, thank you gemi! 🤗

go func() {
// as soon as this channel will be closed, for loop will end
for msg := range msgch {
h.Publish(subject, msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The h.Publish(subject, msg) call can return an error, which is currently ignored. Handle this error, at least by logging it.

if err := h.Publish(subject, msg); err != nil {
	// TODO: Implement proper error handling. For now, logging the error is a good first step.
	// log.Printf("Error publishing message via PublishWithChannel to subject %s: %v", subject, err)
}

}

// DeepCopyInto is a deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (h *TMPChannelBrokerHandler) DeepCopyInto(out broker.Handler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The DeepCopyInto method is marked as "Not supported" and performs no operations. This method should panic to clearly signal that the operation is unimplemented.

// Not supported
// TODO
// it is used in meshery server in operator_helper, check if this is code base which is in use
panic("DeepCopyInto is not implemented for TMPChannelBrokerHandler")

// DeepCopy is a deepcopy function, copying the receiver, creating a new Nats.
func (h *TMPChannelBrokerHandler) DeepCopy() broker.Handler {
// Not supported
return h
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The DeepCopy method currently returns h, which is a pointer to the original TMPChannelBrokerHandler instance, not a deep copy. This method should panic to clearly indicate that the operation is unimplemented.

// Not supported
panic("DeepCopy is not implemented for TMPChannelBrokerHandler")

// DeepCopyObject is a deepcopy function, copying the receiver, creating a new broker.Handler.
func (h *TMPChannelBrokerHandler) DeepCopyObject() broker.Handler {
// Not supported
return h
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The DeepCopyObject method returns h, the original instance, instead of a new, deep-copied object. This method should panic to clearly signal its unimplemented status.

// Not supported
panic("DeepCopyObject is not implemented for TMPChannelBrokerHandler")

}

func NewTMPChannelBrokerHandler(optsSetters ...OptionsSetter) *TMPChannelBrokerHandler {
options := DefautOptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Typo in DefautOptions. Should be DefaultOptions.

Suggested change
options := DefautOptions
options := DefaultOptions

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated ✅


// Not supported

return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Subscribe method is documented as "Not supported" but returns nil. Return an explicit error instead.

return fmt.Errorf("Subscribe method is not supported by TMPChannelBrokerHandler")

for message := range h.storage[subject][queue] {
// this flow is correct as if we have more than one consumer for one queue
// only one will receive the message
msgch <- message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The send operation msgch <- message will block if the subscriber's channel msgch is full. Consider making this send non-blocking or using a timeout.

PublishToChannelDelay time.Duration
}

var DefautOptions = Options{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Typo in the variable name DefautOptions. Should be DefaultOptions.

Suggested change
var DefautOptions = Options{
var DefaultOptions = Options{

@@ -1,5 +1,5 @@
{
"name": "meshkit",
"type": "library",
"next_error_code": 11319
"next_error_code": 11320
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update the next error code.

n2h9 added 4 commits June 18, 2025 19:11
…MPChannelBrokerHandler to ChannelBrokerHandler

Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Signed-off-by: Nikita B <n2h9z4@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant