-
Notifications
You must be signed in to change notification settings - Fork 53
/
client.go
313 lines (258 loc) · 9.27 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/********************************************************************************
* Copyright 2020 Dell Inc.
* Copyright (c) 2023 Intel Corporation
* Copyright (C) 2023 IOTech Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*******************************************************************************/
package redis
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"
"github.com/edgexfoundry/go-mod-messaging/v3/internal/pkg"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
)
const (
StandardTopicSeparator = "/"
RedisTopicSeparator = "."
StandardWildcard = "#"
SingleLevelWildcard = "+"
RedisWildcard = "*"
)
// Client MessageClient implementation which provides functionality for sending and receiving messages using
// Redis Pub/Sub.
type Client struct {
redisClient RedisClient
// Used to avoid multiple subscriptions to the same topic
existingTopics map[string]bool
mapMutex *sync.Mutex
}
// NewClient creates a new Client based on the provided configuration.
func NewClient(messageBusConfig types.MessageBusConfig) (Client, error) {
return NewClientWithCreator(messageBusConfig, NewGoRedisClientWrapper, tls.X509KeyPair, tls.LoadX509KeyPair,
x509.ParseCertificate, os.ReadFile, pem.Decode)
}
// NewClientWithCreator creates a new Client based on the provided configuration while allowing more control on the
// creation of the underlying entities such as certs, keys, and Redis clients
func NewClientWithCreator(
messageBusConfig types.MessageBusConfig,
creator RedisClientCreator,
pairCreator pkg.X509KeyPairCreator,
keyLoader pkg.X509KeyLoader,
caCertCreator pkg.X509CaCertCreator,
caCertLoader pkg.X509CaCertLoader,
pemDecoder pkg.PEMDecoder) (Client, error) {
// Parse Optional configuration properties
optionalClientConfiguration, err := NewClientConfiguration(messageBusConfig)
if err != nil {
return Client{}, err
}
// Parse TLS configuration properties
tlsConfigurationOptions := pkg.TlsConfigurationOptions{}
err = pkg.Load(messageBusConfig.Optional, &tlsConfigurationOptions)
if err != nil {
return Client{}, err
}
var client RedisClient
// Create underlying client to use when publishing
if !messageBusConfig.Broker.IsHostInfoEmpty() {
client, err = createRedisClient(
messageBusConfig.Broker.GetHostURL(),
optionalClientConfiguration,
tlsConfigurationOptions,
creator,
pairCreator,
keyLoader,
caCertCreator,
caCertLoader,
pemDecoder)
if err != nil {
return Client{}, err
}
}
return Client{
redisClient: client,
existingTopics: make(map[string]bool),
mapMutex: new(sync.Mutex),
}, nil
}
// Connect noop as preemptive connections are not needed.
func (c Client) Connect() error {
// No need to connect, connection pooling is handled by the underlying client.
return nil
}
// Publish sends the provided message to appropriate Redis Pub/Sub.
func (c Client) Publish(message types.MessageEnvelope, topic string) error {
if c.redisClient == nil {
return pkg.NewMissingConfigurationErr("Broker", "Unable to create a connection for publishing")
}
if topic == "" {
// Empty topics are not allowed for Redis
return pkg.NewInvalidTopicErr("", "Unable to publish to the invalid topic")
}
topic = convertToRedisTopicScheme(topic)
var err error
if err = c.redisClient.Send(topic, message); err != nil && strings.Contains(err.Error(), "EOF") {
// Redis may have been restarted and the first attempt will fail with EOF, so need to try again
err = c.redisClient.Send(topic, message)
}
return err
}
// Subscribe creates background processes which reads messages from the appropriate Redis Pub/Sub and sends to the
// provided channels
func (c Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error {
if c.redisClient == nil {
return pkg.NewMissingConfigurationErr("Broker", "Unable to create a connection for subscribing")
}
err := c.validateTopics(topics)
if err != nil {
return err
}
var wg sync.WaitGroup
for i := range topics {
wg.Add(1)
go func(topic types.TopicChannel) {
topicName := convertToRedisTopicScheme(topic.Topic)
messageChannel := topic.Messages
var previousErr error
err := c.redisClient.Subscribe(topicName)
wg.Done()
if err != nil {
messageErrors <- err
return
}
for {
message, err := c.redisClient.Receive(topicName)
// Make sure the topic is still subscribed before processing the message.
// If not cleanup and exit from the go func
c.mapMutex.Lock()
subscribed := c.existingTopics[topic.Topic]
if !subscribed {
delete(c.existingTopics, topic.Topic)
// Do the actual unsubscribe from Redis with the Redis style topic now that this go func can exit.
c.redisClient.Unsubscribe(topicName)
c.mapMutex.Unlock()
return
}
c.mapMutex.Unlock()
if err != nil {
// This handles case when getting same repeated error due to Redis connectivity issue
// Avoids starving of other threads/processes and recipient spamming the log file.
if previousErr != nil && reflect.DeepEqual(err, previousErr) {
time.Sleep(1 * time.Millisecond) // Sleep allows other threads to get time
continue
}
messageErrors <- err
previousErr = err
continue
}
previousErr = nil
message.ReceivedTopic = convertFromRedisTopicScheme(message.ReceivedTopic)
messageChannel <- *message
}
}(topics[i])
}
// Wait for all the subscribe go funcs to be spun up
// This is needed for the Request API since the subscribe must be spun up prior to the response being published.
wg.Wait()
return nil
}
func (c Client) Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error) {
return pkg.DoRequest(c.Subscribe, c.Unsubscribe, c.Publish, message, requestTopic, responseTopicPrefix, timeout)
}
func (c Client) Unsubscribe(topics ...string) error {
c.mapMutex.Lock()
for _, topic := range topics {
c.existingTopics[topic] = false
}
c.mapMutex.Unlock()
for _, topic := range topics {
// Must publish a dummy message to kick the subscription go func out of the Receive() call
_ = c.Publish(types.MessageEnvelope{}, topic)
}
return nil
}
// Disconnect closes connections to the Redis server.
func (c Client) Disconnect() error {
var disconnectErrors []string
if c.redisClient != nil {
err := c.redisClient.Close()
if err != nil {
disconnectErrors = append(disconnectErrors, fmt.Sprintf("Unable to disconnect publish client: %v", err))
}
}
if len(disconnectErrors) > 0 {
return NewDisconnectErr(disconnectErrors)
}
return nil
}
func (c Client) validateTopics(topics []types.TopicChannel) error {
c.mapMutex.Lock()
defer c.mapMutex.Unlock()
// First validate all the topics are unique, i.e. not existing subscription
for _, topic := range topics {
_, exists := c.existingTopics[topic.Topic]
if exists {
return fmt.Errorf("subscription for '%s' topic already exists, must be unique", topic.Topic)
}
c.existingTopics[topic.Topic] = true
}
return nil
}
// createRedisClient helper function for creating RedisClient implementations.
func createRedisClient(
redisServerURL string,
optionalClientConfiguration OptionalClientConfiguration,
tlsConfigurationOptions pkg.TlsConfigurationOptions,
creator RedisClientCreator,
pairCreator pkg.X509KeyPairCreator,
keyLoader pkg.X509KeyLoader,
caCertCreator pkg.X509CaCertCreator,
caCertLoader pkg.X509CaCertLoader,
pemDecoder pkg.PEMDecoder) (RedisClient, error) {
tlsConfig, err := pkg.GenerateTLSForClientClientOptions(
redisServerURL,
tlsConfigurationOptions,
pairCreator,
keyLoader,
caCertCreator,
caCertLoader,
pemDecoder)
if err != nil {
return nil, err
}
return creator(redisServerURL, optionalClientConfiguration.Password, tlsConfig)
}
func convertToRedisTopicScheme(topic string) string {
// Redis Pub/Sub uses "." for separator and "*" for wild cards.
// Since we have standardized on the MQTT style scheme of "/" & "#" & "+" we need to
// convert it to the Redis Pub/Sub scheme.
topic = strings.Replace(topic, StandardTopicSeparator, RedisTopicSeparator, -1)
topic = strings.Replace(topic, StandardWildcard, RedisWildcard, -1)
topic = strings.Replace(topic, SingleLevelWildcard, RedisWildcard, -1)
return topic
}
func convertFromRedisTopicScheme(topic string) string {
// Redis Pub/Sub uses "." for separator and "*" for wild cards.
// Since we have standardized on the MQTT style scheme of "/" & "#" & "+" we need to
// convert it from the Redis Pub/Sub scheme.
topic = strings.Replace(topic, RedisWildcard+RedisTopicSeparator, SingleLevelWildcard+StandardTopicSeparator, -1)
topic = strings.Replace(topic, RedisTopicSeparator, StandardTopicSeparator, -1)
topic = strings.Replace(topic, RedisWildcard, StandardWildcard, -1)
return topic
}