-
Notifications
You must be signed in to change notification settings - Fork 189
/
reconcilers.go
146 lines (128 loc) · 4.21 KB
/
reconcilers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package reconcilers
import (
"fmt"
"net/http"
"k8s.io/client-go/tools/record"
opsterv1 "github.com/Opster/opensearch-k8s-operator/opensearch-operator/api/v1"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
const (
opensearchPending = "OpensearchPending"
opensearchError = "OpensearchError"
opensearchAPIError = "OpensearchAPIError"
opensearchRefMismatch = "OpensearchRefMismatch"
opensearchAPIUpdated = "OpensearchAPIUpdated"
passwordError = "PasswordError"
statusError = "StatusUpdateError"
)
type ComponentReconciler func() (reconcile.Result, error)
type ReconcilerOptions struct {
osClientTransport http.RoundTripper
updateStatus *bool
}
type ReconcilerOption func(*ReconcilerOptions)
func (o *ReconcilerOptions) apply(opts ...ReconcilerOption) {
for _, op := range opts {
op(o)
}
}
func WithOSClientTransport(transport http.RoundTripper) ReconcilerOption {
return func(o *ReconcilerOptions) {
o.osClientTransport = transport
}
}
func WithUpdateStatus(update bool) ReconcilerOption {
return func(o *ReconcilerOptions) {
o.updateStatus = &update
}
}
type ReconcilerContext struct {
Volumes []corev1.Volume
VolumeMounts []corev1.VolumeMount
NodePoolHashes []NodePoolHash
DashboardsConfig map[string]string
OpenSearchConfig map[string]string
recorder record.EventRecorder
instance *opsterv1.OpenSearchCluster
}
type NodePoolHash struct {
Component string
ConfigHash string
}
func NewReconcilerContext(recorder record.EventRecorder, instance *opsterv1.OpenSearchCluster, nodepools []opsterv1.NodePool) ReconcilerContext {
var nodePoolHashes []NodePoolHash
for _, nodepool := range nodepools {
nodePoolHashes = append(nodePoolHashes, NodePoolHash{
Component: nodepool.Component,
})
}
return ReconcilerContext{
NodePoolHashes: nodePoolHashes,
OpenSearchConfig: make(map[string]string),
DashboardsConfig: make(map[string]string),
recorder: recorder,
instance: instance,
}
}
func (c *ReconcilerContext) AddConfig(key string, value string) {
_, exists := c.OpenSearchConfig[key]
if exists {
fmt.Printf("Warning: Config key '%s' already exists. Will be overwritten\n", key)
c.recorder.Eventf(c.instance, "Warning", "ConfigDuplicateKey", "Config key '%s' already exists in opensearch config. Will be overwritten", key)
}
c.OpenSearchConfig[key] = value
}
func (c *ReconcilerContext) AddDashboardsConfig(key string, value string) {
_, exists := c.DashboardsConfig[key]
if exists {
fmt.Printf("Warning: Dashboards Config key '%s' already exists. Will be overwritten\n", key)
c.recorder.Eventf(c.instance, "Warning", "ConfigDuplicateKey", "Config key '%s' already exists in dashboards config. Will be overwritten", key)
}
c.DashboardsConfig[key] = value
}
// fetchNodePoolHash gets the hash of the config for a specific node pool
func (c *ReconcilerContext) fetchNodePoolHash(name string) (bool, NodePoolHash) {
for _, config := range c.NodePoolHashes {
if config.Component == name {
return true, config
}
}
return false, NodePoolHash{}
}
// replaceNodePoolHash updates the hash of the config for a specific node pool
func (c *ReconcilerContext) replaceNodePoolHash(newConfig NodePoolHash) {
var configs []NodePoolHash
for _, config := range c.NodePoolHashes {
if config.Component == newConfig.Component {
configs = append(configs, newConfig)
} else {
configs = append(configs, config)
}
}
c.NodePoolHashes = configs
}
func UpdateComponentStatus(
k8sClient k8s.K8sClient,
cluster *opsterv1.OpenSearchCluster,
status *opsterv1.ComponentStatus,
) error {
if status != nil {
return k8sClient.UpdateOpenSearchClusterStatus(client.ObjectKeyFromObject(cluster), func(instance *opsterv1.OpenSearchCluster) {
found := false
for idx, value := range instance.Status.ComponentsStatus {
if value.Component == status.Component {
instance.Status.ComponentsStatus[idx] = *status
found = true
break
}
}
if !found {
instance.Status.ComponentsStatus = append(instance.Status.ComponentsStatus, *status)
}
})
}
return nil
}