Skip to content

Commit

Permalink
mqtt protocol binding (#910)
Browse files Browse the repository at this point in the history
* add receiver and opener for mqtt broker

Signed-off-by: myan <myan@redhat.com>

add protocol and sample files

Signed-off-by: myan <myan@redhat.com>

rollback samples to go1.17

Signed-off-by: myan <myan@redhat.com>

move the protocol to go1.17

Signed-off-by: myan <myan@redhat.com>

add message test

Signed-off-by: myan <myan@redhat.com>

trigger to run the integration test

Signed-off-by: myan <myan@redhat.com>

fixed go mod issue

Signed-off-by: myan <myan@redhat.com>

resolve the review issue

Signed-off-by: myan <myan@redhat.com>

remove the useless comment

Signed-off-by: myan <myan@redhat.com>

add ut for writeMessage

Signed-off-by: myan <myan@redhat.com>

go mod tidy

Signed-off-by: myan <myan@redhat.com>

add intergration test

Signed-off-by: myan <myan@redhat.com>

solve the uncheck error

Signed-off-by: myan <myan@redhat.com>

fix the integration error

Signed-off-by: myan <myan@redhat.com>

fix the integration error

Signed-off-by: myan <myan@redhat.com>

reply the review

Signed-off-by: myan <myan@redhat.com>

simpler tests

Signed-off-by: myan <myan@redhat.com>

remove the nesting

Signed-off-by: myan <myan@redhat.com>

refactor the recevier logic

Signed-off-by: myan <myan@redhat.com>

add a timer for assert loop

Signed-off-by: myan <myan@redhat.com>

* add copyright

Signed-off-by: myan <myan@redhat.com>

---------

Signed-off-by: myan <myan@redhat.com>
  • Loading branch information
yanmxa committed Jul 13, 2023
1 parent f681ac6 commit fdcb2d2
Show file tree
Hide file tree
Showing 18 changed files with 1,093 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/integration.yaml
Expand Up @@ -54,6 +54,11 @@ jobs:
}
ports:
- 5672:5672

mqtt:
image: eclipse-mosquitto:1.6
ports:
- 1883:1883

steps:
- name: Checkout code
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -35,3 +35,4 @@ test/benchmark/e2e/http/results/
tmp/
test/coverage.tmp

vendor/
25 changes: 25 additions & 0 deletions protocol/mqtt_paho/v2/go.mod
@@ -0,0 +1,25 @@
module github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2

go 1.17

replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/eclipse/paho.golang v0.11.0
github.com/stretchr/testify v1.8.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
55 changes: 55 additions & 0 deletions protocol/mqtt_paho/v2/go.sum
@@ -0,0 +1,55 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3HbQdM=
github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
119 changes: 119 additions & 0 deletions protocol/mqtt_paho/v2/message.go
@@ -0,0 +1,119 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"bytes"
"context"
"strings"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/eclipse/paho.golang/paho"
)

const (
prefix = "ce-"
contentType = "Content-Type"
)

var specs = spec.WithPrefix(prefix)

// Message represents a MQTT message.
// This message *can* be read several times safely
type Message struct {
internal *paho.Publish
version spec.Version
format format.Format
}

// Check if Message implements binding.Message
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

func NewMessage(msg *paho.Publish) *Message {
var f format.Format
var v spec.Version
if msg.Properties != nil {
// Use properties.User["Content-type"] to determine if message is structured
if s := msg.Properties.User.Get(contentType); format.IsFormat(s) {
f = format.Lookup(s)
} else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" {
v = specs.Version(s)
}
}
return &Message{
internal: msg,
version: v,
format: f,
}
}

func (m *Message) ReadEncoding() binding.Encoding {
if m.version != nil {
return binding.EncodingBinary
}
if m.format != nil {
return binding.EncodingStructured
}
return binding.EncodingUnknown
}

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.version != nil {
return binding.ErrNotStructured
}
if m.format == nil {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Payload))
}

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error) {
if m.format != nil {
return binding.ErrNotBinary
}

for _, userProperty := range m.internal.Properties.User {
if strings.HasPrefix(userProperty.Key, prefix) {
attr := m.version.Attribute(userProperty.Key)
if attr != nil {
err = encoder.SetAttribute(attr, userProperty.Value)
} else {
err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value)
}
} else if userProperty.Key == contentType {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value))
}
if err != nil {
return
}
}

if m.internal.Payload != nil {
return encoder.SetData(bytes.NewBuffer(m.internal.Payload))
}
return nil
}

func (m *Message) Finish(error) error {
return nil
}

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
attr := m.version.AttributeFromKind(k)
if attr != nil {
return attr, m.internal.Properties.User.Get(prefix + attr.Name())
}
return nil, nil
}

func (m *Message) GetExtension(name string) interface{} {
return m.internal.Properties.User.Get(prefix + name)
}
69 changes: 69 additions & 0 deletions protocol/mqtt_paho/v2/message_test.go
@@ -0,0 +1,69 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package mqtt_paho

import (
"context"
"testing"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/eclipse/paho.golang/paho"
)

func TestReadStructured(t *testing.T) {
tests := []struct {
name string
msg *paho.Publish
wantErr error
}{
{
name: "nil format",
msg: &paho.Publish{
Payload: []byte(""),
},
wantErr: binding.ErrNotStructured,
},
{
name: "json format",
msg: &paho.Publish{
Payload: []byte(""),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}},
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
msg := NewMessage(tc.msg)
err := msg.ReadStructured(context.Background(), (*pubMessageWriter)(tc.msg))
if err != tc.wantErr {
t.Errorf("Error unexpected. got: %v, want: %v", err, tc.wantErr)
}
})
}
}

func TestReadBinary(t *testing.T) {
msg := &paho.Publish{
Payload: []byte("{hello:world}"),
Properties: &paho.PublishProperties{
User: []paho.UserProperty{
{Key: "ce-specversion", Value: "1.0"},
{Key: "ce-type", Value: "binary.test"},
{Key: "ce-source", Value: "test-source"},
{Key: "ce-id", Value: "ABC-123"},
},
},
}

message := NewMessage(msg)
err := message.ReadBinary(context.Background(), (*pubMessageWriter)(msg))
if err != nil {
t.Errorf("Error unexpected. got: %v", err)
}
}

0 comments on commit fdcb2d2

Please sign in to comment.