Skip to content

Commit

Permalink
dev: extract strorage reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsingerus committed May 23, 2024
1 parent a7946bb commit afdff67
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 26 deletions.
13 changes: 10 additions & 3 deletions pkg/controller/chi/controller-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package chi
import (
"context"
"fmt"

core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"

Expand All @@ -28,8 +27,16 @@ import (
"github.com/altinity/clickhouse-operator/pkg/util"
)

// updatePersistentVolumeClaim
func (c *Controller) updatePersistentVolumeClaim(ctx context.Context, pvc *core.PersistentVolumeClaim) (*core.PersistentVolumeClaim, error) {
func (c *Controller) kubePVCGet(ctx context.Context, namespace, name string) (*core.PersistentVolumeClaim, error) {
return c.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, name, controller.NewGetOptions())
}

func (c *Controller) kubePVCDelete(ctx context.Context, namespace, name string) error {
return c.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Delete(ctx, name, controller.NewDeleteOptions())
}

// updateOrCreatePVC
func (c *Controller) updateOrCreatePVC(ctx context.Context, pvc *core.PersistentVolumeClaim) (*core.PersistentVolumeClaim, error) {
log.V(2).M(pvc).F().P()
if util.IsContextDone(ctx) {
log.V(2).Info("task is done")
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/chi/worker-chi-reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
w.a.V(1).
M(host).F().
Info("Reconcile PVCs and check possible data loss for host: %s", host.GetName())
if errIsDataLoss(w.reconcilePVCs(ctx, host, api.DesiredStatefulSet)) {
if errIsDataLoss(NewStorageReconciler(w.a, w.task, w.c).reconcilePVCs(ctx, host, api.DesiredStatefulSet)) {
// In case of data loss detection on existing volumes, we need to:
// 1. recreate StatefulSet
// 2. run tables migration again
Expand All @@ -738,7 +738,7 @@ func (w *worker) reconcileHost(ctx context.Context, host *api.Host) error {
return err
}
// Polish all new volumes that operator has to create
_ = w.reconcilePVCs(ctx, host, api.DesiredStatefulSet)
_ = NewStorageReconciler(w.a, w.task, w.c).reconcilePVCs(ctx, host, api.DesiredStatefulSet)

_ = w.reconcileHostService(ctx, host)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/chi/worker-statefulset-reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (w *worker) recreateStatefulSet(ctx context.Context, host *api.Host, regist
}

_ = w.c.deleteStatefulSet(ctx, host)
_ = w.reconcilePVCs(ctx, host, api.DesiredStatefulSet)
_ = NewStorageReconciler(w.a, w.task, w.c).reconcilePVCs(ctx, host, api.DesiredStatefulSet)
return w.createStatefulSet(ctx, host, register)
}

Expand Down
47 changes: 30 additions & 17 deletions pkg/controller/chi/worker-storage-reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

log "github.com/altinity/clickhouse-operator/pkg/announcer"
api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1"
"github.com/altinity/clickhouse-operator/pkg/controller"
"github.com/altinity/clickhouse-operator/pkg/model"
"github.com/altinity/clickhouse-operator/pkg/model/common/interfaces"
"github.com/altinity/clickhouse-operator/pkg/model/common/volume"
Expand All @@ -44,8 +43,22 @@ import (
// })
//}

type StorageReconciler struct {
a Announcer
task *task
c *Controller
}

func NewStorageReconciler(a Announcer, task *task, c *Controller) *StorageReconciler {
return &StorageReconciler{
a: a,
task: task,
c: c,
}
}

// reconcilePVCs reconciles all PVCs of a host
func (w *worker) reconcilePVCs(ctx context.Context, host *api.Host, which api.WhichStatefulSet) (res ErrorDataPersistence) {
func (w *StorageReconciler) reconcilePVCs(ctx context.Context, host *api.Host, which api.WhichStatefulSet) (res ErrorDataPersistence) {
if util.IsContextDone(ctx) {
return nil
}
Expand All @@ -68,12 +81,12 @@ func (w *worker) reconcilePVCs(ctx context.Context, host *api.Host, which api.Wh
return
}

func (w *worker) reconcilePVCFromVolumeMount(
func (w *StorageReconciler) reconcilePVCFromVolumeMount(
ctx context.Context,
host *api.Host,
volumeMount *core.VolumeMount,
) (
res ErrorDataPersistence,
reconcileError ErrorDataPersistence,
) {
// Which PVC are we going to reconcile
pvc, volumeClaimTemplate, isModelCreated, err := w.fetchPVC(ctx, host, volumeMount)
Expand All @@ -100,7 +113,7 @@ func (w *worker) reconcilePVCFromVolumeMount(
if w.isLostPVC(pvc, isModelCreated, host) {
// Looks like data loss detected
w.a.V(1).M(host).Warning("PVC is either newly added to the host or was lost earlier (%s/%s/%s/%s)", namespace, host.GetName(), volumeMount.Name, pvcName)
res = errPVCIsLost
reconcileError = errPVCIsLost
}

// Check scenario 2 - PVC exists, but no PV available
Expand All @@ -114,7 +127,7 @@ func (w *worker) reconcilePVCFromVolumeMount(
// Refresh PVC model. Since PVC is just deleted refreshed model may not be fetched from the k8s,
// but can be provided by the operator still
pvc, volumeClaimTemplate, _, _ = w.fetchPVC(ctx, host, volumeMount)
res = errPVCWithLostPVDeleted
reconcileError = errPVCWithLostPVDeleted
}

// In any case - be PVC available or not - need to reconcile it
Expand All @@ -130,10 +143,10 @@ func (w *worker) reconcilePVCFromVolumeMount(
}

// It still may return data loss errors
return res
return reconcileError
}

func (w *worker) isLostPVC(pvc *core.PersistentVolumeClaim, isJustCreated bool, host *api.Host) bool {
func (w *StorageReconciler) isLostPVC(pvc *core.PersistentVolumeClaim, isJustCreated bool, host *api.Host) bool {
if !host.HasData() {
// No data to loose
return false
Expand All @@ -157,15 +170,15 @@ func (w *worker) isLostPVC(pvc *core.PersistentVolumeClaim, isJustCreated bool,
return false
}

func (w *worker) isLostPV(pvc *core.PersistentVolumeClaim) bool {
func (w *StorageReconciler) isLostPV(pvc *core.PersistentVolumeClaim) bool {
if pvc == nil {
return false
}

return pvc.Status.Phase == core.ClaimLost
}

func (w *worker) fetchPVC(
func (w *StorageReconciler) fetchPVC(
ctx context.Context,
host *api.Host,
volumeMount *core.VolumeMount,
Expand All @@ -187,7 +200,7 @@ func (w *worker) fetchPVC(
// We have a VolumeClaimTemplate for this VolumeMount
// Treat it as persistent storage mount

_pvc, e := w.c.kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, controller.NewGetOptions())
_pvc, e := w.c.kubePVCGet(ctx, namespace, pvcName)
if e == nil {
w.a.V(2).M(host).Info("PVC (%s/%s/%s/%s) found", namespace, host.GetName(), volumeMount.Name, pvcName)
return _pvc, volumeClaimTemplate, false, nil
Expand Down Expand Up @@ -221,7 +234,7 @@ func (w *worker) fetchPVC(
var errNilPVC = fmt.Errorf("nil PVC, nothing to reconcile")

// reconcilePVC reconciles specified PVC
func (w *worker) reconcilePVC(
func (w *StorageReconciler) reconcilePVC(
ctx context.Context,
pvc *core.PersistentVolumeClaim,
host *api.Host,
Expand All @@ -242,21 +255,21 @@ func (w *worker) reconcilePVC(

model.VolumeClaimTemplateApplyResourcesRequestsOnPVC(template, pvc)
pvc = w.task.creator.AdjustPVC(pvc, host, template)
return w.c.updatePersistentVolumeClaim(ctx, pvc)
return w.c.updateOrCreatePVC(ctx, pvc)
}

func (w *worker) deletePVC(ctx context.Context, pvc *core.PersistentVolumeClaim) bool {
func (w *StorageReconciler) deletePVC(ctx context.Context, pvc *core.PersistentVolumeClaim) bool {
w.a.V(1).M(pvc).F().S().Info("delete PVC with lost PV start: %s/%s", pvc.Namespace, pvc.Name)
defer w.a.V(1).M(pvc).F().E().Info("delete PVC with lost PV end: %s/%s", pvc.Namespace, pvc.Name)

w.a.V(2).M(pvc).F().Info("PVC with lost PV about to be deleted: %s/%s", pvc.Namespace, pvc.Name)
w.c.kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(ctx, pvc.Name, controller.NewDeleteOptions())
w.c.kubePVCDelete(ctx, pvc.Namespace, pvc.Name)

for i := 0; i < 360; i++ {

// Check availability
w.a.V(2).M(pvc).F().Info("check PVC with lost PV availability: %s/%s", pvc.Namespace, pvc.Name)
curPVC, err := w.c.kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, controller.NewGetOptions())
curPVC, err := w.c.kubePVCGet(ctx, pvc.Namespace, pvc.Name)
if err != nil {
if apiErrors.IsNotFound(err) {
// Not available - concider to bbe deleted
Expand All @@ -269,7 +282,7 @@ func (w *worker) deletePVC(ctx context.Context, pvc *core.PersistentVolumeClaim)
if len(curPVC.Finalizers) > 0 {
w.a.V(2).M(pvc).F().Info("clean finalizers for PVC with lost PV: %s/%s", pvc.Namespace, pvc.Name)
curPVC.Finalizers = nil
w.c.updatePersistentVolumeClaim(ctx, curPVC)
w.c.updateOrCreatePVC(ctx, curPVC)
}
time.Sleep(10 * time.Second)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/chi/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type worker struct {
normalizer *normalizer.Normalizer
schemer *schemer.ClusterSchemer
start time.Time
task task
task *task
}

// task represents context of a worker. This also can be called "a reconcile task"
Expand All @@ -70,8 +70,8 @@ type task struct {
}

// newTask creates new context
func newTask(creator *commonCreator.Creator) task {
return task{
func newTask(creator *commonCreator.Creator) *task {
return &task{
creator: creator,
registryReconciled: model.NewRegistry(),
registryFailed: model.NewRegistry(),
Expand Down

0 comments on commit afdff67

Please sign in to comment.