Commit

chore: support available condition for instanceSet and update the success condition check for h-scale opsRequest
wangyelei committed Jun 23, 2024
1 parent 8ed6732 commit a092891
Showing 8 changed files with 96 additions and 33 deletions.
10 changes: 10 additions & 0 deletions apis/workloads/v1alpha1/instanceset_types.go
@@ -637,6 +637,10 @@ const (
// Or, a NotReady reason with not ready instances encoded in the Message field will be set.
InstanceReady ConditionType = "InstanceReady"

// InstanceAvailable ConditionStatus will be True if all instances(pods) are in the ready condition
// and have stayed ready for at least "MinReadySeconds" seconds. Otherwise, it will be set to False.
InstanceAvailable ConditionType = "InstanceAvailable"

// InstanceFailure is added in an instance set when at least one of its instances(pods) is in a `Failed` phase.
InstanceFailure ConditionType = "InstanceFailure"
)
@@ -648,6 +652,12 @@ const (
// ReasonReady is a reason for condition InstanceReady.
ReasonReady = "Ready"

// ReasonNotAvailable is a reason for condition InstanceAvailable.
ReasonNotAvailable = "NotAvailable"

// ReasonAvailable is a reason for condition InstanceAvailable.
ReasonAvailable = "Available"

// ReasonInstanceFailure is a reason for condition InstanceFailure.
ReasonInstanceFailure = "InstanceFailure"
)
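
For consumers of the InstanceSet API, the new condition can be read with the standard condition helpers from k8s.io/apimachinery, the same way as the existing InstanceReady condition. A minimal sketch; the helper name and calling context are illustrative, not part of this commit:

```go
package example

import (
	"k8s.io/apimachinery/pkg/api/meta"

	workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
)

// isInstanceSetAvailable reports whether the InstanceAvailable condition is
// True, i.e. all instances (pods) have stayed ready for MinReadySeconds.
func isInstanceSetAvailable(its *workloads.InstanceSet) bool {
	return meta.IsStatusConditionTrue(its.Status.Conditions, string(workloads.InstanceAvailable))
}
```
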
4 changes: 4 additions & 0 deletions controllers/apps/operations/ops_progress_util.go
@@ -456,6 +456,7 @@ func handleScaleOutProgressWithInstanceSet(
compStatus *appsv1alpha1.OpsRequestComponentStatus) (completedCount int32, err error) {
currPodRevisionMap, _ := instanceset.GetRevisions(its.Status.CurrentRevisions)
notReadyPodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceReady)
notAvailablePodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceAvailable)
failurePodSet := instanceset.GetPodNameSetFromInstanceSetCondition(its, workloads.InstanceFailure)
pgRes.opsMessageKey = "Create"
memberStatusMap := map[string]sets.Empty{}
@@ -479,6 +480,9 @@
updateProgressDetailForHScale(opsRes, pgRes, compStatus, objectKey, appsv1alpha1.ProcessingProgressStatus)
continue
}
if _, ok := notAvailablePodSet[podName]; ok {
continue
}
if _, ok := memberStatusMap[podName]; !ok && needToCheckRole(pgRes) {
continue
}
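
GetPodNameSetFromInstanceSetCondition itself is not part of this diff. Given that the status reconciler encodes the affected pod names as a JSON array in the condition message (see buildConditionMessageWithNames further down), a plausible sketch of what such a lookup does is shown below; the actual implementation in pkg/controller/instanceset may differ:

```go
package example

import (
	"encoding/json"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"

	workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
)

// getPodNameSetFromCondition decodes the pod names carried in the message of
// a False condition (e.g. InstanceReady or InstanceAvailable) into a set.
// Sketch only: the real GetPodNameSetFromInstanceSetCondition may differ.
func getPodNameSetFromCondition(its *workloads.InstanceSet, condType workloads.ConditionType) map[string]sets.Empty {
	result := map[string]sets.Empty{}
	cond := meta.FindStatusCondition(its.Status.Conditions, string(condType))
	if cond == nil || cond.Status != metav1.ConditionFalse || cond.Message == "" {
		return result
	}
	var names []string
	if err := json.Unmarshal([]byte(cond.Message), &names); err != nil {
		return result
	}
	for _, name := range names {
		result[name] = sets.Empty{}
	}
	return result
}
```
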
8 changes: 6 additions & 2 deletions controllers/apps/opsrequest_controller.go
@@ -246,7 +246,9 @@ func (r *OpsRequestReconciler) reconcileStatusDuringRunningOrCanceling(reqCtx in
opsRequest := opsRes.OpsRequest
// wait for OpsRequest.status.phase to Succeed
if requeueAfter, err := operations.GetOpsManager().Reconcile(reqCtx, r.Client, opsRes); err != nil {
r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsReconcileStatusFailed, "Failed to reconcile the status of OpsRequest: %s", err.Error())
if !apierrors.IsConflict(err) {
r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsReconcileStatusFailed, "Failed to reconcile the status of OpsRequest: %s", err.Error())
}
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
} else if requeueAfter != 0 {
// if the reconcileAction need requeue, do it
@@ -300,7 +302,9 @@ func (r *OpsRequestReconciler) doOpsRequestAction(reqCtx intctrlutil.RequestCtx,
opsDeepCopy := opsRequest.DeepCopy()
res, err := operations.GetOpsManager().Do(reqCtx, r.Client, opsRes)
if err != nil {
r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsDoActionFailed, "Failed to process the operation of OpsRequest: %s", err.Error())
if !apierrors.IsConflict(err) {
r.Recorder.Eventf(opsRequest, corev1.EventTypeWarning, reasonOpsDoActionFailed, "Failed to process the operation of OpsRequest: %s", err.Error())
}
if !reflect.DeepEqual(opsRequest.Status, opsDeepCopy.Status) {
if patchErr := r.Client.Status().Patch(reqCtx.Ctx, opsRequest, client.MergeFrom(opsDeepCopy)); patchErr != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
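
The new guard relies on apierrors.IsConflict from k8s.io/apimachinery/pkg/api/errors: update conflicts are transient optimistic-concurrency failures that the reconciler retries anyway, so emitting a warning event for each one is mostly noise. A hedged sketch of the same pattern factored into a helper; the helper is illustrative, not code from this commit:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
)

// eventUnlessConflict records a warning event only when the error is not an
// update conflict; conflicts are retried and should not spam events.
func eventUnlessConflict(recorder record.EventRecorder, obj runtime.Object, reason string, err error) {
	if err == nil || apierrors.IsConflict(err) {
		return
	}
	recorder.Eventf(obj, corev1.EventTypeWarning, reason, "reconcile failed: %s", err.Error())
}
```
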
1 change: 1 addition & 0 deletions controllers/apps/transformer_component_workload.go
@@ -299,6 +299,7 @@ func copyAndMergeITS(oldITS, newITS *workloads.InstanceSet, synthesizeComp *comp
itsObjCopy.Spec.Credential = itsProto.Spec.Credential
itsObjCopy.Spec.Instances = itsProto.Spec.Instances
itsObjCopy.Spec.OfflineInstances = itsProto.Spec.OfflineInstances
itsObjCopy.Spec.MinReadySeconds = itsProto.Spec.MinReadySeconds

if itsProto.Spec.UpdateStrategy.Type != "" || itsProto.Spec.UpdateStrategy.RollingUpdate != nil {
updateUpdateStrategy(itsObjCopy, itsProto)
5 changes: 4 additions & 1 deletion controllers/workloads/instanceset_controller.go
@@ -90,8 +90,11 @@ func (r *InstanceSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Do(instanceset.NewReplicasAlignmentReconciler()).
Do(instanceset.NewUpdateReconciler()).
Commit()
if re, ok := err.(intctrlutil.DelayedRequeueError); ok {
return intctrlutil.RequeueAfter(re.RequeueAfter(), logger, re.Reason())
}
requeue := false
if err != nil && apierrors.IsConflict(err) {
if apierrors.IsConflict(err) {
requeue = true
err = nil
}
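
The intctrlutil.DelayedRequeueError type is not shown in this diff. From the call sites here and in reconciler_status.go (NewDelayedRequeueError, IsDelayedRequeueError, RequeueAfter(), Reason()), its shape can be inferred roughly as follows; this is a reconstruction from usage, not the actual KubeBlocks implementation:

```go
package example

import "time"

// DelayedRequeueError signals that reconciliation succeeded but the object
// should be re-reconciled after a delay (e.g. waiting out MinReadySeconds).
type DelayedRequeueError interface {
	error
	RequeueAfter() time.Duration
	Reason() string
}

type delayedRequeueError struct {
	after  time.Duration
	reason string
}

// NewDelayedRequeueError wraps a delay and a reason into the sentinel error.
func NewDelayedRequeueError(after time.Duration, reason string) error {
	return &delayedRequeueError{after: after, reason: reason}
}

func (e *delayedRequeueError) Error() string               { return e.reason }
func (e *delayedRequeueError) RequeueAfter() time.Duration { return e.after }
func (e *delayedRequeueError) Reason() string              { return e.reason }

// IsDelayedRequeueError reports whether err carries delayed-requeue semantics.
func IsDelayedRequeueError(err error) bool {
	_, ok := err.(DelayedRequeueError)
	return ok
}
```
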
50 changes: 41 additions & 9 deletions pkg/controller/instanceset/reconciler_status.go
@@ -21,6 +21,7 @@ package instanceset

import (
"encoding/json"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
@@ -68,6 +69,7 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
currentReplicas, updatedReplicas := int32(0), int32(0)
readyReplicas, availableReplicas := int32(0), int32(0)
notReadyNames := sets.New[string]()
notAvailableNames := sets.New[string]()
currentRevisions := map[string]string{}
for _, pod := range podList {
currentRevisions[pod.Name] = getPodRevision(pod)
@@ -80,6 +82,8 @@
notReadyNames.Delete(pod.Name)
if isRunningAndAvailable(pod, its.Spec.MinReadySeconds) {
availableReplicas++
} else {
notAvailableNames.Insert(pod.Name)
}
}
if isCreated(pod) && !isTerminating(pod) {
@@ -116,6 +120,12 @@
}
meta.SetStatusCondition(&its.Status.Conditions, *readyCondition)

availableCondition, err := buildAvailableCondition(its, availableReplicas >= replicas, notAvailableNames)
if err != nil {
return nil, err
}
meta.SetStatusCondition(&its.Status.Conditions, *availableCondition)

// 3. set InstanceFailure condition
failureCondition, err := buildFailureCondition(its, podList)
if err != nil {
@@ -134,9 +144,19 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (*kubebuilde
// TODO(free6om): should put this field to the spec
setReadyWithPrimary(its, podList)

if its.Spec.MinReadySeconds > 0 && availableReplicas != readyReplicas {
return tree, intctrlutil.NewDelayedRequeueError(time.Second, "requeue for right status update")
}
return tree, nil
}

func buildConditionMessageWithNames(podNames []string) ([]byte, error) {
baseSort(podNames, func(i int) (string, int) {
return ParseParentNameAndOrdinal(podNames[i])
}, nil, true)
return json.Marshal(podNames)
}

func buildReadyCondition(its *workloads.InstanceSet, ready bool, notReadyNames sets.Set[string]) (*metav1.Condition, error) {
condition := &metav1.Condition{
Type: string(workloads.InstanceReady),
@@ -147,11 +167,26 @@ func buildReadyCondition(its *workloads.InstanceSet, ready bool, notReadyNames s
if !ready {
condition.Status = metav1.ConditionFalse
condition.Reason = workloads.ReasonNotReady
names := notReadyNames.UnsortedList()
baseSort(names, func(i int) (string, int) {
return ParseParentNameAndOrdinal(names[i])
}, nil, true)
message, err := json.Marshal(names)
message, err := buildConditionMessageWithNames(notReadyNames.UnsortedList())
if err != nil {
return nil, err
}
condition.Message = string(message)
}
return condition, nil
}

func buildAvailableCondition(its *workloads.InstanceSet, available bool, notAvailableNames sets.Set[string]) (*metav1.Condition, error) {
condition := &metav1.Condition{
Type: string(workloads.InstanceAvailable),
Status: metav1.ConditionTrue,
ObservedGeneration: its.Generation,
Reason: workloads.ReasonAvailable,
}
if !available {
condition.Status = metav1.ConditionFalse
condition.Reason = workloads.ReasonNotAvailable
message, err := buildConditionMessageWithNames(notAvailableNames.UnsortedList())
if err != nil {
return nil, err
}
@@ -180,10 +215,7 @@ func buildFailureCondition(its *workloads.InstanceSet, pods []*corev1.Pod) (*met
if len(failureNames) == 0 {
return nil, nil
}
baseSort(failureNames, func(i int) (string, int) {
return ParseParentNameAndOrdinal(failureNames[i])
}, nil, true)
message, err := json.Marshal(failureNames)
message, err := buildConditionMessageWithNames(failureNames)
if err != nil {
return nil, err
}
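
isRunningAndAvailable is not shown in this hunk. Assuming it follows the usual Kubernetes availability semantics (the pod is running, ready, and its Ready condition last transitioned at least MinReadySeconds ago), a standalone sketch of that check looks like this:

```go
package example

import (
	"time"

	corev1 "k8s.io/api/core/v1"
)

// isPodAvailable reports whether the pod has been ready for at least
// minReadySeconds, mirroring the usual Kubernetes availability semantics.
// Assumption: this is roughly what isRunningAndAvailable checks.
func isPodAvailable(pod *corev1.Pod, minReadySeconds int32, now time.Time) bool {
	if pod.Status.Phase != corev1.PodRunning {
		return false
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Type != corev1.PodReady || cond.Status != corev1.ConditionTrue {
			continue
		}
		if minReadySeconds == 0 {
			return true
		}
		minReadyDuration := time.Duration(minReadySeconds) * time.Second
		return !cond.LastTransitionTime.IsZero() &&
			cond.LastTransitionTime.Add(minReadyDuration).Before(now)
	}
	return false
}
```

This also explains the DelayedRequeueError added at the end of Reconcile: when MinReadySeconds is set and some pods are ready but not yet available, the controller has nothing to fix and simply needs to look again after the waiting period elapses.
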
32 changes: 20 additions & 12 deletions pkg/controller/instanceset/reconciler_status_test.go
@@ -33,6 +33,7 @@ import (
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

var _ = Describe("status reconciler test", func() {
@@ -109,39 +110,44 @@ var _ = Describe("status reconciler test", func() {
condition := corev1.PodCondition{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.NewTime(time.Now().Add(-1 * minReadySeconds * time.Second)),
LastTransitionTime: metav1.NewTime(time.Now()),
}
makePodAvailableWithRevision := func(pod *corev1.Pod, revision string) {
makePodAvailableWithRevision := func(pod *corev1.Pod, revision string, updatePodAvailable bool) {
pod.Labels[appsv1.ControllerRevisionHashLabelKey] = revision
pod.Status.Phase = corev1.PodRunning
pod.Status.Conditions = append(pod.Status.Conditions, condition)
if updatePodAvailable {
condition.LastTransitionTime = metav1.NewTime(time.Now().Add(-1 * minReadySeconds * time.Second))
}
pod.Status.Conditions = []corev1.PodCondition{condition}
}
pods := newTree.List(&corev1.Pod{})
currentRevisionMap := map[string]string{}
oldRevision := "old-revision"
for _, object := range pods {
pod, ok := object.(*corev1.Pod)
Expect(ok).Should(BeTrue())
makePodAvailableWithRevision(pod, oldRevision)
makePodAvailableWithRevision(pod, oldRevision, false)
currentRevisionMap[pod.Name] = oldRevision
}
_, err = reconciler.Reconcile(newTree)
Expect(err).Should(BeNil())
Expect(intctrlutil.IsDelayedRequeueError(err)).Should(BeTrue())
Expect(its.Status.Replicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.ReadyReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.AvailableReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.AvailableReplicas).Should(BeEquivalentTo(0))
Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(0))
Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
currentRevisions, _ := buildRevisions(currentRevisionMap)
Expect(its.Status.CurrentRevisions).Should(Equal(currentRevisions))
Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceAvailable))
Expect(its.Status.Conditions[1].Status).Should(BeEquivalentTo(corev1.ConditionFalse))

By("make all pods available with latest revision")
updateRevisions, err := GetRevisions(its.Status.UpdateRevisions)
Expect(err).Should(BeNil())
for _, object := range pods {
pod, ok := object.(*corev1.Pod)
Expect(ok).Should(BeTrue())
makePodAvailableWithRevision(pod, updateRevisions[pod.Name])
makePodAvailableWithRevision(pod, updateRevisions[pod.Name], true)
}
_, err = reconciler.Reconcile(newTree)
Expect(err).Should(BeNil())
@@ -151,7 +157,9 @@
Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.CurrentRevisions).Should(Equal(its.Status.UpdateRevisions))
Expect(its.Status.Conditions).Should(HaveLen(1))
Expect(its.Status.Conditions).Should(HaveLen(2))
Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceAvailable))
Expect(its.Status.Conditions[1].Status).Should(BeEquivalentTo(corev1.ConditionTrue))

By("make all pods failed")
for _, object := range pods {
@@ -167,17 +175,17 @@
Expect(its.Status.UpdatedReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.CurrentReplicas).Should(BeEquivalentTo(replicas))
Expect(its.Status.CurrentRevisions).Should(Equal(its.Status.UpdateRevisions))
Expect(its.Status.Conditions).Should(HaveLen(2))
Expect(its.Status.Conditions).Should(HaveLen(3))
failureNames := []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-foo-0", "bar-foo-1", "bar-hello-0"}
message, err := json.Marshal(failureNames)
Expect(err).Should(BeNil())
Expect(its.Status.Conditions[0].Type).Should(BeEquivalentTo(workloads.InstanceReady))
Expect(its.Status.Conditions[0].Status).Should(BeEquivalentTo(metav1.ConditionFalse))
Expect(its.Status.Conditions[0].Reason).Should(BeEquivalentTo(workloads.ReasonNotReady))
Expect(its.Status.Conditions[0].Message).Should(BeEquivalentTo(message))
Expect(its.Status.Conditions[1].Type).Should(BeEquivalentTo(workloads.InstanceFailure))
Expect(its.Status.Conditions[1].Reason).Should(BeEquivalentTo(workloads.ReasonInstanceFailure))
Expect(its.Status.Conditions[1].Message).Should(BeEquivalentTo(message))
Expect(its.Status.Conditions[2].Type).Should(BeEquivalentTo(workloads.InstanceFailure))
Expect(its.Status.Conditions[2].Reason).Should(BeEquivalentTo(workloads.ReasonInstanceFailure))
Expect(its.Status.Conditions[2].Message).Should(BeEquivalentTo(message))
})
})

19 changes: 10 additions & 9 deletions pkg/controller/kubebuilderx/controller.go
@@ -30,7 +30,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apecloud/kubeblocks/pkg/controller/graph"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

// TODO(free6om): this is a new reconciler framework in the very early stage leaving the following tasks to do:
@@ -100,22 +100,23 @@ func (c *controller) Do(reconcilers ...Reconciler) Controller {
func (c *controller) Commit() error {
defer c.emitFailureEvent()

if c.err != nil {
if c.err != nil && !intctrlutil.IsDelayedRequeueError(c.err) {
return c.err
}
if c.oldTree.GetRoot() == nil {
return nil
}
builder := NewPlanBuilder(c.ctx, c.cli, c.oldTree, c.tree, c.recorder, c.logger)
if c.err = builder.Init(); c.err != nil {
return c.err
if err := builder.Init(); err != nil {
return err
}
var plan graph.Plan
plan, c.err = builder.Build()
if c.err != nil {
return c.err
plan, err := builder.Build()
if err != nil {
return err
}
if err = plan.Execute(); err != nil {
return err
}
c.err = plan.Execute()
return c.err
}

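
Switching from assignments to c.err to local err variables appears deliberate: when Commit is entered with only a delayed-requeue error pending, the plan is still built and executed, and the pending error in c.err is left intact so the caller (see instanceset_controller.go above) can turn it into a RequeueAfter result. The tail of Commit is not visible in this hunk, so that reading is an inference. A small stand-alone sketch of the control flow, using a stand-in sentinel error rather than the real intctrlutil types:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDelayed is a stand-in for a delayed-requeue sentinel error.
var errDelayed = errors.New("delayed requeue")

// commit runs its side effects even when a delayed-requeue error is pending,
// then hands that pending error back to the caller instead of dropping it.
func commit(pending error, execute func() error) error {
	if pending != nil && !errors.Is(pending, errDelayed) {
		return pending // hard failure: stop before executing the plan
	}
	if err := execute(); err != nil {
		return err // execution errors take precedence
	}
	return pending // nil, or the delayed-requeue sentinel for the caller
}

func main() {
	err := commit(errDelayed, func() error { return nil })
	if errors.Is(err, errDelayed) {
		fmt.Println("requeue after", time.Second)
	}
}
```
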
