Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: reconcile thinruntime failed when dataset is deleted #3300

Merged
merged 13 commits into from
Jun 29, 2023
16 changes: 8 additions & 8 deletions pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
return utils.RequeueIfError(errors.Wrap(err, "Failed to create"))
}

// 2.Get or create the engine
engine, err := r.implement.GetOrCreateEngine(ctx)
// 2.Get the ObjectMeta of runtime
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add comments about the reason of changing the order of step 2 and 3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the order was used to judge GetOrCreateEngine failed reason which is runtime having deletionTimeStamp. In this case we should ignore the GetOrCreateEngine error and continue to reconcileruntimeDeletion, but it would cause engine is a nil pointer. And we have resolvd this problem inside GetOrCreateEngine so that it would return an engine although it could not get mounted dataset. So the order is no need to change now and i will fix it.

objectMeta, err := r.implement.GetRuntimeObjectMeta(ctx)
if err != nil {
r.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "Process Runtime error %v", err)
return utils.RequeueIfError(errors.Wrap(err, "Failed to create"))
return utils.RequeueIfError(err)
}

// 3.Get the ObjectMeta of runtime
objectMeta, err := r.implement.GetRuntimeObjectMeta(ctx)
// 3.Get or create the engine
engine, err := r.implement.GetOrCreateEngine(ctx)
if err != nil {
return utils.RequeueIfError(err)
r.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "Process Runtime error %v", err)
return utils.RequeueIfError(errors.Wrap(err, "Failed to create"))
}

// 4.Get the dataset
Expand Down Expand Up @@ -347,7 +347,7 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset)
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset.Spec.Mounts)

if len(mounted) > 0 && ctx.RuntimeType != common.ThinRuntime {
return false, "dataset mounting another dataset can only use thin runtime"
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddc/base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func GetDatasetRefName(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []types.NamespacedName {
func GetMountedDatasetNamespacedName(mounts []datav1alpha1.Mount) []types.NamespacedName {
// virtual dataset can only mount dataset
var physicalNameSpacedName []types.NamespacedName
for _, mount := range virtualDataset.Spec.Mounts {
for _, mount := range mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
datasetPath := strings.TrimPrefix(mount.MountPoint, string(common.RefSchema))
namespaceAndName := strings.Split(datasetPath, "/")
Expand Down Expand Up @@ -62,7 +62,7 @@ func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
}

func CheckReferenceDataset(dataset *datav1alpha1.Dataset) (check bool, err error) {
mounts := len(GetMountedDatasetNamespacedName(dataset))
mounts := len(GetMountedDatasetNamespacedName(dataset.Spec.Mounts))
totalMounts := len(dataset.Spec.Mounts)
switch {
case mounts == 1:
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestGetMountedDatasetNamespacedName(t *testing.T) {
},
}
for _, tt := range tests {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset); len(got) != tt.want {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset.Spec.Mounts); len(got) != tt.want {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming the function name to GetPhysicalDatasetFromMounts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

t.Errorf("GetMountedDatasetNamespacedName() len = %v, want %v", got, tt.want)
}
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/ddc/thin/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,21 @@ func Precheck(client client.Client, key types.NamespacedName) (found bool, err e
func CheckReferenceDatasetRuntime(client client.Client, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(client, runtime.Name, runtime.Namespace)
if err != nil {
return false, err
if utils.IgnoreNotFound(err) == nil && runtime.Status.Mounts != nil && len(runtime.Status.Mounts) != 0 {
Copy link
Collaborator

@cheyang cheyang Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make it work even the virtualDataset is already deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, the virtualDataset would not be deleted because its reference runtime has not been cleaned up.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it may happen when deleting virtualDataset forcely. How to handle this then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

// dataset not found, but can GetMountedDatasetNamespacedName from runtime
} else {
return false, err
}
}

mounted := base.GetMountedDatasetNamespacedName(dataset)
var mounted []types.NamespacedName
if dataset != nil {
// getMountedDataset from dataset first
mounted = base.GetMountedDatasetNamespacedName(dataset.Spec.Mounts)
} else if runtime.Status.Mounts != nil && len(runtime.Status.Mounts) != 0 {
// then try to getMountedDataset from runtime
mounted = base.GetMountedDatasetNamespacedName(runtime.Status.Mounts)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if dataset is not found and the length of runtime mounts is 0? How will the user handle this situation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case will be protected by checking existence of reference datasets before removing any physical dataset. This can be done in the next PR.

// not mount other datasets
if len(mounted) == 0 {
return false, nil
Expand Down
36 changes: 23 additions & 13 deletions pkg/ddc/thin/referencedataset/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package referencedataset
import (
"context"
"fmt"
"time"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
"github.com/fluid-cloudnative/fluid/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (e *ReferenceDatasetEngine) Setup(ctx cruntime.ReconcileRequestContext) (re
// 1. get the physical datasets according the virtual dataset
dataset := ctx.Dataset

physicalNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
physicalNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset.Spec.Mounts)
if len(physicalNameSpacedNames) != 1 {
return false, fmt.Errorf("ThinEngine can only handle dataset only mounting one dataset")
}
Expand Down Expand Up @@ -193,29 +194,38 @@ func (e *ReferenceDatasetEngine) Shutdown() (err error) {
if err != nil {
return err
}
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if mountedRuntimeInfo != nil {
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if err != nil {
return err
}
}
}
return
}

func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
// not found dataset error indicates the runtime is deleting, pass checkDatasetMountSupport
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("The dataset is not found, pass checkDatasetMountSupport because runtime is deleting")
return nil
} else {
return err
}
}

mountedNamespacedName := base.GetMountedDatasetNamespacedName(dataset)
mountedNamespacedName := base.GetMountedDatasetNamespacedName(dataset.Spec.Mounts)
mountedSize := len(mountedNamespacedName)

// currently only support dataset mounting only one dataset
Expand All @@ -234,7 +244,7 @@ func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
}

// currently not support mounted dataset mounting another dataset
if len(base.GetMountedDatasetNamespacedName(mountedDataset)) != 0 {
if len(base.GetMountedDatasetNamespacedName(mountedDataset.Spec.Mounts)) != 0 {
return fmt.Errorf("ThinRuntime with no profile name can only handle dataset only mounting one dataset")
}

Expand Down
28 changes: 26 additions & 2 deletions pkg/ddc/thin/referencedataset/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (e *ReferenceDatasetEngine) getMountedDatasetRuntimeStatus() (status *datav
return status, err
}

// if mountedRuntimeInfo is nil and no err, the runtime is deleting.
if mountedRuntimeInfo == nil {
return nil, nil
}

return base.GetRuntimeStatus(e.Client, mountedRuntimeInfo.GetRuntimeType(),
mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
}
Expand Down Expand Up @@ -91,20 +96,39 @@ func (e *ReferenceDatasetEngine) getRuntimeInfo() (base.RuntimeInfoInterface, er
e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())

return e.runtimeInfo, nil

}

// getMountedRuntimeInfo get mountedRuntimeInfo from dataset.
// If could not get dataset, getMountedRuntimeInfo try to get mountedRuntimeInfo from runtime status.
// And if dataset is deleted and no status.mounts, it returns nil, nil to continue runtimeDeletion.
func (e *ReferenceDatasetEngine) getMountedRuntimeInfo() (base.RuntimeInfoInterface, error) {
if e.mountedRuntimeInfo != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment: // If already have mountedRuntimeInfo, return it directly

return e.mountedRuntimeInfo, nil
}

dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
// if err is not found, try to get mountedRuntimeInfo from runtime status, don't return here.
if err != nil && utils.IgnoreNotFound(err) != nil {
return e.mountedRuntimeInfo, err
}

runtime, err := e.getRuntime()
if err != nil {
return e.mountedRuntimeInfo, err
}

mountedNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
var mountedNameSpacedNames []types.NamespacedName
if dataset != nil {
// get mountedRuntimeInfo from dataset first
mountedNameSpacedNames = base.GetMountedDatasetNamespacedName(dataset.Spec.Mounts)
} else if runtime.Status.Mounts != nil && len(runtime.Status.Mounts) != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check runtime.Status.Mounts != nil here because len(nil) == 0. We can remove runtime.Status.Mounts != nil to avoid code redundancy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// then try to get mountedRuntimeInfo from runtime status
mountedNameSpacedNames = base.GetMountedDatasetNamespacedName(runtime.Status.Mounts)
} else {
// The dataset is not found and no status.mounts, in this case, the runtime is deleting, return nil, nil
e.Log.Info("The dataset is deleted and no runtime.Status.Mounts, in this case, the runtime is deleting, return nil, nil")
return nil, nil
}
if len(mountedNameSpacedNames) != 1 {
return e.mountedRuntimeInfo, fmt.Errorf("ThinEngine with no profile name can only handle dataset only mounting one dataset")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/ddc/thin/referencedataset/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ func (e *ReferenceDatasetEngine) Sync(ctx cruntime.ReconcileRequestContext) (err
if err != nil {
return err
}
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
if mountedRuntimeStatus != nil {
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
}
// update status.mounts to dataset mounts
runtimeToUpdate.Status.Mounts = virtualDatasetToUpdate.Spec.Mounts

if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), runtimeToUpdate)
Expand Down
Loading