operator: Decommission cluster on default CR deletion
Deleting the CR used to shut down the cluster by default; the default behavior on deletion is now to decommission the cluster, since a dedicated spec field for shutdown has been introduced.

Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
gaikwadabhishek committed Apr 22, 2024
1 parent 55c6279 commit e388c27
Showing 9 changed files with 70 additions and 76 deletions.
8 changes: 1 addition & 7 deletions operator/api/v1beta1/aistore_types.go
@@ -76,15 +76,9 @@ type AIStoreSpec struct {
// +optional
ShutdownCluster *bool `json:"shutdownCluster,omitempty"`

// DecommissionCluster indicates whether the cluster should be decommissioned upon deletion of the CR.
// When enabled, this process removes all AIS daemons and deletes metadata from the configuration directories.
// Note: Decommissioning is irreversible, and it may result in the permanent loss of the cluster and all user data**.
// +optional
DecommissionCluster *bool `json:"decommissionCluster,omitempty"`

// CleanupData determines whether to clean up PVCs and user data (including buckets and objects) when the cluster is decommissioned.
// The reclamation of PVs linked to the PVCs depends on the PV reclaim policy or the default policy of the associated StorageClass.
// This field is relevant only if DecommissionCluster is enabled.
// This field is relevant only when deleting the CR (which triggers decommissioning of the cluster).
// +optional
CleanupData *bool `json:"cleanupData,omitempty"`

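For orientation, here is a minimal sketch of a spec after this change (import paths are assumptions; the field names mirror the test utilities further down). Decommission-on-delete is now implicit, and cleanupData is the only remaining knob for wiping data:

package main

import (
	aisv1 "github.com/NVIDIA/ais-k8s/operator/api/v1beta1" // import path assumed
	"github.com/NVIDIA/aistore/api/apc"                    // import path assumed
)

// Hypothetical example: there is no DecommissionCluster field anymore.
// Deleting the CR decommissions the cluster; CleanupData alone decides
// whether PVCs and user data (buckets, objects) are wiped as well.
func exampleSpec() aisv1.AIStoreSpec {
	return aisv1.AIStoreSpec{
		Size:            1,
		ShutdownCluster: apc.Ptr(false), // shutdown is opted into via its own field
		CleanupData:     apc.Ptr(true),
		// ...other fields (images, mounts, etc.) omitted for brevity.
	}
}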
5 changes: 0 additions & 5 deletions operator/api/v1beta1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

8 changes: 1 addition & 7 deletions operator/config/crd/bases/ais.nvidia.com_aistores.yaml
@@ -46,7 +46,7 @@ spec:
description: |-
CleanupData determines whether to clean up PVCs and user data (including buckets and objects) when the cluster is decommissioned.
The reclamation of PVs linked to the PVCs depends on the PV reclaim policy or the default policy of the associated StorageClass.
This field is relevant only if DecommissionCluster is enabled.
This field is relevant only when deleting the CR (which triggers decommissioning of the cluster).
type: boolean
clusterDomain:
description: 'Defines the cluster domain name for DNS. Default: cluster.local.'
@@ -422,12 +422,6 @@ spec:
type: string
type: object
type: object
decommissionCluster:
description: |-
DecommissionCluster indicates whether the cluster should be decommissioned upon deletion of the CR.
When enabled, this process removes all AIS daemons and deletes metadata from the configuration directories.
Note: Decommissioning is irreversible, and it may result in the permanent loss of the cluster and all user data**.
type: boolean
disablePodAntiAffinity:
description: DisablePodAntiAffinity, if set allows more than one target/proxy
daemon pods to be scheduled on same K8s node.
76 changes: 39 additions & 37 deletions operator/pkg/controllers/cluster_controller.go
@@ -116,17 +116,36 @@ func (r *AIStoreReconciler) initializeCR(ctx context.Context, ais *aisv1.AIStore
aisv1.AIStoreStatus{State: aisv1.ConditionInitialized})
return reconcile.Result{Requeue: retry}, err
}

func (r *AIStoreReconciler) shutdownCluster(ctx context.Context, ais *aisv1.AIStore) (reconcile.Result, error) {
r.log.Info("Shutting down AIS cluster", "name", ais.Name)
r.attemptGracefulShutdown(ctx, ais)
r.log.Info("Scaling statefulsets to size 0", "name", ais.Name)
err := r.scaleProxiesToZero(ctx, ais)
var (
params *aisapi.BaseParams
err error
)

r.log.Info("Starting shutdown of AIS cluster", "clusterName", ais.Name)
if r.isExternal {
params, err = r.getAPIParams(ctx, ais)
} else {
params, err = r.primaryBaseParams(ctx, ais)
}
if err != nil {
r.log.Error(err, "Failed to get API parameters", "clusterName", ais.Name)
return reconcile.Result{}, err
}

r.log.Info("Attempting graceful shutdown", "clusterName", ais.Name)
r.attemptGracefulShutdown(params)

if err = r.scaleProxiesToZero(ctx, ais); err != nil {
return reconcile.Result{}, err
}
err = r.scaleTargetsToZero(ctx, ais)
return reconcile.Result{}, err

if err = r.scaleTargetsToZero(ctx, ais); err != nil {
return reconcile.Result{}, err
}

r.log.Info("AIS cluster shutdown completed", "clusterName", ais.Name)
return reconcile.Result{}, nil
}

func (r *AIStoreReconciler) handleCRDeletion(ctx context.Context, ais *aisv1.AIStore) (reconcile.Result, error) {
@@ -152,13 +171,10 @@ func (r *AIStoreReconciler) handleCRDeletion(ctx context.Context, ais *aisv1.AIS
}

func (r *AIStoreReconciler) cleanup(ctx context.Context, ais *aisv1.AIStore) (updated bool, err error) {
decommissionCluster := ais.Spec.DecommissionCluster != nil && *ais.Spec.DecommissionCluster
var nodeNames map[string]bool
if decommissionCluster {
nodeNames, err = r.client.ListNodesRunningAIS(ctx, ais)
if err != nil {
r.log.Error(err, "Failed to list nodes running AIS")
}
nodeNames, err = r.client.ListNodesRunningAIS(ctx, ais)
if err != nil {
r.log.Error(err, "Failed to list nodes running AIS")
}

updated, err = cmn.AnyFunc(
@@ -168,7 +184,7 @@ func (r *AIStoreReconciler) cleanup(ctx context.Context, ais *aisv1.AIStore) (up
func() (bool, error) { return r.cleanupRBAC(ctx, ais) },
func() (bool, error) { return r.cleanupPVC(ctx, ais) },
)
if updated && decommissionCluster {
if updated {
err := r.runManualCleanupJob(ctx, ais, nodeNames)
if err != nil {
r.log.Error(err, "Failed to run manual cleanup job")
@@ -189,36 +205,22 @@ func (r *AIStoreReconciler) runManualCleanupJob(ctx context.Context, ais *aisv1.
return nil
}

func (r *AIStoreReconciler) attemptGracefulShutdown(ctx context.Context, ais *aisv1.AIStore) {
var (
params *aisapi.BaseParams
err error
)
if r.isExternal {
params, err = r.getAPIParams(ctx, ais)
} else {
params, err = r.primaryBaseParams(ctx, ais)
}
if err != nil {
r.log.Error(err, "failed to create BaseAPIParams")
return
}
if ais.Spec.DecommissionCluster != nil && *ais.Spec.DecommissionCluster {
cleanupData := ais.Spec.CleanupData != nil && *ais.Spec.CleanupData
r.log.Info("Attempting graceful decommission of cluster")
if err = aisapi.DecommissionCluster(*params, cleanupData); err != nil {
r.log.Error(err, "Failed to gracefully decommission cluster")
}
return
func (r *AIStoreReconciler) attemptGracefulDecommission(params *aisapi.BaseParams, cleanupData bool) {
r.log.Info("Attempting graceful decommission of cluster")
if err := aisapi.DecommissionCluster(*params, cleanupData); err != nil {
r.log.Error(err, "Failed to gracefully decommission cluster")
}
}

func (r *AIStoreReconciler) attemptGracefulShutdown(params *aisapi.BaseParams) {
r.log.Info("Attempting graceful shutdown of cluster")
if err = aisapi.ShutdownCluster(*params); err != nil {
if err := aisapi.ShutdownCluster(*params); err != nil {
r.log.Error(err, "Failed to gracefully shutdown cluster")
}
}

func (r *AIStoreReconciler) cleanupPVC(ctx context.Context, ais *aisv1.AIStore) (anyUpdated bool, err error) {
if ais.Spec.DecommissionCluster != nil && *ais.Spec.DecommissionCluster && ais.Spec.CleanupData != nil && *ais.Spec.CleanupData {
if ais.Spec.CleanupData != nil && *ais.Spec.CleanupData {
r.log.Info("Cleaning up PVCs")
return r.client.DeleteAllPVCsIfExist(ctx, ais.Namespace, target.PodLabels(ais))
}
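A note on the gating above: runManualCleanupJob fires only when cmn.AnyFunc reports that some cleanup step made a change. AnyFunc's source is not part of this diff; a plausible sketch of its contract:

// Presumed shape of cmn.AnyFunc (assumption; not shown in this commit):
// run each step in order, remember whether any step changed something,
// and stop at the first error.
func AnyFunc(fns ...func() (bool, error)) (updated bool, err error) {
	for _, fn := range fns {
		var u bool
		u, err = fn()
		updated = updated || u
		if err != nil {
			return updated, err
		}
	}
	return updated, nil
}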
3 changes: 2 additions & 1 deletion operator/pkg/controllers/proxy_controller.go
@@ -302,9 +302,10 @@ func (r *AIStoreReconciler) handleProxyScaledown(ctx context.Context, ais *aisv1

// Scale down the statefulset without decommissioning or resetting primary
func (r *AIStoreReconciler) scaleProxiesToZero(ctx context.Context, ais *aisv1.AIStore) error {
r.log.Info("Scaling proxies to zero", "clusterName", ais.Name)
changed, err := r.client.UpdateStatefulSetReplicas(ctx, proxy.StatefulSetNSName(ais), 0)
if err != nil {
r.log.Error(err, "Failed to update proxy StatefulSet replicas")
r.log.Error(err, "Failed to scale proxies to zero", "clusterName", ais.Name)
} else if changed {
r.log.Info("Proxy StatefulSet set to size 0", "name", ais.Name)
} else {
23 changes: 19 additions & 4 deletions operator/pkg/controllers/target_controllers.go
@@ -66,16 +66,30 @@ func (r *AIStoreReconciler) cleanupTarget(ctx context.Context, ais *aisv1.AIStor
}

func (r *AIStoreReconciler) cleanupTargetSS(ctx context.Context, ais *aisv1.AIStore) (anyUpdated bool, err error) {
var baseParams *aisapi.BaseParams

// If the target statefulset is not present, we can return immediately.
r.log.Info("Cleaning up target statefulset")
targetSS := target.StatefulSetNSName(ais)
if exists, err := r.client.StatefulSetExists(ctx, targetSS); err != nil || !exists {
return false, err
}

// If we reach here implies, we didn't attempt to shutdown the cluster yet.
// Attempt graceful cluster shutdown followed by deleting target statefulset.
r.attemptGracefulShutdown(ctx, ais)
if r.isExternal {
baseParams, err = r.getAPIParams(ctx, ais)
} else {
baseParams, err = r.primaryBaseParams(ctx, ais)
}
if err != nil {
r.log.Error(err, "Failed to get API parameters", "clusterName", ais.Name)
return false, err
}

// Reaching this point implies we haven't attempted to decommission the cluster yet.
// Attempt a graceful cluster decommission, followed by deleting the target statefulset.
cleanupData := ais.Spec.CleanupData != nil && *ais.Spec.CleanupData
r.attemptGracefulDecommission(baseParams, cleanupData)

// TODO: if the environment is slow, the statefulset controller might create new pods to compensate for the old ones being
// deleted in the shutdown/decommission operation. Find a way to stop the statefulset controller from creating new pods
return r.client.DeleteStatefulSetIfExists(ctx, targetSS)
@@ -143,9 +157,10 @@ func (r *AIStoreReconciler) handleTargetScaleDown(ctx context.Context, ais *aisv

// Scale down the statefulset without decommissioning
func (r *AIStoreReconciler) scaleTargetsToZero(ctx context.Context, ais *aisv1.AIStore) error {
r.log.Info("Scaling targets to zero", "clusterName", ais.Name)
changed, err := r.client.UpdateStatefulSetReplicas(ctx, target.StatefulSetNSName(ais), 0)
if err != nil {
r.log.Error(err, "Failed to update target StatefulSet replicas")
r.log.Error(err, "Failed to scale targets to zero", "clusterName", ais.Name)
} else if changed {
r.log.Info("Target StatefulSet set to size 0", "name", ais.Name)
} else {
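Both shutdownCluster and cleanupTargetSS now repeat the same external-vs-in-cluster branch to obtain API parameters. A follow-up could hoist that branch into a small helper along these lines (hypothetical, not part of this commit):

// clusterBaseParams would unify the duplicated parameter selection.
func (r *AIStoreReconciler) clusterBaseParams(ctx context.Context, ais *aisv1.AIStore) (*aisapi.BaseParams, error) {
	// An operator running outside the cluster reaches AIS through the
	// external endpoint; otherwise it talks to the primary proxy directly.
	if r.isExternal {
		return r.getAPIParams(ctx, ais)
	}
	return r.primaryBaseParams(ctx, ais)
}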
17 changes: 8 additions & 9 deletions operator/tests/integration/cluster_test.go
@@ -356,16 +356,15 @@ var _ = Describe("Run Controller", func() {
cc.cleanup(pvs)
})

It("Re-deploying with CleanupData AND DecommissionCluster should wipe out all data", func() {
// Define CleanupData and DecommissionCluster to wipe when we destroy the cluster
It("Re-deploying with CleanupData should wipe out all data", func() {
// Define CleanupData to wipe when we destroy the cluster
cluArgs := tutils.ClusterSpecArgs{
Name: clusterName(),
Namespace: testNSName,
StorageClass: storageClass,
Size: 1,
EnableExternalLB: testAsExternalClient,
DecommissionCluster: true,
CleanupData: true,
Name: clusterName(),
Namespace: testNSName,
StorageClass: storageClass,
Size: 1,
EnableExternalLB: testAsExternalClient,
CleanupData: true,
}
cc, pvs := newClientCluster(cluArgs)
cc.create()
2 changes: 0 additions & 2 deletions operator/tests/tutils/ais_cluster.go
@@ -34,7 +34,6 @@ type (
DisableAntiAffinity bool
EnableExternalLB bool
ShutdownCluster bool
DecommissionCluster bool
CleanupData bool
// Create a cluster with more PVs than targets for future scaling
MaxPVs int32
@@ -112,7 +111,6 @@ func newAISClusterCR(args ClusterSpecArgs, mounts []aisv1.Mount) *aisv1.AIStore
spec := aisv1.AIStoreSpec{
Size: args.Size,
ShutdownCluster: apc.Ptr(args.ShutdownCluster),
DecommissionCluster: apc.Ptr(args.DecommissionCluster),
CleanupData: apc.Ptr(args.CleanupData),
NodeImage: aisNodeImage,
InitImage: aisInitImage,
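The spec fields stay *bool rather than bool so that an unset value remains distinguishable from an explicit false, the usual pattern for +optional CRD fields. apc.Ptr, used above, is presumably the standard generic pointer helper (assumption; its definition is not in this diff):

// Presumed shape of apc.Ptr: lift a value to a pointer so optional
// CRD fields can distinguish "unset" (nil) from "false".
func Ptr[T any](v T) *T { return &v }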
@@ -1,8 +1,4 @@
---
- name: Decommission cluster instead of shutdown
shell: kubectl patch aistores.ais.nvidia.com ais -n {{ cluster }} --type=merge -p '{"spec":{"decommissionCluster":true}}'
ignore_errors: true

- name: Delete user data if `cleanup_data` is true
shell: kubectl patch aistores.ais.nvidia.com ais -n {{ cluster }} --type=merge -p '{"spec":{"cleanupData":true}}'
ignore_errors: true
