Skip to content

Commit

Permalink
chore: support to run a opsRequest after the dependent opsRequests ar…
Browse files Browse the repository at this point in the history
…e succeed. (#7594)
  • Loading branch information
wangyelei committed Jun 21, 2024
1 parent 7260d53 commit 8ed6732
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 2 deletions.
56 changes: 56 additions & 0 deletions controllers/apps/operations/ops_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"slices"
"strings"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

Expand Down Expand Up @@ -96,6 +100,15 @@ func (opsMgr *OpsManager) Do(reqCtx intctrlutil.RequestCtx, cli client.Client, o
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
}

// validate if the dependent ops have been successful
if pass, err := opsMgr.validateDependOnSuccessfulOps(reqCtx, cli, opsRes); intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
} else if err != nil {
return nil, err
} else if !pass {
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
opsDeepCopy := opsRequest.DeepCopy()
// save last configuration into status.lastConfiguration
if err = opsBehaviour.OpsHandler.SaveLastConfiguration(reqCtx, cli, opsRes); err != nil {
Expand Down Expand Up @@ -175,6 +188,49 @@ func (opsMgr *OpsManager) handleOpsCompleted(reqCtx intctrlutil.RequestCtx,
return PatchOpsStatus(reqCtx.Ctx, cli, opsRes, opsRequestPhase, completedCondition)
}

// validateDependOnOps validates if the dependent ops have been successful
func (opsMgr *OpsManager) validateDependOnSuccessfulOps(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource) (bool, error) {
dependentOpsStr := opsRes.OpsRequest.Annotations[constant.OpsDependentOnSuccessfulOpsAnnoKey]
if dependentOpsStr == "" {
return true, nil
}
opsNames := strings.Split(dependentOpsStr, ",")
for _, opsName := range opsNames {
ops := &appsv1alpha1.OpsRequest{}
if err := cli.Get(reqCtx.Ctx, client.ObjectKey{Name: opsName, Namespace: opsRes.OpsRequest.Namespace}, ops); err != nil {
if apierrors.IsNotFound(err) {
return false, intctrlutil.NewFatalError(err.Error())
}
return false, err
}
var relatedOpsArr []string
relatedOpsStr := ops.Annotations[constant.RelatedOpsAnnotationKey]
if relatedOpsStr != "" {
relatedOpsArr = strings.Split(relatedOpsStr, ",")
}
if !slices.Contains(relatedOpsArr, opsRes.OpsRequest.Name) {
// annotate to the dependent opsRequest
relatedOpsArr = append(relatedOpsArr, opsRes.OpsRequest.Name)
if ops.Annotations == nil {
ops.Annotations = map[string]string{}
}
ops.Annotations[constant.RelatedOpsAnnotationKey] = strings.Join(relatedOpsArr, ",")
if err := cli.Update(reqCtx.Ctx, ops); err != nil {
return false, err
}
}
if slices.Contains([]appsv1alpha1.OpsPhase{appsv1alpha1.OpsFailedPhase, appsv1alpha1.OpsCancelledPhase, appsv1alpha1.OpsAbortedPhase}, ops.Status.Phase) {
return false, PatchOpsStatus(reqCtx.Ctx, cli, opsRes, appsv1alpha1.OpsCancelledPhase)
}
if ops.Status.Phase != appsv1alpha1.OpsSucceedPhase {
return false, nil
}
}
return true, nil
}

func GetOpsManager() *OpsManager {
opsManagerOnce.Do(func() {
opsManager = &OpsManager{OpsMap: make(map[appsv1alpha1.OpsType]OpsBehaviour)}
Expand Down
56 changes: 56 additions & 0 deletions controllers/apps/operations/ops_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,61 @@ var _ = Describe("OpsUtil functions", func() {
g.Expect(opsSlice).Should(BeEmpty())
})
})

It("Test opsRequest dependency", func() {
By("init operations resources ")
reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx}
opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)

By("create a first horizontal opsRequest")
ops1 := createHorizontalScaling(clusterName, appsv1alpha1.HorizontalScaling{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
ScaleIn: &appsv1alpha1.ScaleIn{
ReplicaChanger: appsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)},
},
})
opsRes.OpsRequest = ops1
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Expect(opsRes.OpsRequest.Status.Phase).Should(Equal(appsv1alpha1.OpsCreatingPhase))

By("create another horizontal opsRequest with force flag and dependent the first opsRequest")
ops2 := createHorizontalScaling(clusterName, appsv1alpha1.HorizontalScaling{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
ScaleOut: &appsv1alpha1.ScaleOut{
ReplicaChanger: appsv1alpha1.ReplicaChanger{ReplicaChanges: pointer.Int32(1)},
},
})
ops2.Annotations = map[string]string{constant.OpsDependentOnSuccessfulOpsAnnoKey: ops1.Name}
ops2.Spec.Force = true
opsRes.OpsRequest = ops2
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Expect(opsRes.OpsRequest.Status.Phase).Should(Equal(appsv1alpha1.OpsPendingPhase))
// expect the dependent ops has been annotated
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(ops1), func(g Gomega, ops *appsv1alpha1.OpsRequest) {
g.Expect(ops.Annotations[constant.RelatedOpsAnnotationKey]).Should(Equal(ops2.Name))
})).Should(Succeed())

By("expect for the ops is Creating when dependent ops is succeed")
Expect(testapps.ChangeObjStatus(&testCtx, ops1, func() {
ops1.Status.Phase = appsv1alpha1.OpsSucceedPhase
})).Should(Succeed())

_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Expect(opsRes.OpsRequest.Status.Phase).Should(Equal(appsv1alpha1.OpsCreatingPhase))

By("expect for the ops is Cancelled when dependent ops is Failed")
Expect(testapps.ChangeObjStatus(&testCtx, ops1, func() {
ops1.Status.Phase = appsv1alpha1.OpsFailedPhase
})).Should(Succeed())

ops2.Annotations = map[string]string{constant.OpsDependentOnSuccessfulOpsAnnoKey: ops1.Name}
ops2.Status.Phase = appsv1alpha1.OpsPendingPhase
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops2))).Should(Equal(appsv1alpha1.OpsCancelledPhase))
})
})
})
39 changes: 37 additions & 2 deletions controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"math"
"reflect"
"strings"
"time"

"golang.org/x/exp/slices"
Expand Down Expand Up @@ -175,10 +176,12 @@ func (r *OpsRequestReconciler) handleOpsRequestByPhase(reqCtx intctrlutil.Reques
return r.reconcileStatusDuringRunningOrCanceling(reqCtx, opsRes)
case appsv1alpha1.OpsSucceedPhase:
return r.handleSucceedOpsRequest(reqCtx, opsRes.OpsRequest)
case appsv1alpha1.OpsFailedPhase, appsv1alpha1.OpsCancelledPhase:
default:
if err := r.annotateRelatedOps(reqCtx, opsRes.OpsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}

// handleCancelSignal handles the cancel signal for opsRequest.
Expand Down Expand Up @@ -218,6 +221,9 @@ func (r *OpsRequestReconciler) handleCancelSignal(reqCtx intctrlutil.RequestCtx,

// handleSucceedOpsRequest the opsRequest will be deleted after one hour when status.phase is Succeed
func (r *OpsRequestReconciler) handleSucceedOpsRequest(reqCtx intctrlutil.RequestCtx, opsRequest *appsv1alpha1.OpsRequest) (*ctrl.Result, error) {
if err := r.annotateRelatedOps(reqCtx, opsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
if err := r.deleteExternalJobs(reqCtx.Ctx, opsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
Expand Down Expand Up @@ -491,6 +497,35 @@ func (r *OpsRequestReconciler) deleteCreatedPodsInKBNamespace(reqCtx intctrlutil
return nil
}

// annotateRelatedOps annotates the related opsRequests to reconcile.
func (r *OpsRequestReconciler) annotateRelatedOps(reqCtx intctrlutil.RequestCtx, opsRequest *appsv1alpha1.OpsRequest) error {
relatedOpsStr := opsRequest.Annotations[constant.RelatedOpsAnnotationKey]
if relatedOpsStr == "" {
return nil
}
relatedOpsNames := strings.Split(relatedOpsStr, ",")
for _, opsName := range relatedOpsNames {
relatedOps := &appsv1alpha1.OpsRequest{}
if err := r.Client.Get(reqCtx.Ctx, client.ObjectKey{Name: opsName, Namespace: opsRequest.Namespace}, relatedOps); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return err
}
if relatedOps.Annotations[constant.ReconcileAnnotationKey] == opsRequest.ResourceVersion {
continue
}
if relatedOps.Annotations == nil {
relatedOps.Annotations = map[string]string{}
}
relatedOps.Annotations[constant.ReconcileAnnotationKey] = opsRequest.ResourceVersion
if err := r.Client.Update(reqCtx.Ctx, relatedOps); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}

type opsRequestStep func(reqCtx intctrlutil.RequestCtx, opsRes *operations.OpsResource) (*ctrl.Result, error)

type opsControllerHandler struct {
Expand Down
9 changes: 9 additions & 0 deletions controllers/apps/opsrequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,12 @@ var _ = Describe("OpsRequest Controller", func() {

By("create second restart ops")
ops2 := createRestartOps(clusterObj.Name, 2)
Eventually(testapps.ChangeObj(&testCtx, ops2, func(request *appsv1alpha1.OpsRequest) {
if request.Annotations == nil {
request.Annotations = map[string]string{}
}
request.Annotations[constant.OpsDependentOnSuccessfulOpsAnnoKey] = ops1.Name
})).Should(Succeed())
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops2))).Should(Equal(appsv1alpha1.OpsPendingPhase))

By("create third restart ops")
Expand All @@ -740,6 +746,9 @@ var _ = Describe("OpsRequest Controller", func() {
By("mock ops1 phase to Succeed")
mockCompRunning(replicas, true)
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops1))).Should(Equal(appsv1alpha1.OpsSucceedPhase))
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(ops2), func(g Gomega, opsRequest *appsv1alpha1.OpsRequest) {
g.Expect(opsRequest.Annotations[constant.ReconcileAnnotationKey]).ShouldNot(BeEmpty())
})).Should(Succeed())

By("expect for next ops is Running")
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops2))).Should(Equal(appsv1alpha1.OpsRunningPhase))
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
LastRoleSnapshotVersionAnnotationKey = "apps.kubeblocks.io/last-role-snapshot-version"
ComponentScaleInAnnotationKey = "apps.kubeblocks.io/component-scale-in" // ComponentScaleInAnnotationKey specifies whether the component is scaled in
DisableHAAnnotationKey = "kubeblocks.io/disable-ha"
OpsDependentOnSuccessfulOpsAnnoKey = "ops.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail.
RelatedOpsAnnotationKey = "ops.kubeblocks.io/related-ops"
)

// annotations for multi-cluster
Expand Down

0 comments on commit 8ed6732

Please sign in to comment.