/
add_admin_server.go
121 lines (98 loc) · 3.62 KB
/
add_admin_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package controller
import (
"context"
hapi "github.com/hstreamdb/hstream-operator/api/v1alpha2"
"github.com/hstreamdb/hstream-operator/internal"
"github.com/hstreamdb/hstream-operator/internal/utils"
"github.com/hstreamdb/hstream-operator/pkg/constants"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
adminServerConfigPath = "/etc/logdevice"
)
var adminServerArgs = []string{
"--config-path", adminServerConfigPath + "/config.json",
"--enable-maintenance-manager", "",
"--maintenance-log-snapshotting", "",
"--enable-safety-check-periodic-metadata-update", "",
}
type addAdminServer struct{}
func (a addAdminServer) reconcile(ctx context.Context, r *HStreamDBReconciler, hdb *hapi.HStreamDB) *requeue {
logger := log.WithValues("namespace", hdb.Namespace, "instance", hdb.Name, "reconciler", "add admin server")
deploy := a.getDeployment(hdb)
existingDeploy := &appsv1.Deployment{}
err := r.Client.Get(ctx, client.ObjectKeyFromObject(&deploy), existingDeploy)
if err != nil {
if !k8sErrors.IsNotFound(err) {
return &requeue{curError: err}
}
if err = ctrl.SetControllerReference(hdb, &deploy, r.Scheme); err != nil {
return &requeue{curError: err}
}
logger.Info("Create admin server")
if err = r.Client.Create(ctx, &deploy); err != nil {
return &requeue{curError: err}
}
return nil
}
if !isHashChanged(&existingDeploy.ObjectMeta, &deploy.ObjectMeta) {
return nil
}
logger.Info("Update admin server")
r.Recorder.Event(hdb, corev1.EventTypeNormal, "UpdatingAdminServer", "")
existingDeploy.Annotations = deploy.Annotations
existingDeploy.Labels = deploy.Labels
existingDeploy.Spec = deploy.Spec
if err = r.Update(ctx, existingDeploy); err != nil {
return &requeue{curError: err}
}
return nil
}
func (a addAdminServer) getDeployment(hdb *hapi.HStreamDB) appsv1.Deployment {
podTemplate := a.getPodTemplate(hdb)
deploy := internal.GetDeployment(hdb, &hdb.Spec.AdminServer,
&podTemplate, hapi.ComponentTypeAdminServer)
return deploy
}
func (a addAdminServer) getPodTemplate(hdb *hapi.HStreamDB) corev1.PodTemplateSpec {
adminServer := &hdb.Spec.AdminServer
pod := corev1.PodTemplateSpec{
ObjectMeta: hapi.ComponentTypeAdminServer.GetObjectMeta(hdb, nil),
Spec: corev1.PodSpec{
Affinity: adminServer.Affinity,
Tolerations: adminServer.Tolerations,
NodeName: adminServer.NodeName,
NodeSelector: adminServer.NodeSelector,
SecurityContext: adminServer.PodSecurityContext,
InitContainers: adminServer.InitContainers,
Containers: a.getContainer(hdb),
Volumes: append(adminServer.Volumes, utils.GetLogDeviceConfigVolume(hdb)),
},
}
return pod
}
func (a addAdminServer) getContainer(hdb *hapi.HStreamDB) []corev1.Container {
adminServer := &hdb.Spec.AdminServer
container := corev1.Container{
Image: hdb.Spec.AdminServer.Image,
ImagePullPolicy: hdb.Spec.AdminServer.ImagePullPolicy,
}
structAssign(&container, &adminServer.Container)
if container.Name == "" {
container.Name = string(hapi.ComponentTypeAdminServer)
}
if len(container.Command) == 0 {
container.Command = []string{"/usr/local/bin/ld-admin-server"}
}
container.Args, _ = extendArgs(container.Args, adminServerArgs...)
container.Ports = coverPortsFromArgs(container.Args, extendPorts(container.Ports, constants.DefaultAdminServerPort))
container.VolumeMounts = append(
container.VolumeMounts,
utils.GetLogDeviceConfigVolumeMount(hdb),
)
return append([]corev1.Container{container}, adminServer.SidecarContainers...)
}