Skip to content

Commit

Permalink
provide the qos and retain configuration for mqtt protocol
Browse files Browse the repository at this point in the history
Signed-off-by: myan <myan@redhat.com>

to review

Signed-off-by: myan <myan@redhat.com>

avoid all-or-nothing

Signed-off-by: myan <myan@redhat.com>

rollback all-or-nothing

Signed-off-by: myan <myan@redhat.com>

convert tparamter to pointers

Signed-off-by: myan <myan@redhat.com>

fixed doc string

Signed-off-by: myan <myan@redhat.com>

remove the protocolContext

Signed-off-by: myan <myan@redhat.com>

add context for protocol

Signed-off-by: myan <myan@redhat.com>

initialize mqtt protocol with publish/subscribe options

Signed-off-by: myan <myan@redhat.com>

remove the sender options from context

Signed-off-by: myan <myan@redhat.com>

using functional options in new()

Signed-off-by: myan <myan@redhat.com>

add ctx to the new() function

Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed Jul 30, 2023
1 parent 3dfc033 commit 808bf38
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 84 deletions.
48 changes: 48 additions & 0 deletions protocol/mqtt_paho/v2/option.go
@@ -0,0 +1,48 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"fmt"

"github.com/eclipse/paho.golang/paho"
)

// Option is the function signature required to be considered an mqtt_paho.Option.
type Option func(*Protocol) error

// WithConnect sets the paho.Connect configuration for the client. This option is not required.
func WithConnect(connOpt *paho.Connect) Option {
return func(p *Protocol) error {
if connOpt == nil {
return fmt.Errorf("the paho.Connect option must not be nil")
}
p.connOption = connOpt
return nil
}
}

// WithPublish sets the paho.Publish configuration for the client. This option is required if you want to send messages.
func WithPublish(publishOpt *paho.Publish) Option {
return func(p *Protocol) error {
if publishOpt == nil {
return fmt.Errorf("the paho.Publish option must not be nil")
}
p.publishOption = publishOpt
return nil
}
}

// WithSubscribe sets the paho.Subscribe configuration for the client. This option is required if you want to receive messages.
func WithSubscribe(subscribeOpt *paho.Subscribe) Option {
return func(p *Protocol) error {
if subscribeOpt == nil {
return fmt.Errorf("the paho.Subscribe option must not be nil")
}
p.subscribeOption = subscribeOpt
return nil
}
}
120 changes: 64 additions & 56 deletions protocol/mqtt_paho/v2/protocol.go
Expand Up @@ -19,12 +19,11 @@ import (
)

type Protocol struct {
client *paho.Client
connConfig *paho.Connect
senderTopic string
receiverTopics []string
qos byte
retained bool
client *paho.Client
config *paho.ClientConfig
connOption *paho.Connect
publishOption *paho.Publish
subscribeOption *paho.Subscribe

// receiver
incoming chan *paho.Publish
Expand All @@ -41,96 +40,105 @@ var (
_ protocol.Closer = (*Protocol)(nil)
)

func New(ctx context.Context, clientConfig *paho.ClientConfig, connConfig *paho.Connect, SenderTopic string,
ReceiverTopics []string, qos byte, retained bool,
) (*Protocol, error) {
client := paho.NewClient(*clientConfig)
ca, err := client.Connect(ctx, connConfig)
func New(ctx context.Context, config *paho.ClientConfig, opts ...Option) (*Protocol, error) {
if config == nil {
return nil, fmt.Errorf("the paho.ClientConfig must not be nil")
}

p := &Protocol{
client: paho.NewClient(*config),
// default connect option
connOption: &paho.Connect{
KeepAlive: 30,
CleanStart: true,
},
incoming: make(chan *paho.Publish),
closeChan: make(chan struct{}),
}
if err := p.applyOptions(opts...); err != nil {
return nil, err
}

// Connect to the MQTT broker
connAck, err := p.client.Connect(ctx, p.connOption)
if err != nil {
return nil, err
}
if ca.ReasonCode != 0 {
return nil, fmt.Errorf("failed to connect to %s : %d - %s", client.Conn.RemoteAddr(), ca.ReasonCode,
ca.Properties.ReasonString)
if connAck.ReasonCode != 0 {
return nil, fmt.Errorf("failed to connect to %q : %d - %q", p.client.Conn.RemoteAddr(), connAck.ReasonCode,
connAck.Properties.ReasonString)
}

return &Protocol{
client: client,
connConfig: connConfig,
senderTopic: SenderTopic,
receiverTopics: ReceiverTopics,
qos: qos,
retained: retained,
incoming: make(chan *paho.Publish),
openerMutex: sync.Mutex{},
closeChan: make(chan struct{}),
}, nil
return p, nil
}

func (p *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(p); err != nil {
return err
}
}
return nil
}

func (t *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
if p.publishOption == nil {
return fmt.Errorf("the paho.Publish option must not be nil")
}

var err error
defer m.Finish(err)

topic := cecontext.TopicFrom(ctx)
if topic == "" {
topic = t.senderTopic
msg := p.publishOption
if cecontext.TopicFrom(ctx) != "" {
msg.Topic = cecontext.TopicFrom(ctx)
cecontext.WithTopic(ctx, "")
}

msg := &paho.Publish{
QoS: t.qos,
Retain: t.retained,
Topic: topic,
}
err = WritePubMessage(ctx, m, msg, transformers...)
if err != nil {
return err
}

_, err = t.client.Publish(ctx, msg)
_, err = p.client.Publish(ctx, msg)
if err != nil {
return err
}
return err
}

func (t *Protocol) OpenInbound(ctx context.Context) error {
t.openerMutex.Lock()
defer t.openerMutex.Unlock()
func (p *Protocol) OpenInbound(ctx context.Context) error {
if p.subscribeOption == nil {
return fmt.Errorf("the paho.Subscribe option must not be nil")
}

p.openerMutex.Lock()
defer p.openerMutex.Unlock()

logger := cecontext.LoggerFrom(ctx)

t.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
t.incoming <- m
p.client.Router = paho.NewSingleHandlerRouter(func(m *paho.Publish) {
p.incoming <- m
})

subs := make(map[string]paho.SubscribeOptions)
for _, topic := range t.receiverTopics {
subs[topic] = paho.SubscribeOptions{
QoS: t.qos,
RetainAsPublished: t.retained,
}
}

logger.Infof("subscribe to topics: %v", t.receiverTopics)
_, err := t.client.Subscribe(ctx, &paho.Subscribe{
Subscriptions: subs,
})
logger.Infof("subscribing to topics: %v", p.subscribeOption.Subscriptions)
_, err := p.client.Subscribe(ctx, p.subscribeOption)
if err != nil {
return err
}

// Wait until external or internal context done
select {
case <-ctx.Done():
case <-t.closeChan:
case <-p.closeChan:
}
return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
return p.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
}

// Receive implements Receiver.Receive
func (t *Protocol) Receive(ctx context.Context) (binding.Message, error) {
func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
select {
case m, ok := <-t.incoming:
case m, ok := <-p.incoming:
if !ok {
return nil, io.EOF
}
Expand Down
14 changes: 7 additions & 7 deletions samples/mqtt/receiver/main.go
Expand Up @@ -11,7 +11,7 @@ import (
"log"
"net"

cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/eclipse/paho.golang/paho"
)
Expand All @@ -22,16 +22,16 @@ func main() {
if err != nil {
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
}
clientConfig := &paho.ClientConfig{
config := &paho.ClientConfig{
ClientID: "receiver-client-id",
Conn: conn,
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
subscribeOpt := &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
"test-topic": {QoS: 0},
},
}

p, err := cemqtt.New(ctx, clientConfig, cp, "", []string{"test-topic"}, 0, false)
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithSubscribe(subscribeOpt))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
Expand Down
11 changes: 6 additions & 5 deletions samples/mqtt/sender/main.go
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"

cemqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)

Expand All @@ -29,18 +29,19 @@ func main() {
if err != nil {
log.Fatalf("failed to connect to mqtt broker: %s", err.Error())
}
clientConfig := &paho.ClientConfig{
config := &paho.ClientConfig{
ClientID: "sender-client-id",
Conn: conn,
}
cp := &paho.Connect{
// optional connect option
connOpt := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
// set a default topic with test-topic1
p, err := cemqtt.New(ctx, clientConfig, cp, "test-topic1", nil, 0, false)
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(&paho.Publish{Topic: "test-topic1"}), mqtt_paho.WithConnect(connOpt))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
log.Fatalf("failed to create protocol: %v", err)
}
defer p.Close(ctx)

Expand Down
31 changes: 21 additions & 10 deletions test/integration/mqtt_paho/mqtt_test.go
Expand Up @@ -17,6 +17,7 @@ import (

mqtt_paho "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/test"
)
Expand All @@ -37,7 +38,7 @@ func TestSendEvent(t *testing.T) {
eventChan := make(chan receiveEvent)
defer close(eventChan)
go func() {
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
client, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName))
if err != nil {
eventChan <- receiveEvent{err: err}
return
Expand All @@ -51,8 +52,8 @@ func TestSendEvent(t *testing.T) {
}
}()

// start a cloudevents sender client go to send the event
client, err := cloudevents.NewClient(protocolFactory(t, topicName))
// start a cloudevents sender client go to send the event, set the topic on context
client, err := cloudevents.NewClient(protocolFactory(ctx, t, ""))
require.NoError(t, err)

timer := time.NewTimer(5 * time.Millisecond)
Expand All @@ -67,7 +68,10 @@ func TestSendEvent(t *testing.T) {
test.AssertEventEquals(t, inEvent, test.ConvertEventExtensionsToString(t, eventOut.event))
return
case <-timer.C:
result := client.Send(ctx, inEvent)
result := client.Send(
cecontext.WithTopic(ctx, topicName),
inEvent,
)
require.NoError(t, result)
// the receiver mightn't be ready before the sender send the message, so wait and we retry
continue
Expand All @@ -79,20 +83,27 @@ func TestSendEvent(t *testing.T) {
// To start a local environment for testing:
// docker run -it --rm --name mosquitto -p 1883:1883 eclipse-mosquitto:2.0 mosquitto -c /mosquitto-no-auth.conf
// the protocolFactory will generate a unique connection clientId when it be invoked
func protocolFactory(t testing.TB, topicName string) *mqtt_paho.Protocol {
ctx := context.Background()

func protocolFactory(ctx context.Context, t testing.TB, topicName string) *mqtt_paho.Protocol {
broker := "127.0.0.1:1883"
conn, err := net.Dial("tcp", broker)
require.NoError(t, err)
clientConfig := &paho.ClientConfig{
config := &paho.ClientConfig{
Conn: conn,
}
cp := &paho.Connect{
connOpt := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
}
p, err := mqtt_paho.New(ctx, clientConfig, cp, topicName, []string{topicName}, 0, false)
publishOpt := &paho.Publish{
Topic: topicName, QoS: 0,
}
subscribeOpt := &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
topicName: {QoS: 0},
},
}

p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithConnect(connOpt), mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt))
require.NoError(t, err)

return p
Expand Down
17 changes: 11 additions & 6 deletions test/integration/mqtt_paho_binding/mqtt_test.go
Expand Up @@ -126,13 +126,18 @@ func getProtocol(ctx context.Context, topic string) (*mqtt_paho.Protocol, error)
if err != nil {
return nil, err
}
cp := &paho.Connect{
KeepAlive: 30,
CleanStart: true,
config := &paho.ClientConfig{
Conn: conn,
}
publishOpt := &paho.Publish{
Topic: topic, QoS: 0,
}
subscribeOpt := &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
topic: {QoS: 0},
},
}

p, err := mqtt_paho.New(ctx, &paho.ClientConfig{
Conn: conn,
}, cp, topic, []string{topic}, 0, false)
p, err := mqtt_paho.New(ctx, config, mqtt_paho.WithPublish(publishOpt), mqtt_paho.WithSubscribe(subscribeOpt))
return p, err
}

0 comments on commit 808bf38

Please sign in to comment.