From 808bf38859c6ae119375a1f0bb7ed8485e652c01 Mon Sep 17 00:00:00 2001 From: myan Date: Thu, 20 Jul 2023 09:23:50 +0000 Subject: [PATCH] provide the qos and retain configuration for mqtt protocol Signed-off-by: myan to review Signed-off-by: myan avoid all-or-nothing Signed-off-by: myan rollback all-or-nothing Signed-off-by: myan convert tparamter to pointers Signed-off-by: myan fixed doc string Signed-off-by: myan remove the protocolContext Signed-off-by: myan add context for protocol Signed-off-by: myan initialize mqtt protocol with publish/subscribe options Signed-off-by: myan remove the sender options from context Signed-off-by: myan using functional options in new() Signed-off-by: myan add ctx to the new() function Signed-off-by: myan --- protocol/mqtt_paho/v2/option.go | 48 +++++++ protocol/mqtt_paho/v2/protocol.go | 120 ++++++++++-------- samples/mqtt/receiver/main.go | 14 +- samples/mqtt/sender/main.go | 11 +- test/integration/mqtt_paho/mqtt_test.go | 31 +++-- .../mqtt_paho_binding/mqtt_test.go | 17 ++- 6 files changed, 157 insertions(+), 84 deletions(-) create mode 100644 protocol/mqtt_paho/v2/option.go diff --git a/protocol/mqtt_paho/v2/option.go b/protocol/mqtt_paho/v2/option.go new file mode 100644 index 00000000..955a1621 --- /dev/null +++ b/protocol/mqtt_paho/v2/option.go @@ -0,0 +1,48 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package mqtt_paho + +import ( + "fmt" + + "github.com/eclipse/paho.golang/paho" +) + +// Option is the function signature required to be considered an mqtt_paho.Option. +type Option func(*Protocol) error + +// WithConnect sets the paho.Connect configuration for the client. This option is not required. +func WithConnect(connOpt *paho.Connect) Option { + return func(p *Protocol) error { + if connOpt == nil { + return fmt.Errorf("the paho.Connect option must not be nil") + } + p.connOption = connOpt + return nil + } +} + +// WithPublish sets the paho.Publish configuration for the client. This option is required if you want to send messages. +func WithPublish(publishOpt *paho.Publish) Option { + return func(p *Protocol) error { + if publishOpt == nil { + return fmt.Errorf("the paho.Publish option must not be nil") + } + p.publishOption = publishOpt + return nil + } +} + +// WithSubscribe sets the paho.Subscribe configuration for the client. This option is required if you want to receive messages. +func WithSubscribe(subscribeOpt *paho.Subscribe) Option { + return func(p *Protocol) error { + if subscribeOpt == nil { + return fmt.Errorf("the paho.Subscribe option must not be nil") + } + p.subscribeOption = subscribeOpt + return nil + } +} diff --git a/protocol/mqtt_paho/v2/protocol.go b/protocol/mqtt_paho/v2/protocol.go index 01c5b8c1..261fc6c3 100644 --- a/protocol/mqtt_paho/v2/protocol.go +++ b/protocol/mqtt_paho/v2/protocol.go @@ -19,12 +19,11 @@ import ( ) type Protocol struct { - client *paho.Client - connConfig *paho.Connect - senderTopic string - receiverTopics []string - qos byte - retained bool + client *paho.Client + config *paho.ClientConfig + connOption *paho.Connect + publishOption *paho.Publish + subscribeOption *paho.Subscribe // receiver incoming chan *paho.Publish @@ -41,80 +40,89 @@ var ( _ protocol.Closer = (*Protocol)(nil) ) -func New(ctx context.Context, clientConfig *paho.ClientConfig, connConfig *paho.Connect, SenderTopic string, - ReceiverTopics []string, qos byte, retained bool, -) (*Protocol, error) { - client := paho.NewClient(*clientConfig) - ca, err := client.Connect(ctx, connConfig) +func New(ctx context.Context, config *paho.ClientConfig, opts ...Option) (*Protocol, error) { + if config == nil { + return nil, fmt.Errorf("the paho.ClientConfig must not be nil") + } + + p := &Protocol{ + client: paho.NewClient(*config), + // default connect option + connOption: &paho.Connect{ + KeepAlive: 30, + CleanStart: true, + }, + incoming: make(chan *paho.Publish), + closeChan: make(chan struct{}), + } + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + // Connect to the MQTT broker + connAck, err := p.client.Connect(ctx, p.connOption) if err != nil { return nil, err } - if ca.ReasonCode != 0 { - return nil, fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), ca.ReasonCode, - ca.Properties.ReasonString) + if connAck.ReasonCode != 0 { + return nil, fmt.Errorf("failed to connect to %q : %d - %q", p.client.Conn.RemoteAddr(), connAck.ReasonCode, + connAck.Properties.ReasonString) } - return &Protocol{ - client: client, - connConfig: connConfig, - senderTopic: SenderTopic, - receiverTopics: ReceiverTopics, - qos: qos, - retained: retained, - incoming: make(chan *paho.Publish), - openerMutex: sync.Mutex{}, - closeChan: make(chan struct{}), - }, nil + return p, nil +} + +func (p *Protocol) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil } -func (t *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { +func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { + if p.publishOption == nil { + return fmt.Errorf("the paho.Publish option must not be nil") + } + var err error defer m.Finish(err) - topic := cecontext.TopicFrom(ctx) - if topic == "" { - topic = t.senderTopic + msg := p.publishOption + if cecontext.TopicFrom(ctx) != "" { + msg.Topic = cecontext.TopicFrom(ctx) + cecontext.WithTopic(ctx, "") } - msg := &paho.Publish{ - QoS: t.qos, - Retain: t.retained, - Topic: topic, - } err = WritePubMessage(ctx, m, msg, transformers...) if err != nil { return err } - _, err = t.client.Publish(ctx, msg) + _, err = p.client.Publish(ctx, msg) if err != nil { return err } return err } -func (t *Protocol) OpenInbound(ctx context.Context) error { - t.openerMutex.Lock() - defer t.openerMutex.Unlock() +func (p *Protocol) OpenInbound(ctx context.Context) error { + if p.subscribeOption == nil { + return fmt.Errorf("the paho.Subscribe option must not be nil") + } + + p.openerMutex.Lock() + defer p.openerMutex.Unlock() logger := cecontext.LoggerFrom(ctx) - t.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) { - t.incoming <- m + p.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) { + p.incoming <- m }) - subs := make(map[string]paho.SubscribeOptions) - for _, topic := range t.receiverTopics { - subs[topic] = paho.SubscribeOptions{ - QoS: t.qos, - RetainAsPublished: t.retained, - } - } - - logger.Infof("subscribe to topics: %v", t.receiverTopics) - _, err := t.client.Subscribe(ctx, &paho.Subscribe{ - Subscriptions: subs, - }) + logger.Infof("subscribing to topics: %v", p.subscribeOption.Subscriptions) + _, err := p.client.Subscribe(ctx, p.subscribeOption) if err != nil { return err } @@ -122,15 +130,15 @@ func (t *Protocol) OpenInbound(ctx context.Context) error { // Wait until external or internal context done select { case <-ctx.Done(): - case <-t.closeChan: + case <-p.closeChan: } - return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0}) + return p.client.Disconnect(&paho.Disconnect{ReasonCode: 0}) } // Receive implements Receiver.Receive -func (t *Protocol) Receive(ctx context.Context) (binding.Message, error) { +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { select { - case m, ok := <-t.incoming: + case m, ok := <-p.incoming: if !ok { return nil, io.EOF } diff --git a/samples/mqtt/receiver/main.go b/samples/mqtt/receiver/main.go index d76b3eb0..7951c4ae 100644 --- a/samples/mqtt/receiver/main.go +++ b/samples/mqtt/receiver/main.go @@ -11,7 +11,7 @@ import ( "log" "net" - cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" + mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/eclipse/paho.golang/paho" ) @@ -22,16 +22,16 @@ func main() { if err != nil { log.Fatalf("failed to connect to mqtt broker: %s", err.Error()) } - clientConfig := &paho.ClientConfig{ + config := &paho.ClientConfig{ ClientID: "receiver-client-id", Conn: conn, } - cp := &paho.Connect{ - KeepAlive: 30, - CleanStart: true, + subscribeOpt := &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + "test-topic": {QoS: 0}, + }, } - - p, err := cemqtt.New(ctx, clientConfig, cp, "", []string{"test-topic"}, 0, false) + p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithSubscribe(subscribeOpt)) if err != nil { log.Fatalf("failed to create protocol: %s", err.Error()) } diff --git a/samples/mqtt/sender/main.go b/samples/mqtt/sender/main.go index 77bf3a41..10c2a03e 100644 --- a/samples/mqtt/sender/main.go +++ b/samples/mqtt/sender/main.go @@ -15,7 +15,7 @@ import ( "github.com/eclipse/paho.golang/paho" "github.com/google/uuid" - cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" + mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" cecontext "github.com/cloudevents/sdk-go/v2/context" ) @@ -29,18 +29,19 @@ func main() { if err != nil { log.Fatalf("failed to connect to mqtt broker: %s", err.Error()) } - clientConfig := &paho.ClientConfig{ + config := &paho.ClientConfig{ ClientID: "sender-client-id", Conn: conn, } - cp := &paho.Connect{ + // optional connect option + connOpt := &paho.Connect{ KeepAlive: 30, CleanStart: true, } // set a default topic with test-topic1 - p, err := cemqtt.New(ctx, clientConfig, cp, "test-topic1", nil, 0, false) + p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(&paho.Publish{Topic: "test-topic1"}), mqtt_paho.WithConnect(connOpt)) if err != nil { - log.Fatalf("failed to create protocol: %s", err.Error()) + log.Fatalf("failed to create protocol: %v", err) } defer p.Close(ctx) diff --git a/test/integration/mqtt_paho/mqtt_test.go b/test/integration/mqtt_paho/mqtt_test.go index 3ec3e0af..72631cc1 100644 --- a/test/integration/mqtt_paho/mqtt_test.go +++ b/test/integration/mqtt_paho/mqtt_test.go @@ -17,6 +17,7 @@ import ( mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" cloudevents "github.com/cloudevents/sdk-go/v2" + cecontext "github.com/cloudevents/sdk-go/v2/context" "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/test" ) @@ -37,7 +38,7 @@ func TestSendEvent(t *testing.T) { eventChan := make(chan receiveEvent) defer close(eventChan) go func() { - client, err := cloudevents.NewClient(protocolFactory(t, topicName)) + client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName)) if err != nil { eventChan <- receiveEvent{err: err} return @@ -51,8 +52,8 @@ func TestSendEvent(t *testing.T) { } }() - // start a cloudevents sender client go to send the event - client, err := cloudevents.NewClient(protocolFactory(t, topicName)) + // start a cloudevents sender client go to send the event, set the topic on context + client, err := cloudevents.NewClient(protocolFactory(ctx, t, "")) require.NoError(t, err) timer := time.NewTimer(5 * time.Millisecond) @@ -67,7 +68,10 @@ func TestSendEvent(t *testing.T) { test.AssertEventEquals(t, inEvent, test.ConvertEventExtensionsToString(t, eventOut.event)) return case <-timer.C: - result := client.Send(ctx, inEvent) + result := client.Send( + cecontext.WithTopic(ctx, topicName), + inEvent, + ) require.NoError(t, result) // the receiver mightn't be ready before the sender send the message, so wait and we retry continue @@ -79,20 +83,27 @@ func TestSendEvent(t *testing.T) { // To start a local environment for testing: // docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf // the protocolFactory will generate a unique connection clientId when it be invoked -func protocolFactory(t testing.TB, topicName string) *mqtt_paho.Protocol { - ctx := context.Background() - +func protocolFactory(ctx context.Context, t testing.TB, topicName string) *mqtt_paho.Protocol { broker := "127.0.0.1:1883" conn, err := net.Dial("tcp", broker) require.NoError(t, err) - clientConfig := &paho.ClientConfig{ + config := &paho.ClientConfig{ Conn: conn, } - cp := &paho.Connect{ + connOpt := &paho.Connect{ KeepAlive: 30, CleanStart: true, } - p, err := mqtt_paho.New(ctx, clientConfig, cp, topicName, []string{topicName}, 0, false) + publishOpt := &paho.Publish{ + Topic: topicName, QoS: 0, + } + subscribeOpt := &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + topicName: {QoS: 0}, + }, + } + + p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithConnect(connOpt), mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt)) require.NoError(t, err) return p diff --git a/test/integration/mqtt_paho_binding/mqtt_test.go b/test/integration/mqtt_paho_binding/mqtt_test.go index c96c47bc..7d96d73d 100644 --- a/test/integration/mqtt_paho_binding/mqtt_test.go +++ b/test/integration/mqtt_paho_binding/mqtt_test.go @@ -126,13 +126,18 @@ func getProtocol(ctx context.Context, topic string) (*mqtt_paho.Protocol, error) if err != nil { return nil, err } - cp := &paho.Connect{ - KeepAlive: 30, - CleanStart: true, + config := &paho.ClientConfig{ + Conn: conn, + } + publishOpt := &paho.Publish{ + Topic: topic, QoS: 0, + } + subscribeOpt := &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + topic: {QoS: 0}, + }, } - p, err := mqtt_paho.New(ctx, &paho.ClientConfig{ - Conn: conn, - }, cp, topic, []string{topic}, 0, false) + p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt)) return p, err }