Skip to content

Commit

Permalink
chore: support to run a opsRequest after the dependent opsRequests ar…
Browse files Browse the repository at this point in the history
…e succeed.
  • Loading branch information
wangyelei committed Jun 21, 2024
1 parent 40cf29d commit 1d6f2dc
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
55 changes: 55 additions & 0 deletions controllers/apps/operations/ops_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"slices"
"strings"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

Expand Down Expand Up @@ -96,6 +100,14 @@ func (opsMgr *OpsManager) Do(reqCtx intctrlutil.RequestCtx, cli client.Client, o
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
}
// validate if the dependent ops have been successful
if pass, err := opsMgr.validateDependOnSuccessfulOps(reqCtx, cli, opsRes); intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
} else if err != nil {
return nil, err
} else if !pass {
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
opsDeepCopy := opsRequest.DeepCopy()
// save last configuration into status.lastConfiguration
if err = opsBehaviour.OpsHandler.SaveLastConfiguration(reqCtx, cli, opsRes); err != nil {
Expand Down Expand Up @@ -154,6 +166,49 @@ func (opsMgr *OpsManager) Reconcile(reqCtx intctrlutil.RequestCtx, cli client.Cl
}
}

// validateDependOnOps validates if the dependent ops have been successful
func (opsMgr *OpsManager) validateDependOnSuccessfulOps(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource) (bool, error) {
dependentOpsStr := opsRes.OpsRequest.Annotations[constant.OpsDependentOnSuccessfulOpsAnnoKey]
if dependentOpsStr == "" {
return true, nil
}
opsNames := strings.Split(dependentOpsStr, ",")
for _, opsName := range opsNames {
ops := &appsv1alpha1.OpsRequest{}
if err := cli.Get(reqCtx.Ctx, client.ObjectKey{Name: opsName, Namespace: opsRes.OpsRequest.Namespace}, ops); err != nil {
if apierrors.IsNotFound(err) {
return false, intctrlutil.NewFatalError(err.Error())
}
return false, err
}
var relatedOpsArr []string
relatedOpsStr := ops.Annotations[constant.RelatedOpsAnnotationKey]
if relatedOpsStr != "" {
relatedOpsArr = strings.Split(relatedOpsStr, ",")
}
if !slices.Contains(relatedOpsArr, opsRes.OpsRequest.Name) {
// annotate to the dependent opsRequest
relatedOpsArr = append(relatedOpsArr, opsRes.OpsRequest.Name)
if ops.Annotations == nil {
ops.Annotations = map[string]string{}
}
ops.Annotations[constant.RelatedOpsAnnotationKey] = strings.Join(relatedOpsArr, ",")
if err := cli.Update(reqCtx.Ctx, ops); err != nil {
return false, err
}
}
if slices.Contains([]appsv1alpha1.OpsPhase{appsv1alpha1.OpsFailedPhase, appsv1alpha1.OpsCancelledPhase}, ops.Status.Phase) {
return false, PatchOpsStatus(reqCtx.Ctx, cli, opsRes, appsv1alpha1.OpsCancelledPhase)
}
if ops.Status.Phase != appsv1alpha1.OpsSucceedPhase {
return false, nil
}
}
return true, nil
}

func (opsMgr *OpsManager) handleOpsCompleted(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource,
Expand Down
39 changes: 37 additions & 2 deletions controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"math"
"reflect"
"strings"
"time"

"golang.org/x/exp/slices"
Expand Down Expand Up @@ -172,10 +173,12 @@ func (r *OpsRequestReconciler) handleOpsRequestByPhase(reqCtx intctrlutil.Reques
return r.reconcileStatusDuringRunningOrCanceling(reqCtx, opsRes)
case appsv1alpha1.OpsSucceedPhase:
return r.handleSucceedOpsRequest(reqCtx, opsRes.OpsRequest)
case appsv1alpha1.OpsFailedPhase, appsv1alpha1.OpsCancelledPhase:
default:
if err := r.annotateRelatedOps(reqCtx, opsRes.OpsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}

// handleCancelSignal handles the cancel signal for opsRequest.
Expand Down Expand Up @@ -208,6 +211,9 @@ func (r *OpsRequestReconciler) handleCancelSignal(reqCtx intctrlutil.RequestCtx,

// handleSucceedOpsRequest the opsRequest will be deleted after one hour when status.phase is Succeed
func (r *OpsRequestReconciler) handleSucceedOpsRequest(reqCtx intctrlutil.RequestCtx, opsRequest *appsv1alpha1.OpsRequest) (*ctrl.Result, error) {
if err := r.annotateRelatedOps(reqCtx, opsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
if err := r.deleteExternalJobs(reqCtx.Ctx, opsRequest); err != nil {
return intctrlutil.ResultToP(intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, ""))
}
Expand Down Expand Up @@ -413,6 +419,35 @@ func (r *OpsRequestReconciler) deleteExternalJobs(ctx context.Context, ops *apps
return nil
}

// annotateRelatedOps annotates the related opsRequests to reconcile.
func (r *OpsRequestReconciler) annotateRelatedOps(reqCtx intctrlutil.RequestCtx, opsRequest *appsv1alpha1.OpsRequest) error {
relatedOpsStr := opsRequest.Annotations[constant.RelatedOpsAnnotationKey]
if relatedOpsStr == "" {
return nil
}
relatedOpsNames := strings.Split(relatedOpsStr, ",")
for _, opsName := range relatedOpsNames {
relatedOps := &appsv1alpha1.OpsRequest{}
if err := r.Client.Get(reqCtx.Ctx, client.ObjectKey{Name: opsName, Namespace: opsRequest.Namespace}, relatedOps); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return err
}
if relatedOps.Annotations[constant.ReconcileAnnotationKey] == opsRequest.ResourceVersion {
continue
}
if relatedOps.Annotations == nil {
relatedOps.Annotations = map[string]string{}
}
relatedOps.Annotations[constant.ReconcileAnnotationKey] = opsRequest.ResourceVersion
if err := r.Client.Update(reqCtx.Ctx, relatedOps); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}

func (r *OpsRequestReconciler) parseBackupOpsRequest(ctx context.Context, object client.Object) []reconcile.Request {
backup := object.(*dpv1alpha1.Backup)
var (
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ const (
StatefulSetPodNameLabelKey = "statefulset.kubernetes.io/pod-name"

// kubeblocks.io annotations
OpsDependentOnSuccessfulOpsAnnoKey = "ops.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail.
RelatedOpsAnnotationKey = "ops.kubeblocks.io/related-ops"
ClusterSnapshotAnnotationKey = "kubeblocks.io/cluster-snapshot" // ClusterSnapshotAnnotationKey saves the snapshot of cluster.
DefaultClusterVersionAnnotationKey = "kubeblocks.io/is-default-cluster-version" // DefaultClusterVersionAnnotationKey specifies the default cluster version.
OpsRequestAnnotationKey = "kubeblocks.io/ops-request" // OpsRequestAnnotationKey OpsRequest annotation key in Cluster
Expand Down

0 comments on commit 1d6f2dc

Please sign in to comment.