-
Notifications
You must be signed in to change notification settings - Fork 0
/
connect.go
68 lines (63 loc) · 1.94 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package mqttx
import (
"errors"
"sync"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Connect creates an MQTTx client for every entry in servers, connects it
// with the given handlers and adds it to pool.
//
// Any handler passed as nil is replaced by the corresponding package handler:
// MQTTxHandlerDefault, MQTTxHandlerOnConnect, MQTTxHandlerConnectionLost and
// MQTTxHandlerReconnecting.
//
// An error is returned when pool is nil, when servers is empty, or as soon as
// creating or connecting a client fails. Processing stops at the first
// failure; clients added to pool by earlier iterations are left in place.
func Connect(pool *MQTTxClientPool, servers []*MQTTxServer, defaultPublishHandler mqtt.MessageHandler, onConnectHandler mqtt.OnConnectHandler, connectionLostHandler mqtt.ConnectionLostHandler, reconnectingHandler mqtt.ReconnectHandler) error {
	switch {
	case pool == nil:
		return errors.New("MQTTx client pool is nil")
	case len(servers) == 0:
		return errors.New("MQTTx servers is empty")
	}

	// The fallbacks do not depend on the server being processed, so they are
	// resolved once before iterating.
	if defaultPublishHandler == nil {
		defaultPublishHandler = MQTTxHandlerDefault
	}
	if onConnectHandler == nil {
		onConnectHandler = MQTTxHandlerOnConnect
	}
	if connectionLostHandler == nil {
		connectionLostHandler = MQTTxHandlerConnectionLost
	}
	if reconnectingHandler == nil {
		reconnectingHandler = MQTTxHandlerReconnecting
	}

	for idx, srv := range servers {
		// NewMQTTxClient rejects a nil server, so srv is safe to dereference
		// once this call has succeeded.
		cli, err := NewMQTTxClient(srv)
		if err != nil {
			return err
		}
		TraceDebug("MQTT Client(%v - %v) Connect...: %v", idx, srv.Vendor, cli)

		if err = cli.Connect(defaultPublishHandler, onConnectHandler, connectionLostHandler, reconnectingHandler); err != nil {
			return err
		}
		// Only clients whose Connect call succeeded are added to the pool.
		pool.Add(cli)
	}
	return nil
}
// NewMQTTxClient builds an MQTTxClient from the given server description.
//
// The vendor, scheme, domain, IP, port, certificate, client ID, username and
// password are copied from server. OtherOpts starts as an empty map guarded by
// a fresh RWMutex; Opts, Client and ServerConnectionCount are left at their
// zero values. No connection is made here.
//
// It returns an error when server is nil.
func NewMQTTxClient(server *MQTTxServer) (*MQTTxClient, error) {
	if server == nil {
		return nil, errors.New("server is nil")
	}

	return &MQTTxClient{
		Vendor:       server.Vendor,
		Scheme:       server.Scheme,
		Domain:       server.Domain,
		IP:           server.IP,
		Port:         server.Port,
		Cert:         server.Cert,
		ClientID:     server.ClientID,
		Username:     server.Username,
		Password:     server.Password,
		OtherOpts:    make(map[string]string),
		otherOptsMux: new(sync.RWMutex),
	}, nil
}