/
dqlite.go
133 lines (118 loc) · 4.33 KB
/
dqlite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package snaputil
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/canonical/microk8s-cluster-agent/pkg/snap"
"gopkg.in/yaml.v2"
)
// DqliteCluster is the format of the dqlite cluster.yaml file: a YAML
// sequence holding one DqliteClusterNode entry per known cluster node.
type DqliteCluster []DqliteClusterNode
// DqliteClusterNode is a node in the dqlite cluster.
//
// Field names in YAML are given by the struct tags below. Fields tagged with
// omitempty are left out of the marshalled YAML when they hold their zero value.
type DqliteClusterNode struct {
// Address is the address of the node in the cluster.
// UpdateDqliteIP parses it with net.SplitHostPort, i.e. as "host:port".
Address string `yaml:"Address"`
// ID is the unique identifier of the node in the cluster.
// An ID of 0 is omitted when marshalling (omitempty).
ID uint64 `yaml:"ID,omitempty"`
// NodeRole is the role of the node in the cluster.
// 0 -- Voter
// 1 -- StandBy
// 2 -- Spare
// Because of omitempty, role 0 (Voter) is omitted when marshalling.
NodeRole int `yaml:"Role,omitempty"`
}
// GetDqliteCluster returns the list of all currently known dqlite cluster nodes.
//
// The list is obtained from s.ReadDqliteClusterYaml and decoded as YAML into a
// DqliteCluster. If reading or decoding fails, an empty DqliteCluster is
// returned together with an error wrapping the underlying cause.
func GetDqliteCluster(s snap.Snap) (DqliteCluster, error) {
	raw, readErr := s.ReadDqliteClusterYaml()
	if readErr != nil {
		return DqliteCluster{}, fmt.Errorf("failed to read list of dqlite nodes: %w", readErr)
	}

	// Start from an empty (non-nil) list so that an empty document still
	// yields an empty cluster rather than a nil one.
	nodes := DqliteCluster{}
	parseErr := yaml.Unmarshal([]byte(raw), &nodes)
	if parseErr != nil {
		return DqliteCluster{}, fmt.Errorf("failed to parse list of dqlite nodes: %w", parseErr)
	}

	return nodes, nil
}
// UpdateDqliteIP sets the local dqlite cluster node to bind to a new IP address.
//
// The current node info is read through s.ReadDqliteInfoYaml; only the port of
// its address is kept. An update file containing host joined with that port is
// written through s.WriteDqliteUpdateYaml, after which the k8s-dqlite service
// is restarted.
//
// host is the new address without a port. An error is returned if the current
// node info cannot be read or parsed, if its address is not of the form
// host:port, or if writing the update file or restarting the service fails.
func UpdateDqliteIP(ctx context.Context, s snap.Snap, host string) error {
	infoYaml, err := s.ReadDqliteInfoYaml()
	if err != nil {
		return fmt.Errorf("failed to retrieve current node info: %w", err)
	}

	var node DqliteClusterNode
	if err := yaml.Unmarshal([]byte(infoYaml), &node); err != nil {
		return fmt.Errorf("invalid format for current node info: %w", err)
	}

	// Fail before touching anything if the current address cannot be split:
	// otherwise an update with an empty port would be written and the service
	// restarted with it.
	_, port, err := net.SplitHostPort(node.Address)
	if err != nil {
		return fmt.Errorf("invalid address %q in current node info: %w", node.Address, err)
	}

	// Only the address is set; ID and role are left at their zero values and
	// are therefore omitted from the marshalled update (omitempty).
	nodeUpdate := DqliteClusterNode{
		Address: net.JoinHostPort(host, port),
	}
	b, err := yaml.Marshal(nodeUpdate)
	if err != nil {
		return fmt.Errorf("failed to marshal current node info update: %w", err)
	}

	if err := s.WriteDqliteUpdateYaml(b); err != nil {
		return fmt.Errorf("failed to create dqlite update file: %w", err)
	}
	if err := s.RestartService(ctx, "k8s-dqlite"); err != nil {
		return fmt.Errorf("failed to restart k8s-dqlite service: %w", err)
	}
	return nil
}
// WaitForDqliteCluster queries the dqlite cluster nodes repeatedly until f(cluster) becomes true.
//
// The first query happens immediately; subsequent queries happen once per
// second. The cluster that satisfied f is returned.
//
// Polling stops with an error as soon as GetDqliteCluster fails, f returns an
// error, or ctx is done. In all error cases an empty DqliteCluster is returned.
func WaitForDqliteCluster(ctx context.Context, s snap.Snap, f func(DqliteCluster) (bool, error)) (DqliteCluster, error) {
	interval := time.NewTicker(time.Second)
	// Release the ticker on every return path.
	defer interval.Stop()

	for {
		cluster, err := GetDqliteCluster(s)
		if err != nil {
			return DqliteCluster{}, err
		}

		ok, err := f(cluster)
		if err != nil {
			return DqliteCluster{}, fmt.Errorf("failed check for cluster condition: %w", err)
		}
		if ok {
			return cluster, nil
		}

		// Condition not met yet: wait for the next tick or for cancellation.
		select {
		case <-ctx.Done():
			return DqliteCluster{}, fmt.Errorf("timed out waiting for cluster condition: %w", ctx.Err())
		case <-interval.C:
		}
	}
}
// MaybeUpdateDqliteBindAddress checks whether a joining node is already part of
// the dqlite cluster and rebinds the local dqlite node if necessary.
//
// It first waits until at least one dqlite node is known and returns an error
// if remoteIP is the host of any of them. If the cluster then consists of a
// single node whose address starts with "127.0.0.1:", a new bind address is
// obtained from findMatchingBindAddress(hostPort), applied with UpdateDqliteIP,
// and the function waits until the first node of the cluster no longer reports
// a "127.0.0.1:" address. In every other case the cluster is left untouched.
//
// remoteIP is the address of the joining node, without port. hostPort is passed
// verbatim to findMatchingBindAddress, whose result is used as the new host.
func MaybeUpdateDqliteBindAddress(ctx context.Context, snap snap.Snap, hostPort string, remoteIP string, findMatchingBindAddress func(string) (string, error)) error {
	const loopbackPrefix = "127.0.0.1:"

	// Check node is not in cluster already.
	dqliteCluster, err := WaitForDqliteCluster(ctx, snap, func(c DqliteCluster) (bool, error) {
		return len(c) >= 1, nil
	})
	if err != nil {
		return fmt.Errorf("failed to retrieve dqlite cluster nodes: %w", err)
	}
	for _, node := range dqliteCluster {
		if dqliteAddressHasHost(node.Address, remoteIP) {
			return fmt.Errorf("the joining node (%s) is already known to dqlite", remoteIP)
		}
	}

	// Nothing to update unless the only node is still bound to 127.0.0.1.
	if len(dqliteCluster) != 1 || !strings.HasPrefix(dqliteCluster[0].Address, loopbackPrefix) {
		return nil
	}

	newDqliteBindAddress, err := findMatchingBindAddress(hostPort)
	if err != nil {
		return fmt.Errorf("failed to find matching dqlite bind address for %v: %w", hostPort, err)
	}
	if err := UpdateDqliteIP(ctx, snap, newDqliteBindAddress); err != nil {
		return fmt.Errorf("failed to update dqlite address to %q: %w", newDqliteBindAddress, err)
	}

	// Wait for dqlite cluster to come up with new address
	_, err = WaitForDqliteCluster(ctx, snap, func(c DqliteCluster) (bool, error) {
		return len(c) >= 1 && !strings.HasPrefix(c[0].Address, loopbackPrefix), nil
	})
	if err != nil {
		return fmt.Errorf("failed waiting for dqlite cluster to come up: %w", err)
	}
	return nil
}

// dqliteAddressHasHost reports whether the dqlite node address refers to host.
// Besides the plain "host:" prefix match it also compares the host parsed by
// net.SplitHostPort, so bracketed IPv6 addresses such as "[fd00::1]:19001"
// match the host "fd00::1".
func dqliteAddressHasHost(address, host string) bool {
	if strings.HasPrefix(address, host+":") {
		return true
	}
	h, _, err := net.SplitHostPort(address)
	return err == nil && h == host
}