Skip to content

Commit

Permalink
feat: Add statefulSet reconcile function
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyshevch committed Mar 22, 2024
1 parent 773ea04 commit 0957a32
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 207 deletions.
79 changes: 74 additions & 5 deletions api/v1alpha1/etcdcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

// EtcdClusterSpec defines the desired state of EtcdCluster
Expand All @@ -27,8 +28,10 @@ type EtcdClusterSpec struct {
// +optional
// +kubebuilder:default:=3
// +kubebuilder:validation:Minimum:=0
Replicas *int32 `json:"replicas,omitempty"`
Storage StorageSpec `json:"storage"`
Replicas *int32 `json:"replicas,omitempty"`
// PodSpec defines the desired state of PodSpec for etcd members. If not specified, default values will be used.
PodSpec *PodSpec `json:"pod,omitempty"`
Storage StorageSpec `json:"storage"`
}

const (
Expand All @@ -50,8 +53,8 @@ type EtcdClusterStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// EtcdCluster is the Schema for the etcdclusters API
type EtcdCluster struct {
Expand All @@ -62,7 +65,20 @@ type EtcdCluster struct {
Status EtcdClusterStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true
func (e *EtcdCluster) AsOwner() []metav1.OwnerReference {
return []metav1.OwnerReference{
{
APIVersion: e.APIVersion,
Kind: e.Kind,
Name: e.Name,
UID: e.UID,
Controller: ptr.To(true),
BlockOwnerDeletion: ptr.To(true),
},
}
}

// +kubebuilder:object:root=true

// EtcdClusterList contains a list of EtcdCluster
type EtcdClusterList struct {
Expand Down Expand Up @@ -101,6 +117,59 @@ type EmbeddedObjectMetadata struct {
Annotations map[string]string `json:"annotations,omitempty" protobuf:"bytes,12,rep,name=annotations"`
}

// PodSpec defines the desired state of PodSpec for etcd members.
// +k8s:openapi-gen=true
type PodSpec struct {
// ImagePullPolicy describes a policy for if/when to pull a container image
// +kubebuilder:default:=IfNotPresent
// +optional
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// ImagePullSecrets An optional list of references to secrets in the same namespace
// to use for pulling images from registries
// see https://kubernetes.io/docs/concepts/containers/images/#referring-to-an-imagepullsecrets-on-a-pod
// +optional
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
// PodMetadata contains metadata relevant to a PodSpec.
// +optional
PodMetadata *EmbeddedObjectMetadata `json:"metadata,omitempty"`
// Resources describes the compute resource requirements.
// +optional
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
// Affinity sets the scheduling constraints for the pod.
// +optional
Affinity *corev1.Affinity `json:"affinity,omitempty"`
// NodeSelector is a selector which must be true for the pod to fit on a node.
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// TopologySpreadConstraints describes how a group of pods ought to spread across topology domains.
// +optional
TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
// Tolerations is a list of tolerations.
// +optional
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
// SecurityContext holds pod-level security attributes and common container settings.
// +optional
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
// PriorityClassName is the name of the PriorityClass for this pod.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty"`
// TerminationGracePeriodSeconds is the time to wait before forceful pod shutdown.
// +optional
TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty"`
// SchedulerName is the name of the scheduler to be used for scheduling the pod.
// +optional
SchedulerName string `json:"schedulerName,omitempty"`
// RuntimeClassName refers to a RuntimeClass object in the node.k8s.io group, which should be used to run this pod.
// +optional
RuntimeClassName *string `json:"runtimeClassName,omitempty"`
// ExtraArgs are the extra arguments to pass to the etcd container.
// +optional
ExtraArgs map[string]string `json:"extraArgs,omitempty"`
// ExtraEnv are the extra environment variables to pass to the etcd container.
// +optional
ExtraEnv []corev1.EnvVar `json:"extraEnv,omitempty"`
}

// StorageSpec defines the configured storage for a etcd members.
// If neither `emptyDir` nor `volumeClaimTemplate` is specified, then by default an [EmptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) will be used.
// +k8s:openapi-gen=true
Expand Down
211 changes: 10 additions & 201 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
"github.com/aenix-io/etcd-operator/internal/controller/factory"
)

// EtcdClusterReconciler reconciles a EtcdCluster object
Expand All @@ -45,12 +46,12 @@ type EtcdClusterReconciler struct {
Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
//+kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
//+kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;create;delete;update;patch;list;watch

// Reconcile checks CR and current cluster state and performs actions to transform current state to desired.
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -138,7 +139,7 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
return err
}
// 2. create or update statefulset
if err := r.ensureClusterStatefulSet(ctx, cluster); err != nil {
if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
return err
}
// 3. create or update ClusterIP Service
Expand Down Expand Up @@ -248,7 +249,7 @@ func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
configMap := &corev1.ConfigMap{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: r.getClusterStateConfigMapName(cluster),
Name: factory.GetClusterStateConfigMapName(cluster),
}, configMap)
// configmap exists, skip editing.
if err == nil {
Expand Down Expand Up @@ -278,7 +279,7 @@ func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
configMap = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: r.getClusterStateConfigMapName(cluster),
Name: factory.GetClusterStateConfigMapName(cluster),
},
Data: map[string]string{
"ETCD_INITIAL_CLUSTER_STATE": "new",
Expand All @@ -298,202 +299,10 @@ func (r *EtcdClusterReconciler) ensureClusterStateConfigMap(
return fmt.Errorf("cannot get cluster state configmap: %w", err)
}

func (r *EtcdClusterReconciler) ensureClusterStatefulSet(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {
statefulSet := &appsv1.StatefulSet{}
err := r.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: cluster.Name,
}, statefulSet)

// statefulset does not exist, create new one
notFound := false
if errors.IsNotFound(err) {
notFound = true
// prepare initial cluster members

statefulSet = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: cluster.Name,
},
Spec: appsv1.StatefulSetSpec{
// initialize static fields that cannot be changed across updates.
Replicas: cluster.Spec.Replicas,
ServiceName: cluster.Name,
PodManagementPolicy: appsv1.ParallelPodManagement,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app.kubernetes.io/name": "etcd",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/managed-by": "etcd-operator",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "etcd",
Image: "quay.io/coreos/etcd:v3.5.12",
Command: []string{
"etcd",
"--name=$(POD_NAME)",
"--listen-peer-urls=https://0.0.0.0:2380",
// for first version disable TLS for client access
"--listen-client-urls=http://0.0.0.0:2379",
"--initial-advertise-peer-urls=https://$(POD_NAME)." + cluster.Name + ".$(POD_NAMESPACE).svc:2380",
"--data-dir=/var/run/etcd/default.etcd",
"--auto-tls",
"--peer-auto-tls",
"--advertise-client-urls=http://$(POD_NAME)." + cluster.Name + ".$(POD_NAMESPACE).svc:2379",
},
Ports: []corev1.ContainerPort{
{Name: "peer", ContainerPort: 2380},
{Name: "client", ContainerPort: 2379},
},
EnvFrom: []corev1.EnvFromSource{
{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: r.getClusterStateConfigMapName(cluster),
},
},
},
},
Env: []corev1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data",
ReadOnly: false,
MountPath: "/var/run/etcd",
},
},
StartupProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz?serializable=false",
Port: intstr.FromInt32(2379),
},
},
InitialDelaySeconds: 1,
PeriodSeconds: 5,
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/livez",
Port: intstr.FromInt32(2379),
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 5,
},
ReadinessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/readyz",
Port: intstr.FromInt32(2379),
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 5,
},
},
},
},
},
},
}
if cluster.Spec.Storage.EmptyDir != nil {
statefulSet.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: "data",
VolumeSource: corev1.VolumeSource{EmptyDir: cluster.Spec.Storage.EmptyDir},
},
}
} else {
statefulSet.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: "data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: r.getPVCName(cluster),
},
},
},
}
statefulSet.Spec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPVCName(cluster),
Labels: cluster.Spec.Storage.VolumeClaimTemplate.Labels,
Annotations: cluster.Spec.Storage.VolumeClaimTemplate.Annotations,
},
Spec: cluster.Spec.Storage.VolumeClaimTemplate.Spec,
Status: cluster.Spec.Storage.VolumeClaimTemplate.Status,
},
}
}
if err := ctrl.SetControllerReference(cluster, statefulSet, r.Scheme); err != nil {
return fmt.Errorf("cannot set controller reference: %w", err)
}
} else if err != nil {
return fmt.Errorf("cannot get cluster statefulset: %w", err)
}

if notFound {
if err := r.Create(ctx, statefulSet); err != nil {
return fmt.Errorf("cannot create statefulset: %w", err)
}
} else {
if err := r.Update(ctx, statefulSet); err != nil {
return fmt.Errorf("cannot update statefulset: %w", err)
}
}

return nil
}

func (r *EtcdClusterReconciler) getClusterStateConfigMapName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-cluster-state"
}

func (r *EtcdClusterReconciler) getClientServiceName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
return cluster.Name + "-client"
}

func (r *EtcdClusterReconciler) getPVCName(cluster *etcdaenixiov1alpha1.EtcdCluster) string {
if len(cluster.Spec.Storage.VolumeClaimTemplate.Name) > 0 {
return cluster.Spec.Storage.VolumeClaimTemplate.Name
}

return "data"
}

// updateStatusOnErr wraps error and updates EtcdCluster status
func (r *EtcdClusterReconciler) updateStatusOnErr(ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster, err error) (ctrl.Result, error) {
res, statusErr := r.updateStatus(ctx, cluster)
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/etcdcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
"github.com/aenix-io/etcd-operator/internal/controller/factory"
)

var _ = Describe("EtcdCluster Controller", func() {
Expand Down Expand Up @@ -100,7 +101,7 @@ var _ = Describe("EtcdCluster Controller", func() {
cm := &v1.ConfigMap{}
cmName := types.NamespacedName{
Namespace: typeNamespacedName.Namespace,
Name: controllerReconciler.getClusterStateConfigMapName(etcdcluster),
Name: factory.GetClusterStateConfigMapName(etcdcluster),
}
err = k8sClient.Get(ctx, cmName, cm)
Expect(err).NotTo(HaveOccurred(), "cluster configmap state should exist")
Expand Down
Loading

0 comments on commit 0957a32

Please sign in to comment.