Skip to content

Commit

Permalink
Start of develop branch, rework API
Browse files Browse the repository at this point in the history
I have reworked the API to try and make it more similar to some of the
other Paho clients, and perhaps more idiomatic. I have also replaced
the internal implementation of MQTT packets with the one from my broker
project. I have replaced some structs that were taken/returned with
interfaces instead.
  • Loading branch information
Al S-M committed Sep 3, 2014
1 parent 4572889 commit 33499ed
Show file tree
Hide file tree
Showing 32 changed files with 2,177 additions and 2,883 deletions.
169 changes: 92 additions & 77 deletions client.go
Expand Up @@ -18,11 +18,24 @@ package mqtt
import (
"bufio"
"errors"
. "github.com/alsm/hrotti/packets"
"net"
"sync"
"time"
)

type Client interface {
IsConnected() bool
Start() ([]Receipt, error)
Disconnect(uint)
ForceDisconnect()
disconnect()
Publish(string, byte, bool, interface{})
Subscribe(string, byte, MessageHandler) error
SubscribeMultiple(map[string]byte, MessageHandler) error
Unsubscribe(...string) error
}

// MqttClient is a lightweight MQTT v3.1 Client for communicating
// with an MQTT server using non-blocking methods that allow work
// to be done in the background.
Expand All @@ -43,15 +56,15 @@ import (
// and then supplying a ClientOptions type.
type MqttClient struct {
sync.RWMutex
conn net.Conn
bufferedConn *bufio.ReadWriter
ibound chan *Message
obound chan sendable
oboundP chan *Message
begin chan ConnRC
errors chan error
stop chan struct{}
receipts *receiptMap
conn net.Conn
bufferedConn *bufio.ReadWriter
ibound chan ControlPacket
obound chan *PublishPacket
oboundP chan ControlPacket
begin chan byte
errors chan error
stop chan struct{}
//receipts *receiptMap
persist Store
options ClientOptions
lastContact lastcontact
Expand Down Expand Up @@ -108,34 +121,36 @@ func (c *MqttClient) Start() ([]Receipt, error) {
c.bufferedConn = bufio.NewReadWriter(bufio.NewReader(c.conn), bufio.NewWriter(c.conn))

c.persist.Open()
c.receipts = newReceiptMap()
//c.receipts = newReceiptMap()

DEBUG.Println(CLI, "about to start generateMsgIds")
c.options.mids.generateMsgIds()

c.obound = make(chan sendable)
c.ibound = make(chan *Message)
c.oboundP = make(chan *Message)
c.obound = make(chan *PublishPacket)
c.ibound = make(chan ControlPacket)
c.oboundP = make(chan ControlPacket)
c.errors = make(chan error)
c.stop = make(chan struct{})

go outgoing(c)
go alllogic(c)

cm := newConnectMsgFromOptions(c.options)
cm.ProtocolName = "MQIsdp"
cm.ProtocolVersion = 3
DEBUG.Println(CLI, "about to write new connect msg")
c.oboundP <- cm

rc := connect(c)
if rc != CONN_ACCEPTED {
CRITICAL.Println(CLI, "CONNACK was not CONN_ACCEPTED, but rather", rc2str(rc))
CRITICAL.Println(CLI, "CONNACK was not CONN_ACCEPTED, but rather", ConnackReturnCodes[rc])
// Stop all go routines except outgoing
close(c.stop)
c.conn.Close()
return nil, chkrc(rc)
return nil, connErrors[rc]
}

c.options.incomingPubChan = make(chan *Message, 100)
c.options.incomingPubChan = make(chan *PublishPacket, 100)
c.options.msgRouter.matchAndDispatch(c.options.incomingPubChan, c.options.order, c)

c.connected = true
Expand All @@ -148,7 +163,7 @@ func (c *MqttClient) Start() ([]Receipt, error) {
// Take care of any messages in the store
var leftovers []Receipt
if c.options.cleanSession == false {
leftovers = c.resume()
//leftovers = c.resume()
} else {
c.persist.Reset()
}
Expand All @@ -157,12 +172,12 @@ func (c *MqttClient) Start() ([]Receipt, error) {
go incoming(c)

DEBUG.Println(CLI, "exit startMqttClient")
if chkrc(rc) != nil {
if err := connErrors[rc]; err != nil {
// Cleanup before returning.
close(c.stop)
c.conn.Close()
}
return leftovers, chkrc(rc)
return leftovers, connErrors[rc]
}

// Disconnect will end the connection with the server, but not before waiting
Expand Down Expand Up @@ -201,13 +216,12 @@ func (c *MqttClient) ForceDisconnect() {

func (c *MqttClient) disconnect() {
c.connected = false
dm := newDisconnectMsg()

// Stop all go routines except outgoing
close(c.stop)
dm := NewControlPacket(DISCONNECT).(*DisconnectPacket)

// Send disconnect message and stop outgoing
c.oboundP <- dm
// Stop all go routines
close(c.stop)

DEBUG.Println(CLI, "disconnected")
c.persist.Close()
Expand All @@ -217,91 +231,92 @@ func (c *MqttClient) disconnect() {
// and content to the specified topic.
// Returns a read only channel used to track
// the delivery of the message.
func (c *MqttClient) Publish(qos QoS, topic string, payload interface{}) <-chan Receipt {
var pub *Message
func (c *MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) {
pub := NewControlPacket(PUBLISH).(*PublishPacket)
pub.Qos = qos
pub.TopicName = topic
pub.Retain = retained
switch payload.(type) {
case string:
pub = newPublishMsg(qos, topic, []byte(payload.(string)))
pub.Payload = []byte(payload.(string))
case []byte:
pub = newPublishMsg(qos, topic, payload.([]byte))
pub.Payload = payload.([]byte)
default:
return nil
}

r := make(chan Receipt, 1)
DEBUG.Println(CLI, "sending publish message, topic:", topic)

select {
case c.obound <- sendable{pub, r}:
case <-time.After(time.Second):
close(r)
}
return r
c.obound <- pub
}

// PublishMessage will publish a Message to the specified topic.
// Returns a read only channel used to track
// the delivery of the message.
func (c *MqttClient) PublishMessage(topic string, message *Message) <-chan Receipt {
// Just reuse pieces from the existing message
// so that message id etc aren't set
pub := newPublishMsg(message.QoS(), topic, message.payload)
pub.SetRetainedFlag(message.RetainedFlag())

r := make(chan Receipt, 1)

DEBUG.Println(CLI, "sending publish message, topic:", topic)
// Start a new subscription. Provide a MessageHandler to be executed when
// a message is published on the topic provided.
func (c *MqttClient) Subscribe(topic string, qos byte, callback MessageHandler) error {
var err error
DEBUG.Println(CLI, "enter Subscribe")
if !c.IsConnected() {
return ErrNotConnected
}
s := NewControlPacket(SUBSCRIBE).(*SubscribePacket)
DEBUG.Println(s.String())
if err = validateTopicAndQos(topic, qos); err != nil {
return err
}
s.Topics = append(s.Topics, topic)
s.Qoss = append(s.Qoss, qos)

select {
case c.obound <- sendable{pub, r}:
return r
case <-time.After(time.Second):
close(r)
return nil
if callback != nil {
c.options.msgRouter.addRoute(topic, callback)
}

c.oboundP <- s
DEBUG.Println(CLI, "exit Subscribe")
return nil
}

// Start a new subscription. Provide a MessageHandler to be executed when
// a message is published on one of the topics provided.
func (c *MqttClient) StartSubscription(callback MessageHandler, filters ...*TopicFilter) (<-chan Receipt, error) {
// Start a new subscription for multiple topics. Provide a MessageHandler to
// be executed when a message is published on one of the topics provided.
func (c *MqttClient) SubscribeMultiple(filters map[string]byte, callback MessageHandler) error {
var err error
DEBUG.Println(CLI, "enter SubscribeMultiple")
if !c.IsConnected() {
return nil, ErrNotConnected
return ErrNotConnected
}
s := NewControlPacket(SUBSCRIBE).(*SubscribePacket)
if s.Topics, s.Qoss, err = validateSubscribeMap(filters); err != nil {
return err
}
DEBUG.Println(CLI, "enter StartSubscription")
submsg := newSubscribeMsg(filters...)
chkcond(submsg != nil)

if callback != nil {
for i := range filters {
c.options.msgRouter.addRoute(filters[i].string, callback)
for topic, _ := range filters {
c.options.msgRouter.addRoute(topic, callback)
}
}

r := make(chan Receipt, 1)
//r := make(chan Receipt, 1)

c.obound <- sendable{submsg, r}
c.oboundP <- s

DEBUG.Println(CLI, "exit StartSubscription")
return r, nil
DEBUG.Println(CLI, "exit SubscribeMultiple")
return nil
}

// EndSubscription will end the subscription from each of the topics provided.
// Unsubscribe will end the subscription from each of the topics provided.
// Messages published to those topics from other clients will no longer be
// received.
func (c *MqttClient) EndSubscription(topics ...string) (<-chan Receipt, error) {
func (c *MqttClient) Unsubscribe(topics ...string) error {
DEBUG.Println(CLI, "enter Unsubscribe")
if !c.IsConnected() {
return nil, ErrNotConnected
return ErrNotConnected
}
DEBUG.Println(CLI, "enter EndSubscription")
usmsg := newUnsubscribeMsg(topics...)

r := make(chan Receipt, 1)
u := NewControlPacket(UNSUBSCRIBE).(*UnsubscribePacket)
u.Topics = make([]string, len(topics))
copy(u.Topics, topics)

c.obound <- sendable{usmsg, r}
c.oboundP <- u
for _, topic := range topics {
c.options.msgRouter.deleteRoute(topic)
}

DEBUG.Println(CLI, "exit EndSubscription")
return r, nil
DEBUG.Println(CLI, "exit Unsubscribe")
return nil
}

0 comments on commit 33499ed

Please sign in to comment.