From e6a0cf4e7525d9fe9789d472692e7f19adff17f3 Mon Sep 17 00:00:00 2001 From: Daniele Sluijters Date: Mon, 7 Aug 2017 00:10:06 +0200 Subject: [PATCH] :sparkles: Basic MQTT functionality, CLI This includes the basic MQTT functionality needed to communicate with a broker and setup the necessary channels. It also comes with a basic CLI that right now just connects to the broker and implements the discovery feature. The `mosquitto.config` is included for use with the `eclipes-mosquitto` Docker image for development purposes. Use it something like this: ``` docker run -it -p 1883:1883 -p 9001:9001 \ -v $PWD/mosquitto.config:/mosquitto/config/mosquitto.conf:ro \ eclipse-mosquitto ``` --- .gitignore | 3 + Gopkg.lock | 33 +++++++++ Gopkg.toml | 11 +++ cmd/hemtjanst.go | 89 +++++++++++++++++++++++++ messaging/messaging.go | 23 +++++++ messaging/mqtt.go | 148 +++++++++++++++++++++++++++++++++++++++++ mosquitto.config | 4 ++ 7 files changed, 311 insertions(+) create mode 100644 Gopkg.lock create mode 100644 Gopkg.toml create mode 100644 cmd/hemtjanst.go create mode 100644 messaging/messaging.go create mode 100644 messaging/mqtt.go create mode 100644 mosquitto.config diff --git a/.gitignore b/.gitignore index 716ab12..be55e31 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ # Go vendor directory vendor/ + +# Binary +cmd/hemtjanst diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..6036b24 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,33 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/eclipse/paho.mqtt.golang" + packages = [".","packets"] + revision = "aff15770515e3c57fc6109da73d42b0d46f7f483" + version = "v1.1.0" + +[[projects]] + name = "github.com/satori/go.uuid" + packages = ["."] + revision = "879c5887cd475cd7864858769793b2ceb0d44feb" + version = "v1.1.0" + +[[projects]] + name = "github.com/spf13/pflag" + packages = ["."] + revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66" + version = "v1.0.0" + +[[projects]] + branch = "master" + name = "golang.org/x/net" + packages = ["proxy","websocket"] + revision = "f5079bd7f6f74e23c4d65efa0f4ce14cbd6a3c0f" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "513176cb05513ba037131c18ac0f9867b36761e913128305e3a4c830e5eb5333" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..c5e5572 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,11 @@ +[[constraint]] + name = "github.com/eclipse/paho.mqtt.golang" + version = "1.1.0" + +[[constraint]] + name = "github.com/satori/go.uuid" + version = "1.1.0" + +[[constraint]] + name = "github.com/spf13/pflag" + version = "1.0.0" diff --git a/cmd/hemtjanst.go b/cmd/hemtjanst.go new file mode 100644 index 0000000..2f1dfc6 --- /dev/null +++ b/cmd/hemtjanst.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "git.neotor.se/daenney/hemtjanst/messaging" + "github.com/satori/go.uuid" + flag "github.com/spf13/pflag" + "log" + "os" + "os/signal" + //"strings" + "syscall" + "time" +) + +var ( + addr = flag.StringP("address", "a", "127.0.0.1", "IP or hostname for Hemtjänst to bind on") + port = flag.IntP("port", "p", 1234, "Port for Hemtjänst to bind on") + bAddr = flag.String("broker.address", "127.0.0.1", "IP or hostname of the MQTT broker") + bPort = flag.Int("broker.port", 1883, "Port the MQTT broker listens on") + cTimeout = flag.Int("broker.connection-timeout", 10, "Connection timeout in seconds") + keepAlive = flag.Int("broker.keepalive", 5, "Time in seconds between each PING packet") + maxReconInterval = flag.Int("broker.max-reconnect-interval", 2, "Maximum time in minutes to wait between reconnect attemps") + pTimeout = flag.Int("broker.ping-timeout", 10, "Time in seconds after which a ping times out") + wTimeout = flag.Int("broker.write-timeout", 5, "Time in seconds after which a write will time out") +) + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %s:\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Parameters:\n\n") + flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "\n") + } + flag.Parse() + uid := uuid.NewV4() + + log.Print("Initialing Hemtjänst") + quit := make(chan os.Signal) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + announce := make(chan []byte) + leave := make(chan []byte) + + log.Print("Attempting to connect to MQTT broker") + c := messaging.NewMQTTClient( + announce, leave, + *bAddr, *bPort, + time.Duration(*cTimeout), + time.Duration(*keepAlive), + time.Duration(*maxReconInterval), + time.Duration(*pTimeout), + time.Duration(*wTimeout), + uid.String(), + ) + go func() { + if token := c.Connect(); token.Wait() && token.Error() != nil { + log.Fatal("Failed to establish connection with broker: ", token.Error()) + } + }() + +loop: + for { + select { + case msg := <-announce: + log.Print(msg) + case msg := <-leave: + log.Print(msg) + case sig := <-quit: + log.Printf("Received signal: %s, proceeding to shutdown", sig) + break loop + } + } + + // When the MQTT lib is connecting but hasn't establish a conneciton yet + // the IsConnected() method returns true. As a consequence, b/c it believes + // it is connected the call to .Disconnect() will panic if we terminate + // before we've managed to establish a connection to the broker, as it + // tries to close one of its own channels that are currently still nil. + // + // To avoid getting an ugly panic printed for what is arguably a bug in the + // library defer a recover that does nothing and exit normally. + defer func() { + recover() + }() + + c.Disconnect(250) + log.Print("Disconnected from broker. Bye!") + os.Exit(0) +} diff --git a/messaging/messaging.go b/messaging/messaging.go new file mode 100644 index 0000000..58820ea --- /dev/null +++ b/messaging/messaging.go @@ -0,0 +1,23 @@ +package messaging + +// Publisher publishes messages on a transport +type Publisher interface { + Publish(destination string, message []byte, qos int, persist bool) +} + +// Subscriber receives messages from a transport +type Subscriber interface { + Subscribe(source string, qos int) + Unsubscribe(sources ...string) +} + +// PublishSubscriber can both publish and receives messages from a transport +type PublishSubscriber interface { + Publisher + Subscriber +} + +type Message interface { + Topic() string + Payload() []byte +} diff --git a/messaging/mqtt.go b/messaging/mqtt.go new file mode 100644 index 0000000..96c4eb2 --- /dev/null +++ b/messaging/mqtt.go @@ -0,0 +1,148 @@ +package messaging + +import ( + "fmt" + mq "github.com/eclipse/paho.mqtt.golang" + "log" + "time" +) + +type mqttMessenger struct { + client mq.Client + recv chan Message +} + +type handler struct { + ann chan []byte + leave chan []byte +} + +// RetryWithBackoff will retry the operation for the amount of attempts. The +// backoff time gets multiplied by the attempt to create an exponential backoff. +// +// Returns an error if the operation still failed after the specified amount of +// attempts have been executed. +func RetryWithBackoff(attempts int, backoff time.Duration, callback func() error) (err error) { + for i := 1; ; i++ { + err = callback() + if err == nil { + log.Print("Operation succeed at attempt: ", i) + return + } + + if i >= attempts { + break + } + backoff = time.Duration(i) * backoff + log.Printf("Operation failed with error: %s. Going to reattempt in %d seconds", err, backoff/time.Second) + time.Sleep(backoff) + } + return fmt.Errorf("Operation failed after %d attempts, last error: %s", attempts, err) +} + +// NewMQTTClient configues an MQTT client according to our needs. This +// client can then be passed to NewMQTTMessenger. +func NewMQTTClient( + announce chan []byte, + leave chan []byte, + addr string, + port int, + connectTimeout time.Duration, + keepAlive time.Duration, + maxReconnectInterval time.Duration, + pingTimeout time.Duration, + writeTimeout time.Duration, + identifier string, +) mq.Client { + h := &handler{ + ann: announce, + leave: leave, + } + opts := mq.NewClientOptions(). + AddBroker(fmt.Sprintf("tcp://%s:%d", addr, port)). + SetClientID("hemtjänst"). + SetConnectTimeout(connectTimeout*time.Second). + SetKeepAlive(keepAlive*time.Second). + SetMaxReconnectInterval(maxReconnectInterval*time.Minute). + SetMessageChannelDepth(100). + SetPingTimeout(pingTimeout*time.Second). + SetProtocolVersion(4). + SetWill("leave", identifier, 0, false). + SetWriteTimeout(writeTimeout * time.Second). + SetOnConnectHandler(h.onConnect). + SetConnectionLostHandler(h.onConnectionLost) + return mq.NewClient(opts) +} + +// onConnect gets executed when we've established a connection with the MQTT +// broker, regardless of if this was our first attempt or after a reconnect. +func (h *handler) onConnect(c mq.Client) { + log.Print("Connected to MQTT broker") + + log.Print("Attempting to subscribe to announce topic") + err := RetryWithBackoff(5, 2*time.Second, func() error { + token := c.Subscribe("announce", 1, func(client mq.Client, msg mq.Message) { + h.ann <- msg.Payload() + }) + token.Wait() + return token.Error() + }) + if err != nil { + log.Fatal("Could not subscribe to announce topic") + } + log.Print("Subscribed to announce topic") + + log.Print("Attempting to publish to discover topic") + err = RetryWithBackoff(5, 2*time.Second, func() error { + token := c.Publish("discover", 1, true, "1") + token.Wait() + return token.Error() + }) + if err != nil { + log.Fatal("Could not publish to discover topic") + } + log.Print("Initiated discovery") +} + +// onConnectionLost gets triggered whenver we unexpectedly lose connection with +// the MQTT broker. +func (h *handler) onConnectionLost(c mq.Client, e error) { + log.Print("Unexpectedly lost connection to MQTT broker, attempting to reconnect") +} + +// NewMQTTMessenger returns a PublishSubscriber. +// +// It expects to be given something that looks like an MQTT Client and +// a channel on which it will publish any messages from topics to which +// we have subscribed. +// +// It allows for publishing messages to a topic on an MQTT broker, to +// subscribe to messages published to topics and to unsubscribe from topic. +func NewMQTTMessenger(client mq.Client, recv chan Message) PublishSubscriber { + return &mqttMessenger{ + client: client, + recv: recv, + } +} + +// Publish publishes a msg on the specified topic. qos represents the MQTT QoS +// level and retain informs the broker that it needs to persist this message so +// that when a new client subscribes to the topic we published on they will +// automatically get that message. +func (m *mqttMessenger) Publish(topic string, msg []byte, qos int, retain bool) { + m.client.Publish(topic, byte(qos), retain, msg) +} + +// Subscribe subscribes to the specified topic with a certain qos. The topic +// and message are then passed into this messenger's recv channel and can be +// read from by any interested consumer. +func (m *mqttMessenger) Subscribe(topic string, qos int) { + m.client.Subscribe(topic, byte(qos), func(c mq.Client, msg mq.Message) { + m.recv <- msg + }) +} + +// Unsubscribe unsubscribes from one or multiple topics. +func (m *mqttMessenger) Unsubscribe(topics ...string) { + m.client.Unsubscribe(topics...) +} diff --git a/mosquitto.config b/mosquitto.config new file mode 100644 index 0000000..ceb4351 --- /dev/null +++ b/mosquitto.config @@ -0,0 +1,4 @@ +log_dest stdout +log_type all + +max_inflight_messages 300