forked from eclipse/paho.golang
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
111 lines (96 loc) · 3.09 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
// Connect to the broker, subscribe, and write messages received to a file
import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
"time"
"github.com/Lorderot/paho.golang/autopaho"
"github.com/Lorderot/paho.golang/paho"
)
func main() {
cfg, err := getConfig()
if err != nil {
panic(err)
}
// Create a handler that will deal with incoming messages
h := NewHandler(cfg.writeToDisk, cfg.outputFileName, cfg.writeToStdOut)
defer h.Close()
cliCfg := autopaho.ClientConfig{
BrokerUrls: []*url.URL{cfg.serverURL},
KeepAlive: cfg.keepAlive,
ConnectRetryDelay: cfg.connectRetryDelay,
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: cfg.topic, QoS: cfg.qos},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
return
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
ClientConfig: paho.ClientConfig{
ClientID: cfg.clientID,
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
h.handle(m)
}),
OnClientError: func(err error) { fmt.Printf("server requested disconnect: %s\n", err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
} else {
fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
}
},
},
}
if cfg.debug {
cliCfg.Debug = logger{prefix: "autoPaho"}
cliCfg.PahoDebug = logger{prefix: "paho"}
}
//
// Connect to the broker
//
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm, err := autopaho.NewConnection(ctx, cliCfg)
if err != nil {
panic(err)
}
// Messages will be handled through the callback so we really just need to wait until a shutdown
// is requested
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
signal.Notify(sig, syscall.SIGTERM)
<-sig
fmt.Println("signal caught - exiting")
// We could cancel the context at this point but will call Disconnect instead (this waits for autopaho to shutdown)
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
_ = cm.Disconnect(ctx)
fmt.Println("shutdown complete")
}
// logger implements the paho.Logger interface
type logger struct {
prefix string
}
// Println is the library provided NOOPLogger's
// implementation of the required interface function()
func (l logger) Println(v ...interface{}) {
fmt.Println(append([]interface{}{l.prefix + ":"}, v...)...)
}
// Printf is the library provided NOOPLogger's
// implementation of the required interface function(){}
func (l logger) Printf(format string, v ...interface{}) {
if len(format) > 0 && format[len(format)-1] != '\n' {
format = format + "\n" // some log calls in paho do not add \n
}
fmt.Printf(l.prefix+":"+format, v...)
}