Skip to content

Commit

Permalink
✨ Basic MQTT functionality, CLI
Browse files Browse the repository at this point in the history
This includes the basic MQTT functionality needed to communicate with
a broker and setup the necessary channels. It also comes with a basic
CLI that right now just connects to the broker and implements the
discovery feature.

The `mosquitto.config` is included for use with the `eclipes-mosquitto`
Docker image for development purposes. Use it something like this:

```
docker run -it -p 1883:1883 -p 9001:9001 \
  -v $PWD/mosquitto.config:/mosquitto/config/mosquitto.conf:ro \
  eclipse-mosquitto
```
  • Loading branch information
daenney committed Aug 6, 2017
1 parent 6dd250d commit e6a0cf4
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -15,3 +15,6 @@

# Go vendor directory
vendor/

# Binary
cmd/hemtjanst
33 changes: 33 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions Gopkg.toml
@@ -0,0 +1,11 @@
[[constraint]]
name = "github.com/eclipse/paho.mqtt.golang"
version = "1.1.0"

[[constraint]]
name = "github.com/satori/go.uuid"
version = "1.1.0"

[[constraint]]
name = "github.com/spf13/pflag"
version = "1.0.0"
89 changes: 89 additions & 0 deletions cmd/hemtjanst.go
@@ -0,0 +1,89 @@
package main

import (
"fmt"
"git.neotor.se/daenney/hemtjanst/messaging"
"github.com/satori/go.uuid"
flag "github.com/spf13/pflag"
"log"
"os"
"os/signal"
//"strings"
"syscall"
"time"
)

var (
addr = flag.StringP("address", "a", "127.0.0.1", "IP or hostname for Hemtjänst to bind on")
port = flag.IntP("port", "p", 1234, "Port for Hemtjänst to bind on")
bAddr = flag.String("broker.address", "127.0.0.1", "IP or hostname of the MQTT broker")
bPort = flag.Int("broker.port", 1883, "Port the MQTT broker listens on")
cTimeout = flag.Int("broker.connection-timeout", 10, "Connection timeout in seconds")
keepAlive = flag.Int("broker.keepalive", 5, "Time in seconds between each PING packet")
maxReconInterval = flag.Int("broker.max-reconnect-interval", 2, "Maximum time in minutes to wait between reconnect attemps")
pTimeout = flag.Int("broker.ping-timeout", 10, "Time in seconds after which a ping times out")
wTimeout = flag.Int("broker.write-timeout", 5, "Time in seconds after which a write will time out")
)

func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage of %s:\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "Parameters:\n\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "\n")
}
flag.Parse()
uid := uuid.NewV4()

log.Print("Initialing Hemtjänst")
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
announce := make(chan []byte)
leave := make(chan []byte)

log.Print("Attempting to connect to MQTT broker")
c := messaging.NewMQTTClient(
announce, leave,
*bAddr, *bPort,
time.Duration(*cTimeout),
time.Duration(*keepAlive),
time.Duration(*maxReconInterval),
time.Duration(*pTimeout),
time.Duration(*wTimeout),
uid.String(),
)
go func() {
if token := c.Connect(); token.Wait() && token.Error() != nil {
log.Fatal("Failed to establish connection with broker: ", token.Error())
}
}()

loop:
for {
select {
case msg := <-announce:
log.Print(msg)
case msg := <-leave:
log.Print(msg)
case sig := <-quit:
log.Printf("Received signal: %s, proceeding to shutdown", sig)
break loop
}
}

// When the MQTT lib is connecting but hasn't establish a conneciton yet
// the IsConnected() method returns true. As a consequence, b/c it believes
// it is connected the call to .Disconnect() will panic if we terminate
// before we've managed to establish a connection to the broker, as it
// tries to close one of its own channels that are currently still nil.
//
// To avoid getting an ugly panic printed for what is arguably a bug in the
// library defer a recover that does nothing and exit normally.
defer func() {
recover()
}()

c.Disconnect(250)
log.Print("Disconnected from broker. Bye!")
os.Exit(0)
}
23 changes: 23 additions & 0 deletions messaging/messaging.go
@@ -0,0 +1,23 @@
package messaging

// Publisher publishes messages on a transport
type Publisher interface {
Publish(destination string, message []byte, qos int, persist bool)
}

// Subscriber receives messages from a transport
type Subscriber interface {
Subscribe(source string, qos int)
Unsubscribe(sources ...string)
}

// PublishSubscriber can both publish and receives messages from a transport
type PublishSubscriber interface {
Publisher
Subscriber
}

type Message interface {
Topic() string
Payload() []byte
}
148 changes: 148 additions & 0 deletions messaging/mqtt.go
@@ -0,0 +1,148 @@
package messaging

import (
"fmt"
mq "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
)

type mqttMessenger struct {
client mq.Client
recv chan Message
}

type handler struct {
ann chan []byte
leave chan []byte
}

// RetryWithBackoff will retry the operation for the amount of attempts. The
// backoff time gets multiplied by the attempt to create an exponential backoff.
//
// Returns an error if the operation still failed after the specified amount of
// attempts have been executed.
func RetryWithBackoff(attempts int, backoff time.Duration, callback func() error) (err error) {
for i := 1; ; i++ {
err = callback()
if err == nil {
log.Print("Operation succeed at attempt: ", i)
return
}

if i >= attempts {
break
}
backoff = time.Duration(i) * backoff
log.Printf("Operation failed with error: %s. Going to reattempt in %d seconds", err, backoff/time.Second)
time.Sleep(backoff)
}
return fmt.Errorf("Operation failed after %d attempts, last error: %s", attempts, err)
}

// NewMQTTClient configues an MQTT client according to our needs. This
// client can then be passed to NewMQTTMessenger.
func NewMQTTClient(
announce chan []byte,
leave chan []byte,
addr string,
port int,
connectTimeout time.Duration,
keepAlive time.Duration,
maxReconnectInterval time.Duration,
pingTimeout time.Duration,
writeTimeout time.Duration,
identifier string,
) mq.Client {
h := &handler{
ann: announce,
leave: leave,
}
opts := mq.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s:%d", addr, port)).
SetClientID("hemtjänst").
SetConnectTimeout(connectTimeout*time.Second).
SetKeepAlive(keepAlive*time.Second).
SetMaxReconnectInterval(maxReconnectInterval*time.Minute).
SetMessageChannelDepth(100).
SetPingTimeout(pingTimeout*time.Second).
SetProtocolVersion(4).
SetWill("leave", identifier, 0, false).
SetWriteTimeout(writeTimeout * time.Second).
SetOnConnectHandler(h.onConnect).
SetConnectionLostHandler(h.onConnectionLost)
return mq.NewClient(opts)
}

// onConnect gets executed when we've established a connection with the MQTT
// broker, regardless of if this was our first attempt or after a reconnect.
func (h *handler) onConnect(c mq.Client) {
log.Print("Connected to MQTT broker")

log.Print("Attempting to subscribe to announce topic")
err := RetryWithBackoff(5, 2*time.Second, func() error {
token := c.Subscribe("announce", 1, func(client mq.Client, msg mq.Message) {
h.ann <- msg.Payload()
})
token.Wait()
return token.Error()
})
if err != nil {
log.Fatal("Could not subscribe to announce topic")
}
log.Print("Subscribed to announce topic")

log.Print("Attempting to publish to discover topic")
err = RetryWithBackoff(5, 2*time.Second, func() error {
token := c.Publish("discover", 1, true, "1")
token.Wait()
return token.Error()
})
if err != nil {
log.Fatal("Could not publish to discover topic")
}
log.Print("Initiated discovery")
}

// onConnectionLost gets triggered whenver we unexpectedly lose connection with
// the MQTT broker.
func (h *handler) onConnectionLost(c mq.Client, e error) {
log.Print("Unexpectedly lost connection to MQTT broker, attempting to reconnect")
}

// NewMQTTMessenger returns a PublishSubscriber.
//
// It expects to be given something that looks like an MQTT Client and
// a channel on which it will publish any messages from topics to which
// we have subscribed.
//
// It allows for publishing messages to a topic on an MQTT broker, to
// subscribe to messages published to topics and to unsubscribe from topic.
func NewMQTTMessenger(client mq.Client, recv chan Message) PublishSubscriber {
return &mqttMessenger{
client: client,
recv: recv,
}
}

// Publish publishes a msg on the specified topic. qos represents the MQTT QoS
// level and retain informs the broker that it needs to persist this message so
// that when a new client subscribes to the topic we published on they will
// automatically get that message.
func (m *mqttMessenger) Publish(topic string, msg []byte, qos int, retain bool) {
m.client.Publish(topic, byte(qos), retain, msg)
}

// Subscribe subscribes to the specified topic with a certain qos. The topic
// and message are then passed into this messenger's recv channel and can be
// read from by any interested consumer.
func (m *mqttMessenger) Subscribe(topic string, qos int) {
m.client.Subscribe(topic, byte(qos), func(c mq.Client, msg mq.Message) {
m.recv <- msg
})
}

// Unsubscribe unsubscribes from one or multiple topics.
func (m *mqttMessenger) Unsubscribe(topics ...string) {
m.client.Unsubscribe(topics...)
}
4 changes: 4 additions & 0 deletions mosquitto.config
@@ -0,0 +1,4 @@
log_dest stdout
log_type all

max_inflight_messages 300

0 comments on commit e6a0cf4

Please sign in to comment.