/
add_emqx_resources.go
86 lines (72 loc) · 2.33 KB
/
add_emqx_resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package v1beta4
import (
"context"
emperror "emperror.dev/errors"
appsv1beta4 "github.com/emqx/emqx-operator/apis/apps/v1beta4"
innerReq "github.com/emqx/emqx-operator/internal/requester"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// addEmqxResources is the reconcile step that creates or updates the
// license, ACL and headless-service resources of an EMQX instance and
// prepares the StatefulSet that uses them (see reconcile).
type addEmqxResources struct {
// Embedded reconciler; the methods of this type use its Client, Scheme and
// CreateOrUpdateList.
*EmqxReconciler
// Requester is not used by the methods visible in this file.
// NOTE(review): presumably the client for the EMQX API - confirm against
// the requester package.
Requester innerReq.RequesterInterface
}
// reconcile creates or updates the auxiliary resources of instance and builds
// the StatefulSet that references them.
//
// args[0] must be a []client.Object. Entries whose name equals
// names.BootstrapUser() must be a *corev1.Secret and entries whose name equals
// names.PluginsConfig() must be a *corev1.ConfigMap; both are wired into the
// generated StatefulSet. All other entries are ignored.
//
// The license secret (when getLicense yields one), the ACL resource and the
// headless service are handed to CreateOrUpdateList. The StatefulSet itself is
// not applied here: it is returned in subResult.args.
//
// A missing or mistyped args[0] violates the calling contract and panics. Any
// other failure, including an init resource of an unexpected concrete type, is
// reported through subResult.err.
func (a addEmqxResources) reconcile(ctx context.Context, logger logr.Logger, instance appsv1beta4.Emqx, args ...any) subResult {
	// Check the length first so an empty argument list fails with a message
	// that names the contract instead of a bare index-out-of-range panic.
	if len(args) == 0 {
		panic("args is empty, expected args[0] to be []client.Object")
	}
	initResources, ok := args[0].([]client.Object)
	if !ok {
		panic("args[0] is not []client.Object")
	}

	var resources []client.Object

	license, err := a.getLicense(ctx, instance)
	if err != nil {
		return subResult{err: emperror.Wrap(err, "failed to get license")}
	}
	// getLicense returns nil for non-enterprise instances. Appending a nil
	// *corev1.Secret would store a non-nil interface wrapping a nil pointer.
	if license != nil {
		resources = append(resources, license)
	}

	acl := generateEmqxACL(instance)
	resources = append(resources, acl)

	headlessSvc := generateHeadlessService(instance)
	resources = append(resources, headlessSvc)

	if err := a.CreateOrUpdateList(ctx, a.Scheme, logger, instance, resources); err != nil {
		return subResult{err: emperror.Wrap(err, "failed to create or update resource")}
	}

	sts := generateStatefulSet(instance)
	sts = updateStatefulSetForACL(sts, acl)
	sts = updateStatefulSetForLicense(sts, license)

	names := appsv1beta4.Names{Object: instance}
	for _, initResource := range initResources {
		if initResource.GetName() == names.BootstrapUser() {
			// Checked assertion: a name match with the wrong kind becomes an
			// error for the caller rather than a panic inside the reconciler.
			bootstrapUser, isSecret := initResource.(*corev1.Secret)
			if !isSecret {
				return subResult{err: emperror.Errorf("bootstrap user resource %q is %T, expected *corev1.Secret", initResource.GetName(), initResource)}
			}
			sts = updateStatefulSetForBootstrapUser(sts, bootstrapUser)
		}
		if initResource.GetName() == names.PluginsConfig() {
			pluginsConfig, isConfigMap := initResource.(*corev1.ConfigMap)
			if !isConfigMap {
				return subResult{err: emperror.Errorf("plugins config resource %q is %T, expected *corev1.ConfigMap", initResource.GetName(), initResource)}
			}
			sts = updateStatefulSetForPluginsConfig(sts, pluginsConfig)
		}
	}

	return subResult{args: sts}
}
// getLicense resolves the license Secret for instance.
//
// If instance is not an *appsv1beta4.EmqxEnterprise it returns (nil, nil).
// When Spec.License.SecretName is set, the Secret with that name is read from
// the instance's namespace through the client, and a lookup error is returned
// as-is. With no secret name configured, the result of generateLicense(instance)
// is returned.
func (a addEmqxResources) getLicense(ctx context.Context, instance appsv1beta4.Emqx) (*corev1.Secret, error) {
	ee, isEnterprise := instance.(*appsv1beta4.EmqxEnterprise)
	if !isEnterprise {
		return nil, nil
	}

	secretName := ee.Spec.License.SecretName
	if secretName == "" {
		// No user-supplied secret is referenced: fall back to the generated one.
		return generateLicense(instance), nil
	}

	key := types.NamespacedName{
		Name:      secretName,
		Namespace: ee.GetNamespace(),
	}
	secret := &corev1.Secret{}
	if err := a.Client.Get(ctx, key, secret); err != nil {
		return nil, err
	}
	return secret, nil
}