-
Notifications
You must be signed in to change notification settings - Fork 40
/
pvc_replication_utils.go
154 lines (135 loc) · 4.95 KB
/
pvc_replication_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package nodes
import (
"context"
"fmt"
"time"
"github.com/berops/claudie/internal/kubectl"
"github.com/rs/zerolog"
"gopkg.in/yaml.v3"
)
// K8sList mirrors the YAML shape of a `kubectl get ... -o yaml` list
// response; only the items array is decoded.
type K8sList[T any] struct {
	Items []T `yaml:"items"`
}

// LonghornReplica is a partial view of a replicas.longhorn.io resource,
// limited to the fields this package reads.
type LonghornReplica struct {
	Metadata Metadata    `yaml:"metadata"`
	Status   Status      `yaml:"status"`
	Spec     ReplicaSpec `yaml:"spec"`
}

// LonghornVolume is a partial view of a volumes.longhorn.io resource,
// limited to the fields this package reads.
type LonghornVolume struct {
	Metadata Metadata   `yaml:"metadata"`
	Spec     VolumeSpec `yaml:"spec"`
}

// Metadata carries the Kubernetes object name.
type Metadata struct {
	Name string `yaml:"name"`
}

// ReplicaSpec identifies which Longhorn volume a replica belongs to.
type ReplicaSpec struct {
	VolumeName string `yaml:"volumeName"`
}

// VolumeSpec carries the desired replica count of a Longhorn volume.
type VolumeSpec struct {
	NumberOfReplicas int `yaml:"numberOfReplicas"`
}

// Status carries the runtime state of a Longhorn replica:
// the node that owns it (OwnerID), its current state string,
// and whether it has been started.
type Status struct {
	OwnerID      string `yaml:"ownerID"`
	CurrentState string `yaml:"currentState"`
	Started      bool   `yaml:"started"`
}
const (
	// replicas and volumes are the Longhorn CRD resource names passed to kubectl.
	replicas = "replicas.longhorn.io"
	volumes  = "volumes.longhorn.io"
	// patchNumberOfReplicas is a JSON merge-patch template; the %d verb is
	// filled with the desired replica count before patching a volume.
	patchNumberOfReplicas = "{\"spec\":{\"numberOfReplicas\":%d}}"
	// runningState is the Longhorn replica state that counts as healthy.
	runningState = "running"
	// replicaRunningCheck is the polling interval for replica status checks.
	replicaRunningCheck = 5 * time.Second
	// pvcReplicationTimeout bounds how long we wait for replication to finish.
	pvcReplicationTimeout = 5 * time.Minute
)
// getVolumes returns a map[volume name]volume of volumes currently in the cluster.
func getVolumes(kc kubectl.Kubectl) (map[string]LonghornVolume, error) {
	raw, err := kc.KubectlGet(volumes, "-n", longhornNamespace, "-o", "yaml")
	if err != nil {
		return nil, fmt.Errorf("failed to list all volumes : %w", err)
	}

	var list K8sList[LonghornVolume]
	if err := yaml.Unmarshal(raw, &list); err != nil {
		return nil, fmt.Errorf("failed unmarshal kubectl output : %w", err)
	}

	byName := make(map[string]LonghornVolume, len(list.Items))
	for _, vol := range list.Items {
		byName[vol.Metadata.Name] = vol
	}
	return byName, nil
}
// getReplicasMap returns a map keyed by node name (replica ownerID) holding
// the slice of Longhorn replicas scheduled on that node.
func getReplicasMap(kc kubectl.Kubectl) (map[string][]LonghornReplica, error) {
	list, err := getReplicas(kc)
	if err != nil {
		return nil, err
	}

	byNode := make(map[string][]LonghornReplica, len(list.Items))
	for i := range list.Items {
		node := list.Items[i].Status.OwnerID
		byNode[node] = append(byNode[node], list.Items[i])
	}
	return byNode, nil
}
// verifyAllReplicasSetUp polls the cluster every replicaRunningCheck interval
// until every replica of the given volume reports a running state, or until
// pvcReplicationTimeout elapses.
//
// Transient errors from the status check are logged and retried rather than
// aborting the wait. Returns nil once all replicas are running, or a wrapped
// context error on timeout.
func verifyAllReplicasSetUp(volumeName string, kc kubectl.Kubectl, logger zerolog.Logger) error {
	ticker := time.NewTicker(replicaRunningCheck)
	// Stop the ticker on return; the original leaked it, leaving its
	// internal timer live for the life of the process.
	defer ticker.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), pvcReplicationTimeout)
	defer cancel()

	// Check for the replication status.
	for {
		select {
		case <-ticker.C:
			ok, err := verifyAllReplicasRunning(volumeName, kc)
			if err != nil {
				logger.Warn().Msgf("Got error while checking for replication status of %s volume : %v", volumeName, err)
				logger.Info().Msgf("Retrying check for replication status of %s volume", volumeName)
				continue
			}
			if ok {
				return nil
			}
			logger.Debug().Msgf("Volume replication is not ready yet, retrying check for replication status of %s volume", volumeName)
		case <-ctx.Done():
			return fmt.Errorf("error while checking the status of volume %s replication : %w", volumeName, ctx.Err())
		}
	}
}
// deleteReplica deletes a replica from a node.
func deleteReplica(r LonghornReplica, kc kubectl.Kubectl) error {
	name := r.Metadata.Name
	return kc.KubectlDeleteResource(replicas, name, "-n", longhornNamespace)
}
// increaseReplicaCount increases number of replicas for longhorn volume by 1, via kubectl patch.
func increaseReplicaCount(v LonghornVolume, kc kubectl.Kubectl) error {
	patch := fmt.Sprintf(patchNumberOfReplicas, v.Spec.NumberOfReplicas+1)
	return kc.KubectlPatch(volumes, v.Metadata.Name, patch, "-n", longhornNamespace, "--type", "merge")
}
// revertReplicaCount sets the number of replicas for longhorn volume to the original value, taken from the v, via kubectl patch.
func revertReplicaCount(v LonghornVolume, kc kubectl.Kubectl) error {
	patch := fmt.Sprintf(patchNumberOfReplicas, v.Spec.NumberOfReplicas)
	return kc.KubectlPatch(volumes, v.Metadata.Name, patch, "-n", longhornNamespace, "--type", "merge")
}
// getReplicas returns a List of Longhorn replicas currently in cluster.
func getReplicas(kc kubectl.Kubectl) (K8sList[LonghornReplica], error) {
	var list K8sList[LonghornReplica]

	raw, err := kc.KubectlGet(replicas, "-n", longhornNamespace, "-o", "yaml")
	if err != nil {
		return list, fmt.Errorf("failed to list all replicas : %w", err)
	}
	if err := yaml.Unmarshal(raw, &list); err != nil {
		return K8sList[LonghornReplica]{}, fmt.Errorf("failed unmarshal kubectl output : %w", err)
	}
	return list, nil
}
// verifyAllReplicasRunning returns true, if all replicas for specified volume are in running state.
func verifyAllReplicasRunning(volumeName string, kc kubectl.Kubectl) (bool, error) {
	list, err := getReplicas(kc)
	if err != nil {
		return false, err
	}
	for i := range list.Items {
		r := &list.Items[i]
		if r.Spec.VolumeName != volumeName {
			continue
		}
		// A replica that is not both started and in the running state means
		// the volume is not fully replicated yet.
		if r.Status.CurrentState != runningState || !r.Status.Started {
			return false, nil
		}
	}
	// All replicas for specific volume are running, return true.
	return true, nil
}