/
client_options.go
151 lines (122 loc) · 4.22 KB
/
client_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package mqtt
import (
"context"
"crypto/tls"
"net/http"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/RedHatInsights/cloud-connector/internal/platform/logger"
"github.com/RedHatInsights/cloud-connector/internal/platform/utils/jwt_utils"
"github.com/sirupsen/logrus"
)
const akamaiTokenHeader = "X-Akamai-DCP-Token"
type MqttClientOptionsFunc func(*MQTT.ClientOptions) error
func WithJwtAsHttpHeader(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
headers := http.Header{}
jwtToken, err := tokenGenerator(context.Background())
if err != nil {
logger.Log.WithFields(logrus.Fields{"error": err}).Error("Unable to retrieve the JWT Token for the MQTT broker connection")
return err
}
headers.Add(akamaiTokenHeader, jwtToken)
logger.Log.Trace("Setting the MQTT JWT HTTP header")
opts.SetHTTPHeaders(headers)
return nil
}
}
func WithJwtReconnectingHandler(tokenGenerator jwt_utils.JwtGenerator) MqttClientOptionsFunc {
//'Reconnecting' handler is called prior to a reconnection attempt , before 'Reconnect' handlers
return func(opts *MQTT.ClientOptions) error {
tokenRefresher := func(c MQTT.Client, opts *MQTT.ClientOptions) {
logger.Log.Info("Attempting JWT token refresh")
jwtToken, err := tokenGenerator(context.Background())
if err != nil {
logger.Log.WithFields(logrus.Fields{"error": err}).Error("Unable to refresh the JWT Token for the MQTT broker connection")
} else {
opts.HTTPHeaders.Set(akamaiTokenHeader, jwtToken)
}
}
logger.Log.Trace("Setting MQTT JWT reconnecting handler")
opts.SetReconnectingHandler(tokenRefresher)
return nil
}
}
func WithTlsConfig(tlsConfig *tls.Config) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Trace("Setting the TLS config")
opts.SetTLSConfig(tlsConfig)
return nil
}
}
func WithClientID(clientID string) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Trace("Setting the MQTT client-id: ", clientID)
opts.SetClientID(clientID)
return nil
}
}
func WithCleanSession(cleanSession bool) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Tracef("Setting the clean session: %v\n", cleanSession)
opts.SetCleanSession(cleanSession)
return nil
}
}
func WithResumeSubs(resumeSubs bool) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Tracef("Setting the resume subs: %v\n", resumeSubs)
opts.SetResumeSubs(resumeSubs)
return nil
}
}
func WithDefaultPublishHandler(msgHdlr MQTT.MessageHandler) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Trace("Setting the default publish handler")
opts.SetDefaultPublishHandler(msgHdlr)
return nil
}
}
func WithProtocolVersion(protocolVersion uint) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Trace("Setting the MQTT protocol version (paho specific version number): ", protocolVersion)
opts.SetProtocolVersion(protocolVersion)
return nil
}
}
func WithAutoReconnect(autoReconnect bool) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Tracef("Setting the auto-reconnect flag: %v\n", autoReconnect)
opts.SetAutoReconnect(autoReconnect)
return nil
}
}
func WithOnConnectHandler(handler func(MQTT.Client)) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Tracef("Setting the on-connect handler")
opts.SetOnConnectHandler(handler)
return nil
}
}
func WithConnectionLostHandler(handler func(MQTT.Client, error)) MqttClientOptionsFunc {
return func(opts *MQTT.ClientOptions) error {
logger.Log.Tracef("Setting the on-connection-lost handler")
opts.SetConnectionLostHandler(
// Decorate the user supplied handler function with the connection lost metric
func(c MQTT.Client, e error) {
metrics.mqttConnectionFailureCounter.Inc()
handler(c, e)
})
return nil
}
}
func NewBrokerOptions(brokerUrl string, opts ...MqttClientOptionsFunc) (*MQTT.ClientOptions, error) {
connOpts := MQTT.NewClientOptions()
connOpts.AddBroker(brokerUrl)
for _, opt := range opts {
err := opt(connOpts)
if err != nil {
return nil, err
}
}
return connOpts, nil
}