Skip to content

Commit

Permalink
feat: Refactor resource generation
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyshevch committed Mar 25, 2024
1 parent 3088a5a commit c3194c1
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 159 deletions.
160 changes: 3 additions & 157 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -132,177 +131,24 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *EtcdClusterReconciler) ensureClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
// 1. create or update configmap <name>-cluster-state
if err := r.ensureClusterStateConfigMap(ctx, cluster, isClusterInitialized); err != nil {
if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, isClusterInitialized, r.Client, r.Scheme); err != nil {
return err
}
if err := r.ensureClusterService(ctx, cluster); err != nil {
if err := factory.CreateOrUpdateClusterService(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}
// 2. create or update statefulset
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}
// 3. create or update ClusterIP Service
if err := r.ensureClusterClientService(ctx, cluster); err != nil {
if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client, r.Scheme); err != nil {
return err
}

return nil
}

func (r *EtcdClusterReconciler) ensureClusterService(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
svc := &corev1.Service{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: cluster.Name,
}, svc)
// Service exists, skip creation
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("cannot get cluster service: %w", err)
}

svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "peer", TargetPort: intstr.FromInt32(2380), Port: 2380, Protocol: corev1.ProtocolTCP},
{Name: "client", TargetPort: intstr.FromInt32(2379), Port: 2379, Protocol: corev1.ProtocolTCP},
},
Type: corev1.ServiceTypeClusterIP,
ClusterIP: "None",
Selector: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
PublishNotReadyAddresses: true,
},
}
if err = ctrl.SetControllerReference(cluster, svc, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err = r.Create(ctx, svc); err != nil {
return fmt.Errorf("cannot create cluster service: %w", err)
}
return nil
}

func (r *EtcdClusterReconciler) ensureClusterClientService(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
svc := &corev1.Service{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: r.getClientServiceName(cluster),
}, svc)
// Service exists, skip creation
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("cannot get cluster client service: %w", err)
}

svc = &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: r.getClientServiceName(cluster),
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "client", TargetPort: intstr.FromInt32(2379), Port: 2379, Protocol: corev1.ProtocolTCP},
},
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
}
if err = ctrl.SetControllerReference(cluster, svc, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err = r.Create(ctx, svc); err != nil {
return fmt.Errorf("cannot create cluster client service: %w", err)
}
return nil
}

// ensureClusterStateConfigMap creates or updates cluster state configmap.
func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, isClusterInitialized bool) error {
configMap := &corev1.ConfigMap{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: factory.GetClusterStateConfigMapName(cluster),
}, configMap)
// configmap exists, skip editing.
if err == nil {
if isClusterInitialized {
// update cluster state to existing
configMap.Data["ETCD_INITIAL_CLUSTER_STATE"] = "existing"
if err = r.Update(ctx, configMap); err != nil {
return fmt.Errorf("cannot update cluster state configmap: %w", err)
}
}
return nil
}

// configmap does not exist, create with cluster state "new"
if errors.IsNotFound(err) {
initialCluster := ""
for i := int32(0); i < *cluster.Spec.Replicas; i++ {
if i > 0 {
initialCluster += ","
}
initialCluster += fmt.Sprintf("%s-%d=https://%s-%d.%s.%s.svc:2380",
cluster.Name, i,
cluster.Name, i, cluster.Name, cluster.Namespace,
)
}

configMap = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: factory.GetClusterStateConfigMapName(cluster),
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": "new",
"ETCD_INITIAL_CLUSTER": initialCluster,
"ETCD_INITIAL_CLUSTER_TOKEN": cluster.Name + "-" + cluster.Namespace,
},
}
if err := ctrl.SetControllerReference(cluster, configMap, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
if err := r.Create(ctx, configMap); err != nil {
return fmt.Errorf("cannot create cluster state configmap: %w", err)
}
return nil
}

return fmt.Errorf("cannot get cluster state configmap: %w", err)
}

func (r *EtcdClusterReconciler) getClientServiceName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-client"
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
res, statusErr := r.updateStatus(ctx, cluster)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/etcdcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var _ = Describe("EtcdCluster Controller", func() {
svc = &v1.Service{}
clientSvcName := types.NamespacedName{
Namespace: typeNamespacedName.Namespace,
Name: controllerReconciler.getClientServiceName(etcdcluster),
Name: factory.GetClientServiceName(etcdcluster),
}
err = k8sClient.Get(ctx, clientSvcName, svc)
Expect(err).NotTo(HaveOccurred(), "cluster client Service should exist")
Expand Down
40 changes: 40 additions & 0 deletions internal/controller/factory/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -47,3 +48,42 @@ func reconcileSTS(ctx context.Context, rclient client.Client, crdName string, st
sts.Status = currentSts.Status
return rclient.Update(ctx, sts)
}

func reconcileConfigMap(ctx context.Context, rclient client.Client, crdName string, configMap *corev1.ConfigMap) error {
logger := log.FromContext(ctx)

currentConfigMap := &corev1.ConfigMap{}
err := rclient.Get(ctx, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new configMap", "cm_name", configMap.Name, "crd_object", crdName)
return rclient.Create(ctx, configMap)
}
return fmt.Errorf("cannot get existing configMap: %s, for crd_object: %s, err: %w", configMap.Name, crdName, err)
}
configMap.Annotations = labels.Merge(currentConfigMap.Annotations, configMap.Annotations)
if configMap.ResourceVersion != "" {
configMap.ResourceVersion = currentConfigMap.ResourceVersion
}
return rclient.Update(ctx, configMap)
}

func reconcileSVC(ctx context.Context, rclient client.Client, crdName string, svc *corev1.Service) error {
logger := log.FromContext(ctx)

currentSvc := &corev1.Service{}
err := rclient.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, currentSvc)
if err != nil {
if errors.IsNotFound(err) {
logger.V(2).Info("creating new service", "svc_name", svc.Name, "crd_object", crdName)
return rclient.Create(ctx, svc)
}
return fmt.Errorf("cannot get existing service: %s, for crd_object: %s, err: %w", svc.Name, crdName, err)
}
svc.Annotations = labels.Merge(currentSvc.Annotations, svc.Annotations)
if svc.ResourceVersion != "" {
svc.ResourceVersion = currentSvc.ResourceVersion
}
svc.Status = currentSvc.Status
return rclient.Update(ctx, svc)
}
55 changes: 54 additions & 1 deletion internal/controller/factory/configMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,61 @@ limitations under the License.

package factory

import etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
)

func GetClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-cluster-state"
}

func CreateOrUpdateClusterStateConfigMap(
ctx context.Context,
cluster *etcdaenixiov1alpha1.EtcdCluster,
isClusterInitialized bool,
rclient client.Client,
rscheme *runtime.Scheme,
) error {
initialCluster := ""
for i := int32(0); i < *cluster.Spec.Replicas; i++ {
if i > 0 {
initialCluster += ","
}
initialCluster += fmt.Sprintf("%s-%d=https://%s-%d.%s.%s.svc:2380",
cluster.Name, i,
cluster.Name, i, cluster.Name, cluster.Namespace,
)
}

configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: GetClusterStateConfigMapName(cluster),
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": "new",
"ETCD_INITIAL_CLUSTER": initialCluster,
"ETCD_INITIAL_CLUSTER_TOKEN": cluster.Name + "-" + cluster.Namespace,
},
}

if isClusterInitialized {
// update cluster state to existing
configMap.Data["ETCD_INITIAL_CLUSTER_STATE"] = "existing"
}

if err := ctrl.SetControllerReference(cluster, configMap, rscheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}

return reconcileConfigMap(ctx, rclient, cluster.Name, configMap)
}
Loading

0 comments on commit c3194c1

Please sign in to comment.