/
redis.go
128 lines (100 loc) · 2.86 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package service
import (
"context"
"encoding/json"
"github.com/go-redis/redis/v9"
)
// NewRedisClient parses uri into client options, builds a Redis client from
// them, and verifies the connection with a PING before returning the client.
//
// It returns the redis.ParseURL error when uri cannot be parsed, or the PING
// error when the server does not answer. When the PING fails the client is
// closed before returning, so the caller is never left with a client it
// cannot reach and cannot release.
func NewRedisClient(ctx context.Context, uri string) (*redis.Client, error) {
	opt, err := redis.ParseURL(uri)
	if err != nil {
		return nil, err
	}

	client := redis.NewClient(opt)
	if err := client.Ping(ctx).Err(); err != nil {
		// The caller never receives this client, so release it here. The PING
		// failure is the error worth reporting; a secondary Close error would
		// only obscure it.
		_ = client.Close()
		return nil, err
	}
	return client, nil
}
// RedisRegistry implements the ServiceRegistry interface using Redis as a backend storage.
// It embeds *baseRegistry and pairs it with the Redis client and context that
// its methods use for every Redis command.
type RedisRegistry struct {
// Embedded base registry; presumably the source of the mutex, services and
// add/update/remove helpers the methods in this file call — confirm against
// the baseRegistry definition.
*baseRegistry
// Client through which every Redis command in this file is issued.
redisClient *redis.Client
// Set by NewRedisRegistry. NOTE(review): no method shown in this file reads
// it; the list key used throughout is the fixed string "services".
namespace string
// Context handed to every Redis call made by the registry's methods.
ctx context.Context
}
// NewRedisRegistry builds a RedisRegistry around an already-connected client
// and an existing base registry, then loads the services stored in Redis.
//
// ctx is retained and reused for every later Redis call. If the initial load
// fails, that error is returned and no registry is handed back.
func NewRedisRegistry(ctx context.Context, redisClient *redis.Client, namespace string, br *baseRegistry) (*RedisRegistry, error) {
	registry := new(RedisRegistry)
	registry.baseRegistry = br
	registry.redisClient = redisClient
	registry.namespace = namespace
	registry.ctx = ctx

	err := registry.loadServices()
	if err != nil {
		return nil, err
	}
	return registry, nil
}
// AddService registers service via addService and appends its JSON encoding
// to the "services" list in Redis.
//
// The registry's write lock is held for the whole call. A JSON encoding
// failure is returned before anything else happens; otherwise the result of
// addService is returned as-is.
func (r *RedisRegistry) AddService(service *BackendService) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	payload, marshalErr := json.Marshal(service)
	if marshalErr != nil {
		return marshalErr
	}

	// persist is handed to addService, which decides when to invoke it.
	persist := func() error {
		return r.redisClient.RPush(r.ctx, "services", payload).Err()
	}
	return r.addService(service, persist)
}
// UpdateService applies service via updateService and overwrites one entry of
// the "services" list in Redis with its JSON encoding.
//
// The registry's write lock is held for the whole call. A JSON encoding
// failure is returned before anything else happens; otherwise the result of
// updateService is returned as-is.
//
// NOTE(review): the Redis index written is always the last position of the
// in-memory services slice, not the position of the service being updated —
// confirm this is intended.
func (r *RedisRegistry) UpdateService(service *BackendService) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	payload, marshalErr := json.Marshal(service)
	if marshalErr != nil {
		return marshalErr
	}

	overwrite := func() error {
		// Computed when the callback runs, so it reflects the slice length at
		// that moment rather than at the time the closure was created.
		lastIndex := int64(len(r.services) - 1)
		return r.redisClient.LSet(r.ctx, "services", lastIndex, payload).Err()
	}
	return r.updateService(service, overwrite)
}
// RemoveService drops the named service via removeService and issues an LREM
// against the "services" list in Redis, with count 0 and name as the value.
//
// The registry's write lock is held for the whole call, and the result of
// removeService is returned as-is.
//
// NOTE(review): LREM removes elements equal to the given value, while
// AddService pushes JSON-encoded services; a bare name presumably never
// matches a stored element — confirm.
func (r *RedisRegistry) RemoveService(name string) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	purge := func() error {
		cmd := r.redisClient.LRem(r.ctx, "services", 0, name)
		return cmd.Err()
	}
	return r.removeService(name, purge)
}
// GetServices returns the backend services reported by getServices while
// holding the registry's read lock. Whether the slice is a copy depends on
// getServices, which is defined outside this block.
func (r *RedisRegistry) GetServices() []*BackendService {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	services := r.getServices()
	return services
}
// ensureListExists sends a PING to Redis and returns any error from it.
//
// Despite its name it does not create or inspect the "services" list; it only
// checks that the server answers.
func (r *RedisRegistry) ensureListExists() error {
	cmd := r.redisClient.Do(r.ctx, "PING")
	return cmd.Err()
}
// loadServices reads every element of the "services" Redis list, decodes each
// one as a JSON-encoded BackendService, calls Init on it, and appends the
// results to r.services.
//
// Decoded services are staged in a local slice and appended only after the
// whole list has been processed, so a failed LRANGE or a malformed element
// leaves r.services exactly as it was. The Redis or JSON error is returned
// unchanged.
func (r *RedisRegistry) loadServices() error {
	entries, err := r.redisClient.LRange(r.ctx, "services", 0, -1).Result()
	if err != nil {
		return err
	}

	// The final count is known up front, so size the staging slice once.
	loaded := make([]*BackendService, 0, len(entries))
	for _, entry := range entries {
		svc := new(BackendService)
		if err := json.Unmarshal([]byte(entry), svc); err != nil {
			return err
		}
		svc.Init()
		loaded = append(loaded, svc)
	}

	// Commit in one step so callers never observe a half-loaded registry.
	r.services = append(r.services, loaded...)
	return nil
}