-
Notifications
You must be signed in to change notification settings - Fork 1
/
kubemeta.go
223 lines (187 loc) · 6.94 KB
/
kubemeta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package cluster
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/chuyangliu/rawkv/pkg/logging"
"github.com/chuyangliu/rawkv/pkg/pb"
)
// Names of the environment variables read by (*KubeMeta).init to initialize a KubeMeta.
// Each of them must be set to a non-empty value, otherwise initialization fails.
const (
	envPodName     = "RAWKV_POD_NAME"     // Pod name; the text after its last "-" is parsed as the node id.
	envServiceName = "RAWKV_SERVICE_NAME" // Service name, used to build the network address of each node.
	envNamespace   = "RAWKV_NAMESPACE"    // Namespace; its pods are listed to get the cluster size.
	envStoragePort = "RAWKV_STORAGE_PORT" // Port number of the storage grpc server.
	envRaftPort    = "RAWKV_RAFT_PORT"    // Port number of the raft grpc server.
)
// Compile-time assertion that *KubeMeta implements the Meta interface.
var _ Meta = (*KubeMeta)(nil)
// KubeMeta implements Meta interface for a kubernetes cluster.
// An example of cluster config can be found at kubernetes/rawkv.yaml.
//
// Both client slices are allocated with one slot per node and are indexed by node id.
// The slot of the current node is never assigned, so it stays nil.
type KubeMeta struct {
	logger         *logging.Logger
	id             int32              // Current node id, parsed from the numeric suffix of the pod name.
	size           int32              // Number of nodes in the cluster (number of pods listed in the namespace).
	storageClients []pb.StorageClient // Storage grpc clients to communicate with other nodes (indexed by node id).
	raftClients    []pb.RaftClient    // Raft grpc clients to communicate with other nodes (indexed by node id).
}
// NewKubeMeta builds a KubeMeta whose logger uses the given logging level and
// runs its initialization. It returns a nil KubeMeta together with a wrapped
// error when initialization fails.
func NewKubeMeta(level int) (*KubeMeta, error) {
	meta := new(KubeMeta)
	meta.logger = logging.New(level)

	err := meta.init()
	if err != nil {
		return nil, fmt.Errorf("Initialize kubernetes cluster meta failed | err=[%w]", err)
	}

	meta.logger.Info("Kubernetes cluster meta created | meta=%v", meta)
	return meta, nil
}
// String formats the KubeMeta as "[id=<node id> | size=<cluster size>]".
func (km *KubeMeta) String() string {
	const layout = "[id=%v | size=%v]"
	nodeID, clusterSize := km.id, km.size
	return fmt.Sprintf(layout, nodeID, clusterSize)
}
// init initializes the KubeMeta from the process environment and the kubernetes API.
//
// It reads the pod name, service name, namespace, storage port and raft port from
// the environment, derives the current node id from the numeric suffix of the pod
// name, initializes the cluster size and finally creates the storage and raft grpc
// clients for every other node.
//
// All environment variables are validated before any network call is made, so a
// misconfigured pod fails right away instead of first waiting for the cluster to form.
//
// It returns an error if an environment variable is unset or empty, if the pod name
// does not end with "-<integer that fits in 32 bits>", or if a later step fails.
func (km *KubeMeta) init() error {
	// readEnv fetches a required environment variable and logs the value read.
	readEnv := func(what string, key string) (string, error) {
		val := os.Getenv(key)
		if len(val) == 0 {
			return "", fmt.Errorf("Get %v failed | env=%v | val=%v", what, key, val)
		}
		km.logger.Info("Read "+what+" env | val=%v", val)
		return val, nil
	}

	// Get pod name.
	podName, err := readEnv("pod name", envPodName)
	if err != nil {
		return err
	}

	// Get current node id: the text that follows the last "-" of the pod name.
	// A name without "-" or ending with "-" carries no id.
	pos := strings.LastIndex(podName, "-")
	if pos < 0 || pos == len(podName)-1 {
		return fmt.Errorf("Invalid pod name | podName=%v", podName)
	}
	// The suffix cannot contain "-", so a parsed id is never negative. bitSize 32
	// rejects values that would otherwise be truncated by the int32 conversion.
	id, err := strconv.ParseInt(podName[pos+1:], 10, 32)
	if err != nil {
		return fmt.Errorf("Invalid node id | podName=%v | err=[%w]", podName, err)
	}
	km.id = int32(id)

	// Get service name.
	serviceName, err := readEnv("service name", envServiceName)
	if err != nil {
		return err
	}

	// Get namespace.
	namespace, err := readEnv("namespace", envNamespace)
	if err != nil {
		return err
	}

	// Get port number of storage grpc server.
	storagePort, err := readEnv("storage port", envStoragePort)
	if err != nil {
		return err
	}

	// Get port number of raft grpc server.
	raftPort, err := readEnv("raft port", envRaftPort)
	if err != nil {
		return err
	}

	// Init cluster size.
	if err := km.initSize(namespace); err != nil {
		return fmt.Errorf("Initialize cluster size failed | err=[%w]", err)
	}

	// Create network address format. The two "%v" left in the result are filled in
	// later with the target node id and the port number.
	// Reference: https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#stable-network-id
	addrFmt := fmt.Sprintf("%s.%s.%s.%s.%s:%s",
		podName[:pos+1]+"%v", serviceName, namespace, "svc", "cluster.local", "%v")

	// Establish grpc connections with other storage servers in the cluster.
	if err := km.initStorageClients(addrFmt, storagePort); err != nil {
		return fmt.Errorf("Create storage grpc clients failed | err=[%w]", err)
	}

	// Establish grpc connections with other raft servers in the cluster.
	if err := km.initRaftClients(addrFmt, raftPort); err != nil {
		return fmt.Errorf("Create raft grpc clients failed | err=[%w]", err)
	}

	return nil
}
// initSize initializes the cluster size field of KubeMeta with the number of pods
// listed in the given namespace.
//
// The method polls once per second and only returns successfully once at least
// three pods are listed. The in-cluster config and the client set are created once
// and reused by every poll instead of being rebuilt on each retry.
//
// It returns an error if the in-cluster config or the client set cannot be created,
// or if listing the pods fails.
func (km *KubeMeta) initSize(namespace string) error {
	// Minimum number of nodes required before the cluster is considered formed.
	const minClusterSize = 3

	config, err := rest.InClusterConfig()
	if err != nil {
		return fmt.Errorf("Create in-cluster config failed | namespace=%v | err=[%w]", namespace, err)
	}

	clients, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("Create client set failed | namespace=%v | err=[%w]", namespace, err)
	}

	for {
		pods, err := clients.CoreV1().Pods(namespace).List(metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("List pods info failed | namespace=%v | err=[%w]", namespace, err)
		}

		// The size is stored on every poll so that the warning below reports it.
		km.size = int32(len(pods.Items))
		if km.size >= minClusterSize {
			return nil
		}

		km.logger.Warn("Require at least three nodes in the cluster for fault tolerance. Retry after 1 second"+
			" | meta=%v", km)
		time.Sleep(1 * time.Second)
	}
}
// initStorageClients initializes storage grpc clients to communicate with each other node.
//
// addrFmt is a format string with two verbs, filled in with the target node id and
// storagePort to obtain the address to dial. The resulting slice has one slot per
// node, indexed by node id; the slot of the current node is left nil.
//
// If dialing any node fails, the connections opened so far are closed and the
// storageClients field is left untouched, so a failed call leaks nothing.
// NOTE(review): no dial timeout is set and grpc.WithBlock is used, so this
// presumably waits until every peer is reachable — confirm this is intended.
func (km *KubeMeta) initStorageClients(addrFmt string, storagePort string) error {
	clients := make([]pb.StorageClient, km.size)
	conns := make([]*grpc.ClientConn, 0, km.size)

	for id := int32(0); id < km.size; id++ {
		if id == km.id {
			continue // No client is needed to talk to the current node.
		}

		targetAddr := fmt.Sprintf(addrFmt, id, storagePort)
		conn, err := grpc.Dial(targetAddr, grpc.WithInsecure(), grpc.WithBlock())
		if err != nil {
			for _, opened := range conns {
				_ = opened.Close() // Best effort: the dial error is the one reported.
			}
			return fmt.Errorf("Connect storage grpc server failed | targetAddr=%v | err=[%w]", targetAddr, err)
		}

		conns = append(conns, conn)
		clients[id] = pb.NewStorageClient(conn)
	}

	km.storageClients = clients
	return nil
}
// initRaftClients initializes raft grpc clients to communicate with each other node.
//
// addrFmt is a format string with two verbs, filled in with the target node id and
// raftPort to obtain the address to dial. The resulting slice has one slot per node,
// indexed by node id; the slot of the current node is left nil.
//
// If dialing any node fails, the connections opened so far are closed and the
// raftClients field is left untouched, so a failed call leaks nothing.
// NOTE(review): no dial timeout is set and grpc.WithBlock is used, so this
// presumably waits until every peer is reachable — confirm this is intended.
func (km *KubeMeta) initRaftClients(addrFmt string, raftPort string) error {
	clients := make([]pb.RaftClient, km.size)
	conns := make([]*grpc.ClientConn, 0, km.size)

	for id := int32(0); id < km.size; id++ {
		if id == km.id {
			continue // No client is needed to talk to the current node.
		}

		targetAddr := fmt.Sprintf(addrFmt, id, raftPort)
		conn, err := grpc.Dial(targetAddr, grpc.WithInsecure(), grpc.WithBlock())
		if err != nil {
			for _, opened := range conns {
				_ = opened.Close() // Best effort: the dial error is the one reported.
			}
			return fmt.Errorf("Connect raft grpc server failed | targetAddr=%v | err=[%w]", targetAddr, err)
		}

		conns = append(conns, conn)
		clients[id] = pb.NewRaftClient(conn)
	}

	km.raftClients = clients
	return nil
}
// ---------------------------
// NodeProvider Implementation
// ---------------------------
// NodeIDNil returns the nil value of node id, which is -1.
func (km *KubeMeta) NodeIDNil() int32 {
	const nilNodeID int32 = -1
	return nilNodeID
}
// NodeIDSelf reports the id of the node this KubeMeta belongs to.
func (km *KubeMeta) NodeIDSelf() int32 {
	self := km.id
	return self
}
// Size reports how many nodes the cluster has.
func (km *KubeMeta) Size() int32 {
	nodes := km.size
	return nodes
}
// StorageClient looks up the storage grpc client used to talk to the node with the
// given id. The id indexes the client slice directly; no bounds check is performed.
func (km *KubeMeta) StorageClient(id int32) pb.StorageClient {
	clients := km.storageClients
	return clients[id]
}
// RaftClient looks up the raft grpc client used to talk to the node with the given
// id. The id indexes the client slice directly; no bounds check is performed.
func (km *KubeMeta) RaftClient(id int32) pb.RaftClient {
	clients := km.raftClients
	return clients[id]
}