enhancement (CollaSet): fix review issues
wu8685 committed Aug 12, 2023
1 parent aecafdb commit be73018
Showing 7 changed files with 63 additions and 62 deletions.
8 changes: 2 additions & 6 deletions apis/apps/v1alpha1/well_known_labels.go
@@ -35,14 +35,10 @@ const (
PodCompleteLabelPrefix = "complete.lifecycle.kafed.kusionstack.io" // indicate a pod has finished all phases

PodServiceAvailableLabel = "kafed.kusionstack.io/service-available" // indicate a pod is available to serve
// --- Begin: Labels for CollaSet ---

CollaSetUpdateIndicateLabelKey = "apps.kafed.kusionstack.io/update-included"
PodInstanceIDLabelKey = "kafed.kusionstack.io/pod-instance-id"
PodScalingInLabelKey = "apps.kafed.kusionstack.io/scaling-in"

// --- End: Labels for CollaSet ---
CollaSetUpdateIndicateLabelKey = "collaset.kafed.kusionstack.io/update-included"

PodInstanceIDLabelKey = "kafed.kusionstack.io/pod-instance-id"
PodDeletionIndicationLabelKey = "kafed.kusionstack.io/to-delete" // Users can use this label to indicate a pod to delete
)

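Note on usage: the CollaSetUpdateIndicateLabelKey constant keeps its name but its key moves to the collaset.kafed.kusionstack.io prefix, and the scaling-in label is dropped entirely. A minimal sketch of opting a Pod into updates with this label, assuming a controller-runtime client; the value "true" is an assumption, the diff does not show what value the controller expects:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
)

// markPodForUpdate opts a Pod into CollaSet updates via the renamed indicate label.
func markPodForUpdate(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels[appsv1alpha1.CollaSetUpdateIndicateLabelKey] = "true"
	return c.Update(ctx, pod)
}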
6 changes: 5 additions & 1 deletion pkg/controllers/collaset/collaset_controller.go
@@ -159,10 +159,14 @@ func DoReconcile(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.Contro
return calculateStatus(instance, newStatus, updatedRevision, podWrappers, syncErr), syncErr
}

// doSync is responsible for reconciling Pods with the CollaSet spec.
// 1. sync Pods to prepare information, especially IDs, for the following Scale and Update
// 2. scale Pods to match the Pod number indicated in `spec.replicas`. If an error is thrown or the Pod count is not yet matched, the update will be skipped.
// 3. update Pods, to bring each Pod to the updated revision indicated by `spec.template`
func doSync(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.ControllerRevision, revisions []*appsv1.ControllerRevision, newStatus *appsv1alpha1.CollaSetStatus) ([]*collasetutils.PodWrapper, *appsv1alpha1.CollaSetStatus, error) {
filteredPods, err := podControl.GetFilteredPods(instance.Spec.Selector, instance)
if err != nil {
return nil, newStatus, err
return nil, newStatus, fmt.Errorf("fail to get filtered Pods: %s", err)
}

synced, podWrappers, ownedIDs, err := syncControl.SyncPods(instance, filteredPods, updatedRevision, newStatus)
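The new doc comment describes a three-phase flow. A rough sketch of how the phases chain together, using a simplified interface rather than the real signatures; the skip-update behaviour follows the comment above:

package example

// syncer is a simplified stand-in for the sync control used by doSync;
// the real methods take the CollaSet, revisions and Pod wrappers.
type syncer interface {
	SyncPods() (bool, error)
	Scale() (bool, error)
	Update() (bool, error)
}

// doSyncSketch mirrors the three steps above: sync IDs first, then scale,
// and only run Update when Scale neither failed nor is still in progress.
func doSyncSketch(s syncer) error {
	if _, err := s.SyncPods(); err != nil { // 1. prepare instance IDs for Scale/Update
		return err
	}
	scaling, err := s.Scale() // 2. match spec.replicas
	if err != nil || scaling {
		return err // per the comment, an error or unfinished scaling skips the update
	}
	_, err = s.Update() // 3. move Pods to the revision from spec.template
	return err
}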
24 changes: 3 additions & 21 deletions pkg/controllers/collaset/podcontrol/pod_control.go
@@ -20,22 +20,19 @@ import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
controllerutils "kusionstack.io/kafed/pkg/controllers/utils"
refmanagerutil "kusionstack.io/kafed/pkg/controllers/utils/refmanager"
"kusionstack.io/kafed/pkg/utils/inject"
)

type Interface interface {
GetFilteredPods(selector *metav1.LabelSelector, owner client.Object) ([]*corev1.Pod, error)
CreatePod(pod *corev1.Pod, revision *appsv1.ControllerRevision) (*corev1.Pod, error)
CreatePod(pod *corev1.Pod) (*corev1.Pod, error)
DeletePod(pod *corev1.Pod) error
UpdatePod(pod *corev1.Pod) error
}
@@ -70,9 +67,9 @@ func (pc *RealPodControl) GetFilteredPods(selector *metav1.LabelSelector, owner
return filteredPods, nil
}

func (pc *RealPodControl) CreatePod(pod *corev1.Pod, revision *appsv1.ControllerRevision) (*corev1.Pod, error) {
func (pc *RealPodControl) CreatePod(pod *corev1.Pod) (*corev1.Pod, error) {
if err := pc.client.Create(context.TODO(), pod); err != nil {
return nil, fmt.Errorf("fail to create Pod with revision %s: %s", revision.Name, err)
return nil, fmt.Errorf("fail to create Pod: %s", err)
}

return pod, nil
@@ -127,18 +124,3 @@ func FilterOutInactivePod(pods []corev1.Pod) []*corev1.Pod {
func IsPodInactive(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed
}

func NewPodFrom(cs *appsv1alpha1.CollaSet, revision *appsv1.ControllerRevision) (*corev1.Pod, error) {
pod, err := controllerutils.GetPodFromRevision(revision)
if err != nil {
return pod, err
}

pod.Namespace = cs.Namespace
pod.GenerateName = controllerutils.GetPodsPrefix(cs.Name)
pod.OwnerReferences = append(pod.OwnerReferences, *metav1.NewControllerRef(cs, appsv1alpha1.GroupVersion.WithKind("CollaSet")))

pod.Labels[appsv1.ControllerRevisionHashLabelKey] = revision.Name

return pod, nil
}
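The removed NewPodFrom helper resurfaces as controllerutils.NewPodFrom (see its call in sync_control.go below). A sketch of the metadata stamping the deleted code performed, kept here for reference; the exact signature of the controllerutils helper is an assumption:

package example

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// stampPodMetadata applies what the deleted NewPodFrom did after decoding the Pod
// template from a ControllerRevision: namespace, generate-name prefix, owner
// reference, and the revision-hash label that records which revision built the Pod.
func stampPodMetadata(pod *corev1.Pod, namespace, namePrefix string, ownerRef *metav1.OwnerReference, revision *appsv1.ControllerRevision) {
	pod.Namespace = namespace
	pod.GenerateName = namePrefix // e.g. "<collaset-name>-"
	pod.OwnerReferences = append(pod.OwnerReferences, *ownerRef)
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels[appsv1.ControllerRevisionHashLabelKey] = revision.Name
}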
69 changes: 48 additions & 21 deletions pkg/controllers/collaset/synccontrol/sync_control.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
@@ -120,6 +121,7 @@ func (sc *RealSyncControl) SyncPods(instance *appsv1alpha1.CollaSet, filteredPod
// 2. do not filter out these terminating Pods

if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when sync", instance.Namespace, instance.Name)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return podcontext.UpdateToPodContext(sc.client, instance, ownedIDs)
}); err != nil {
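The verbose logging added throughout this file only appears when the controller runs with klog verbosity of at least 1 (e.g. -v=1), and conflict-prone writes such as UpdateToPodContext stay wrapped in client-go's retry.RetryOnConflict. A generic sketch of that pairing; the re-Get inside the closure is the usual retry pattern, not code from this repository:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// updateWithRetry re-applies mutate on a fresh copy of the Pod whenever the
// API server reports a resource-version conflict, using the default backoff.
func updateWithRetry(ctx context.Context, c client.Client, pod *corev1.Pod, mutate func(*corev1.Pod)) error {
	klog.V(1).Infof("try to update Pod %s/%s", pod.Namespace, pod.Name) // visible with -v=1 and above
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		latest := &corev1.Pod{}
		if err := c.Get(ctx, client.ObjectKeyFromObject(pod), latest); err != nil {
			return err
		}
		mutate(latest)
		return c.Update(ctx, latest)
	})
}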
@@ -157,13 +159,14 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
// TODO use cache
pod, err := controllerutils.NewPodFrom(set, metav1.NewControllerRef(set, appsv1alpha1.GroupVersion.WithKind("CollaSet")), revision)
if err != nil {
return fmt.Errorf("fail to new pod from revision %s: %s", updatedRevision.Name, err)
return fmt.Errorf("fail to new Pod from revision %s: %s", revision.Name, err)
}
newPod := pod.DeepCopy()
// allocate the new Pod an instance ID
newPod.Labels[appsv1alpha1.PodInstanceIDLabelKey] = fmt.Sprintf("%d", availableIDContext.ID)

if pod, err := sc.podControl.CreatePod(newPod, updatedRevision); err == nil {
klog.V(1).Info("try to create Pod with revision %s from CollaSet %s/%s", revision.Name, set.Namespace, set.Name)
if pod, err := sc.podControl.CreatePod(newPod); err == nil {
// add an expectation for this pod creation, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectCreate(set, expectations.Pod, pod.Name); err != nil {
return err
Expand All @@ -172,12 +175,12 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
return err
})

sc.recorder.Eventf(set, corev1.EventTypeNormal, "ScaleOut", "scale out %d Pod(s)", succCount)
if err != nil {
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, err, "ScaleOutFailed", err.Error())
return succCount > 0, err
} else {
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, nil, "ScaleOut", "")
}
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, nil, "ScaleOut", "")

return succCount > 0, err
} else if diff < 0 {
@@ -197,9 +200,11 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
pod := <-podCh

// trigger PodOpsLifecycle with scaleIn OperationType
klog.V(1).Infof("try to begin PodOpsLifecycle for scaling in Pod %s/%s in CollaSet %s/%s", pod.Namespace, pod.Name, set.Namespace, set.Name)
if updated, err := podopslifecycle.Begin(sc.client, collasetutils.ScaleInOpsLifecycleAdapter, pod.Pod); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for Scaling in Pod %s/%s: %s", pod.Namespace, pod.Name, err)
} else if updated {
sc.recorder.Eventf(pod.Pod, corev1.EventTypeNormal, "BeginScaleInLifecycle", "succeed to begin PodOpsLifecycle for scaling in")
// add an expectation for this pod creation, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, pod.Name, pod.ResourceVersion); err != nil {
return err
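The new events such as BeginScaleInLifecycle and PodScaleInLifecycle go through the controller's record.EventRecorder, so they show up in `kubectl describe` output for the Pod. A sketch of how such a recorder is typically obtained and used; the recorder name below is an assumption, not taken from this repository:

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
)

// newRecorder obtains an event recorder from the controller manager.
func newRecorder(mgr ctrl.Manager) record.EventRecorder {
	return mgr.GetEventRecorderFor("collaset-controller") // hypothetical component name
}

// emitScaleInBegun records the same kind of Normal event as the hunk above.
func emitScaleInBegun(recorder record.EventRecorder, pod *corev1.Pod) {
	recorder.Eventf(pod, corev1.EventTypeNormal, "BeginScaleInLifecycle", "succeed to begin PodOpsLifecycle for scaling in")
}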
@@ -220,7 +225,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
needUpdateContext := false
for i, podWrapper := range podsToScaleIn {
if !podopslifecycle.AllowOps(collasetutils.ScaleInOpsLifecycleAdapter, podWrapper.Pod) && podWrapper.DeletionTimestamp == nil {
sc.recorder.Eventf(podWrapper.Pod.DeepCopy(), corev1.EventTypeNormal, "PodOpsLifecyclePromoting", "waiting for PodOpsLifecycle to promote")
sc.recorder.Eventf(podWrapper.Pod, corev1.EventTypeNormal, "PodScaleInLifecycle", "Pod is not allowed to scale in")
continue
}

@@ -239,12 +244,13 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll

// mark these Pods to scalingIn
if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when scaling in Pod", set.Namespace, set.Name)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
})

if err != nil {
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, err, "ScaleInFailed", fmt.Sprintf("fail to update Context for scaling in: %s", err))
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, err, "ScaleInFailed", fmt.Sprintf("failed to update Context for scaling in: %s", err))
return scaling, err
} else {
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, nil, "ScaleIn", "")
@@ -254,17 +260,25 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
// do delete Pod resource
succCount, err = controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
pod := <-podCh
if err := sc.podControl.DeletePod(pod.Pod); err == nil {
if err := collasetutils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil {
return err
}
klog.V(1).Infof("try to scale in Pod %s/%s", pod.Namespace, pod.Name)
if err := sc.podControl.DeletePod(pod.Pod); err != nil {
return fmt.Errorf("fail to delete Pod %s/%s when scaling in: %s", pod.Namespace, pod.Name, err)
}

sc.recorder.Eventf(set, corev1.EventTypeNormal, "PodDeleted", "succeed to scale in Pod %s/%s", pod.Namespace, pod.Name)
if err := collasetutils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil {
return err
}

// TODO also need to delete PVC from PVC template here

return nil
})
scaling := scaling || succCount > 0

if succCount > 0 {
sc.recorder.Eventf(set, corev1.EventTypeNormal, "ScaleIn", "scale in %d Pod(s)", succCount)
}
if err != nil {
collasetutils.AddOrUpdateCondition(newStatus, appsv1alpha1.CollaSetScale, err, "ScaleInFailed", fmt.Sprintf("fail to delete Pod for scaling in: %s", err))
return scaling, err
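Both scale-out and scale-in drive their per-Pod work through controllerutils.SlowStartBatch. That helper is not part of this diff; the sketch below implements the usual slow-start idea — run work in batches that double in size and stop growing once a batch fails — which is presumably what the repo's helper does, following the pattern kube-controller-manager uses for Pod creation:

package example

// slowStartBatch calls fn count times in batches that start at initialBatchSize and
// double after every fully successful batch. It returns how many calls succeeded and
// the first error seen; on error the remaining work is left for the next reconcile.
func slowStartBatch(count, initialBatchSize int, fn func(index int) error) (int, error) {
	successes := 0
	index := 0
	remaining := count
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		var firstErr error
		for i := 0; i < batchSize; i++ {
			if err := fn(index); err != nil {
				if firstErr == nil {
					firstErr = err
				}
			} else {
				successes++
			}
			index++
		}
		remaining -= batchSize
		if firstErr != nil {
			return successes, firstErr
		}
	}
	return successes, nil
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}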
@@ -285,6 +299,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
}

if needUpdatePodContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s after scaling", set.Namespace, set.Name)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
}); err != nil {
@@ -311,12 +326,12 @@ func extractAvailableContexts(diff int, ownedIDs map[int]*appsv1alpha1.ContextDe
return availableContexts
}

func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []*collasetutils.PodWrapper, revisions []*appsv1.ControllerRevision, updatedRevision *appsv1.ControllerRevision, ownedIDs map[int]*appsv1alpha1.ContextDetail, newStatus *appsv1alpha1.CollaSetStatus) (bool, error) {
func (sc *RealSyncControl) Update(set *appsv1alpha1.CollaSet, podWrapers []*collasetutils.PodWrapper, revisions []*appsv1.ControllerRevision, updatedRevision *appsv1.ControllerRevision, ownedIDs map[int]*appsv1alpha1.ContextDetail, newStatus *appsv1alpha1.CollaSetStatus) (bool, error) {
// 1. scan and analysis pods update info
podUpdateInfos := attachPodUpdateInfo(podWrapers, revisions, updatedRevision)

// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(instance, podUpdateInfos)
podToUpdate := decidePodToUpdate(set, podUpdateInfos)

// 3. prepare Pods to begin PodOpsLifecycle
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
@@ -333,16 +348,17 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
}

// 4. begin podOpsLifecycle parallel
updater := newPodUpdater(instance)
updater := newPodUpdater(set)
updating := false
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, err error) error {
podInfo := <-podCh

klog.V(1).Infof("try to begin PodOpsLifecycle for updating Pod %s/%s of CollaSet %s/%s", podInfo.Namespace, podInfo.Name, set.Namespace, set.Name)
if updated, err := podopslifecycle.Begin(sc.client, utils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
}
@@ -362,6 +378,7 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
for i := range podToUpdate {
podInfo := podToUpdate[i]
if !podopslifecycle.AllowOps(utils.UpdateOpsLifecycleAdapter, podInfo) {
sc.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod is not allowed to update")
continue
}

@@ -380,8 +397,9 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []

// 5. mark Pod to use updated revision before updating it.
if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s", set.Namespace, set.Name)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return podcontext.UpdateToPodContext(sc.client, instance, ownedIDs)
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
})

if err != nil {
@@ -397,29 +415,37 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
podInfo := <-podCh

// analyse Pod to get update information
inPlaceSupport, onlyMetadataChanged, updatedPod, err := updater.AnalyseAndGetUpdatedPod(instance, updatedRevision, podInfo)
inPlaceSupport, onlyMetadataChanged, updatedPod, err := updater.AnalyseAndGetUpdatedPod(set, updatedRevision, podInfo)
if err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}

klog.V(1).Infof("Pod %s/%s update operation from revision %s to revision %s, is [%t] in-place update supported and [%t] only has metadata changed.",
podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name, updatedRevision.Namespace, inPlaceSupport, onlyMetadataChanged)
if onlyMetadataChanged || inPlaceSupport {
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
if err = sc.podControl.UpdatePod(updatedPod); err == nil {
if err = sc.podControl.UpdatePod(updatedPod); err != nil {
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
} else {
podInfo.Pod = updatedPod
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, updatedPod.ResourceVersion); err != nil {
sc.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "UpdatePod", "succeed to update Pod %s/%s to from revision %s to revision %s by in-place", podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name, updatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, podInfo.Name, updatedPod.ResourceVersion); err != nil {
return err
}
}
} else {
// 6.2 if pod has changes not in-place supported, recreate it
if err = sc.podControl.DeletePod(podInfo.Pod); err != nil {
if err := collasetutils.ActiveExpectations.ExpectDelete(instance, expectations.Pod, podInfo.Name); err != nil {
return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err)
} else {
sc.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "UpdatePod", "succeed to update Pod %s/%s from revision %s to revision %s by recreate", podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name, updatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectDelete(set, expectations.Pod, podInfo.Name); err != nil {
return err
}
}
}

return err
return nil
})

updating = updating || succCount > 0
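The branchy hunk above boils down to one decision per Pod: metadata-only or in-place-capable changes are applied directly, everything else deletes the Pod so it can be recreated at the updated revision. A condensed sketch using the simplified Interface from pod_control.go:

package example

import corev1 "k8s.io/api/core/v1"

// podControl matches the UpdatePod/DeletePod part of the Interface defined in pod_control.go.
type podControl interface {
	UpdatePod(pod *corev1.Pod) error
	DeletePod(pod *corev1.Pod) error
}

// applyPodUpdate condenses steps 6.1 and 6.2 above.
func applyPodUpdate(pc podControl, current, updated *corev1.Pod, inPlaceSupport, onlyMetadataChanged bool) error {
	if onlyMetadataChanged || inPlaceSupport {
		return pc.UpdatePod(updated) // 6.1 apply the change to the running Pod
	}
	return pc.DeletePod(current) // 6.2 recreate: delete now, the scale logic brings back an updated Pod
}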
@@ -441,11 +467,12 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
}

if finished {
klog.V(1).Infof("try to finish update PodOpsLifecycle for Pod %s/%s", podInfo.Namespace, podInfo.Name)
if updated, err := podopslifecycle.Finish(sc.client, utils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("fail to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
sc.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
10 changes: 1 addition & 9 deletions pkg/controllers/collaset/utils/lifecycle_adapter.go
@@ -17,9 +17,6 @@ limitations under the License.
package utils

import (
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
@@ -94,12 +91,7 @@ func (a *CollaSetScaleInOpsLifecycleAdapter) AllowMultiType() bool {

// WhenBegin will be executed when begin a lifecycle
func (a *CollaSetScaleInOpsLifecycleAdapter) WhenBegin(pod client.Object) (bool, error) {
if _, exist := pod.GetLabels()[appsv1alpha1.PodScalingInLabelKey]; exist {
return false, nil
}

pod.GetLabels()[appsv1alpha1.PodScalingInLabelKey] = fmt.Sprintf("%d", time.Now().UnixNano())
return true, nil
return false, nil
}

// WhenFinish will be executed when finish a lifecycle
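With the PodScalingInLabelKey label gone, the scale-in adapter's WhenBegin becomes a no-op returning (false, nil). For contrast, an adapter that does mutate the Pod would look roughly like the deleted code, assuming the boolean return means "the object changed, persist it":

package example

import (
	"fmt"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

type markingAdapter struct{}

// WhenBegin stamps a marker label the first time the lifecycle begins and reports
// whether the Pod was modified. The label key here is hypothetical, not from this repo.
func (a *markingAdapter) WhenBegin(pod client.Object) (bool, error) {
	const key = "example.kusionstack.io/scaling-in"
	labels := pod.GetLabels()
	if labels == nil {
		labels = map[string]string{}
	}
	if _, exist := labels[key]; exist {
		return false, nil // already marked, nothing to persist
	}
	labels[key] = fmt.Sprintf("%d", time.Now().UnixNano())
	pod.SetLabels(labels)
	return true, nil
}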
5 changes: 3 additions & 2 deletions pkg/controllers/poddeletion/poddeletion_controller.go
@@ -80,8 +80,9 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error {
return nil
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// Reconcile deletes Pods through PodOpsLifecycle. It watches Pods carrying the label `kafed.kusionstack.io/to-delete`.
// When a Pod is labeled, the controller first triggers a deletion PodOpsLifecycle; once all conditions are satisfied,
// it then deletes the Pod.
func (r *PodDeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
instance := &corev1.Pod{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
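Per the rewritten comment, users request deletion by labeling the Pod with kafed.kusionstack.io/to-delete (PodDeletionIndicationLabelKey, defined in well_known_labels.go above). A sketch of setting that label with a merge patch; the value "true" is an assumption, the controller may only check for the key's presence:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
)

// requestPodDeletion labels a Pod so the poddeletion controller starts its
// deletion PodOpsLifecycle and, once allowed, deletes the Pod.
func requestPodDeletion(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	patch := client.MergeFrom(pod.DeepCopy())
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = "true"
	return c.Patch(ctx, pod, patch)
}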
@@ -84,8 +84,7 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error {
return nil
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// Reconcile reclaims a ResourceContext that is no longer in use, i.e. one that contains no Context.
func (r *ResourceContextReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
instance := &appsv1alpha1.ResourceContext{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
