Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide publish/subscribe options for MQTT protocol #924

Merged
merged 1 commit into from
Jul 30, 2023

Conversation

yanmxa
Copy link
Contributor

@yanmxa yanmxa commented Jul 20, 2023

Signed-off-by: myan myan@redhat.com

Provide more flexible parameter configurations when using the MQTT protocol to send events.

@yanmxa yanmxa changed the title provide the Topic/QoS/Retain configuration for MQTT Sender protocol provide Topic/QoS/Retain configuration for MQTT Sender protocol Jul 20, 2023
Comment on lines 87 to 88
QoS: qos,
Retain: retain,
Copy link
Member

@embano1 embano1 Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (readability): please reorder, following topic/retain/qos flow from above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Digging more into the implementation, I have a general question @yanmxa on this PR: it seems that we're replicating functionality in the protocol context which is already (partially) available in the protocol constructor New(). Can you provide an example why someone would use the context vs New() to configure these settings? I understand that we do this to overwrite behavior for certain, but not all, Send() calls in protocols. However, this proposal seems to create inconsistencies between the options available in the constructor New() and what can be defined on context. Also, I'm afraid we have to keep up with field level changes, i.e., add more fields, with the current proposal, so I was wondering whether there's a generic MQTT options/property struct we should allow (use) to influence all parameters in Send() instead of handpicking a couple of options?

@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 22, 2023

1. Why someone would use the context vs. New() to configure these settings?

When using MQTT sends a message, you can configure many parameters according to your needs. We provide the ProtocolConext here to support users to pass all the parameters to MQTT publisher. For example, let's create a ProtocolContext and then add it to the ctx.

qos := 0
retained := false
packetId := uint16(123)
properties := &paho.PublishProperties{
    TopicAlias: paho.Uint16(3),
}
pctx := cemqtt.ProtocolContext{
     Topic: "test",
      QoS : &qos,
      Retained: &retained,
      PacketID: &packetId,
      PublishProperties: properties
}
ctx = cemqtt.WithProtocolContext(ctx, pctx)

But in most cases, users only specify topic/qos/retain when publishing messages, and do not set packet-related parameters, so I provide a New method here to deal with such cases

if result := c.Send(
    cemqtt.WithProtocolContext(ctx, cemqtt.NewProtocolContext("test-topic", 0, false)),
    e,
);

2. Whether there generic MQTT options/property struct we should allow (use) to influence all parameters in Send() instead of handpicking a couple of options?

Yes. There is a generic MQTT option struct when sending the message, But in its implementation, both payload and properties are placed in the struct. That's the reason I created the ProtocolContext to map all the properties, instead of using its own option struct.

@embano1
Copy link
Member

embano1 commented Jul 22, 2023

1. Why someone would use the context vs. New() to configure these settings?

When using MQTT sends a message, you can configure many parameters according to your needs. We provide the ProtocolConext here to support users to pass all the parameters to MQTT publisher. For example, let's create a ProtocolContext and then add it to the ctx.
But in most cases, users only specify topic/qos/retain when publishing messages, and do not set packet-related parameters, so I provide a New method here to deal with such cases

I don't think this is the right use case for Context here. We generally use a specific Context to overwrite certain behavior for individual Send calls. In your case, I'd suggest using a default New constructor with functional options. I'm not 100% clear on whether the topic value will always come from cecontext or must be passed to New(). WDYT?

2. Whether there generic MQTT options/property struct we should allow (use) to influence all parameters in Send() instead of handpicking a couple of options?

Yes. There is a generic MQTT option struct when sending the message, But in its implementation, both payload and properties are placed in the struct. That's the reason I created the ProtocolContext to map all the properties, instead of using its own option struct.

I just checked our implementation and IMHO it would be fine to have the Context use the whole struct assuming a user would know how to use it (since he's intentionally overwriting behavior) and would not pass a payload (which would anyways be overwritten during SetData).

@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 24, 2023

1. Why someone would use the context vs. New() to configure these settings?
When using MQTT sends a message, you can configure many parameters according to your needs. We provide the ProtocolConext here to support users to pass all the parameters to MQTT publisher. For example, let's create a ProtocolContext and then add it to the ctx.
But in most cases, users only specify topic/qos/retain when publishing messages, and do not set packet-related parameters, so I provide a New method here to deal with such cases

I don't think this is the right use case for Context here. We generally use a specific Context to overwrite certain behavior for individual Send calls. In your case, I'd suggest using a default New constructor with functional options. I'm not 100% clear on whether the topic value will always come from cecontext or must be passed to New(). WDYT?

2. Whether there generic MQTT options/property struct we should allow (use) to influence all parameters in Send() instead of handpicking a couple of options?
Yes. There is a generic MQTT option struct when sending the message, But in its implementation, both payload and properties are placed in the struct. That's the reason I created the ProtocolContext to map all the properties, instead of using its own option struct.

I just checked our implementation and IMHO it would be fine to have the Context use the whole struct assuming a user would know how to use it (since he's intentionally overwriting behavior) and would not pass a payload (which would anyways be overwritten during SetData).

Agree with that. we can keep using function options with other parameters. At the same time, discard protocolContext and use the struct option as the value.

@yanmxa yanmxa force-pushed the br_mqtt_context branch 9 times, most recently from 9804f12 to 8213648 Compare July 24, 2023 12:59
@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 24, 2023

@embano1 If I append the mqtt function option in the file v1/context/context.go, then we have to update so many go.mod and go.sum files just like this PR. Another way is to add this MQTT function option to the directory protocol/mqtt_phao/v2, this only requires updating a few files.
So which one do you think is better?

@yanmxa yanmxa force-pushed the br_mqtt_context branch 2 times, most recently from 5ab4a9b to 90ae1c2 Compare July 24, 2023 13:19
@embano1
Copy link
Member

embano1 commented Jul 25, 2023

Sorry, I think I may have confused you somehow. I did not suggest using cecontext for this, but to implement everything in func New() of the protocol API e.g., using functional options and defaults. See http protocol implementation for how this is done there.

@yanmxa yanmxa force-pushed the br_mqtt_context branch 2 times, most recently from 0b64341 to fa18dfb Compare July 26, 2023 09:16
@yanmxa yanmxa changed the title provide Topic/QoS/Retain configuration for MQTT Sender protocol provide publish options for MQTT Sender protocol Jul 26, 2023
@embano1
Copy link
Member

embano1 commented Jul 27, 2023

@yanmxa looks like you reverted to the mqtt context approach instead of what I meant following other protocol implementations e.g., http. Can you please review

func New(opts ...Option) (*Protocol, error) {
and see if this approach would work for your proposal? I know it's a breaking change but we haven't cut a release with mqtt anyways so strictly it's non-breaking.

@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 28, 2023

Thanks @embano1! I misunderstood before.
Also, I think we can support setting the options in both theNew() protocol and Send()(cause the settings sent each time may be different from the ones at initialization) at the same time. What do you think?

@embano1
Copy link
Member

embano1 commented Jul 28, 2023

Thanks @embano1! I misunderstood before.

Also, I think we can support setting the options in both theNew() protocol and Send()(cause the settings sent each time may be different from the ones at initialization) at the same time. What do you think?

Send is an interface implementation. For those cases context is typically used. Can we start with New() in this PR and create a follow up PR if needed?

@yanmxa yanmxa force-pushed the br_mqtt_context branch 2 times, most recently from e3b29d7 to d3ae386 Compare July 28, 2023 15:10
@embano1
Copy link
Member

embano1 commented Jul 28, 2023

@yanmxa what do you think of using functional options in Mew() like other protocols do to reduce API surface, allow for API evolution, set defaults, etc.?

@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 28, 2023

@yanmxa what do you think of using functional options in Mew() like other protocols do to reduce API surface, allow for API evolution, set defaults, etc.?

@embano1 Do you mean define the functional options in New() like this?

Now MQTT protocol mainly has four parts option struct. In this way, we can reduce to provide only one necessary clientConfig each time, and then override or add other options as needed. This is a great idea! I'll try to implement it tomorrow.

@embano1
Copy link
Member

embano1 commented Jul 28, 2023

Great! Excited!

func WithConnect(connOpt *paho.Connect) Option {
return func(p *Protocol) error {
if connOpt == nil {
return fmt.Errorf("the paho.Connect option should not be nil")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should -> must

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace in all places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines 53 to 54
openerMutex: sync.Mutex{},
clientMutex: sync.Mutex{},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: zero values not needed to be set here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

// WithPublish sets an paho.Publish for the client. This option is required if you want to send message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// WithPublish sets an paho.Publish for the client. This option is required if you want to send message.
// WithPublish sets the paho.Publish configuration for the client. This option is required if you want to send messages.

}
}

// WithSubscribe sets an paho.Subscribe for the client. This option is required if you want to receive message.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// WithSubscribe sets an paho.Subscribe for the client. This option is required if you want to receive message.
// WithSubscribe sets the paho.Subscribe configuration for the client. This option is required if you want to receive messages.

return err
}
if connAck.ReasonCode != 0 {
return fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), connAck.ReasonCode,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), connAck.ReasonCode,
return fmt.Errorf("failed to connect to %q: %d - %q", client.Conn.RemoteAddr(), connAck.ReasonCode,

p, err := mqtt_paho.New(ctx, clientConfig, connConfig, &paho.Publish{
Topic: "test-topic1", QoS: 0,
}, nil)
p, err := mqtt_paho.New(config, mqtt_paho.WithPublish(&paho.Publish{Topic: "test-topic1"}), mqtt_paho.WithConnect(connOpt))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can use %v and just err to reduce typing :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return fmt.Errorf("the paho.Publish option should not be nil")
}

err := p.initClient(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: this is called in Send and OpenInbound which might be confusing/inconsistent across usage. Should initClient be part of New()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's okay. Just if we put initClient on New(), then the ctx must be added as the parameter of the New() constructor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is an issue and makes the protocol even easier to use/harder to misuse

protocol/mqtt_paho/v2/protocol.go Outdated Show resolved Hide resolved
client := paho.NewClient(*clientConfig)
ca, err := client.Connect(ctx, connConfig)
if err != nil {
func New(config *paho.ClientConfig, opts ...Option) (*Protocol, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

questions:

  • should we keep ctx as first arg?
  • can we supply defaults to config so it can also become an option? I see it's mostly used to set a client id and conn which can be defaulted IMHO

Copy link
Contributor Author

@yanmxa yanmxa Jul 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • We can keep the ctx as the first args. So I will init the client on the New().
  • Of course, we can set the id by default. But I havn't figure out how can we set a default conn for the config?
    If we don't specify the address of the conn, it seems there is no way to establish a connection. and throw the error message when connecting to the broker in New(): client connection is nil

Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! Can you please squash the commits?

@embano1
Copy link
Member

embano1 commented Jul 30, 2023

Are there any doc-related changes needed for this?

Signed-off-by: myan <myan@redhat.com>

to review

Signed-off-by: myan <myan@redhat.com>

avoid all-or-nothing

Signed-off-by: myan <myan@redhat.com>

rollback all-or-nothing

Signed-off-by: myan <myan@redhat.com>

convert tparamter to pointers

Signed-off-by: myan <myan@redhat.com>

fixed doc string

Signed-off-by: myan <myan@redhat.com>

remove the protocolContext

Signed-off-by: myan <myan@redhat.com>

add context for protocol

Signed-off-by: myan <myan@redhat.com>

initialize mqtt protocol with publish/subscribe options

Signed-off-by: myan <myan@redhat.com>

remove the sender options from context

Signed-off-by: myan <myan@redhat.com>

using functional options in new()

Signed-off-by: myan <myan@redhat.com>

add ctx to the new() function

Signed-off-by: myan <myan@redhat.com>
@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 30, 2023

Great work! Can you please squash the commits?

Done!

@yanmxa
Copy link
Contributor Author

yanmxa commented Jul 30, 2023

Are there any doc-related changes needed for this?

The protocol here has not changed, and it seems that there is no need to change the document now.

@yanmxa yanmxa changed the title provide publish options for MQTT Sender protocol provide publish/subscribe options for MQTT protocol Jul 30, 2023
@embano1 embano1 enabled auto-merge July 30, 2023 16:09
Copy link
Member

@embano1 embano1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GREAT WORK!

LGTM

@embano1 embano1 merged commit 85db5b9 into cloudevents:main Jul 30, 2023
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants