Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: reconcile thinruntime failed when dataset is deleted #3300

Merged
merged 13 commits into from
Jun 29, 2023
2 changes: 1 addition & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset)
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

if len(mounted) > 0 && ctx.RuntimeType != common.ThinRuntime {
return false, "dataset mounting another dataset can only use thin runtime"
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddc/base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func GetDatasetRefName(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []types.NamespacedName {
func GetPhysicalDatasetFromMounts(mounts []datav1alpha1.Mount) []types.NamespacedName {
// virtual dataset can only mount dataset
var physicalNameSpacedName []types.NamespacedName
for _, mount := range virtualDataset.Spec.Mounts {
for _, mount := range mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
datasetPath := strings.TrimPrefix(mount.MountPoint, string(common.RefSchema))
namespaceAndName := strings.Split(datasetPath, "/")
Expand Down Expand Up @@ -62,7 +62,7 @@ func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
}

func CheckReferenceDataset(dataset *datav1alpha1.Dataset) (check bool, err error) {
mounts := len(GetMountedDatasetNamespacedName(dataset))
mounts := len(GetPhysicalDatasetFromMounts(dataset.Spec.Mounts))
totalMounts := len(dataset.Spec.Mounts)
switch {
case mounts == 1:
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddc/base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetMountedDatasetNamespacedName(t *testing.T) {
func TestGetPhysicalDatasetFromMounts(t *testing.T) {
tests := []struct {
virtualDataset *datav1alpha1.Dataset
want int
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestGetMountedDatasetNamespacedName(t *testing.T) {
},
}
for _, tt := range tests {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset); len(got) != tt.want {
t.Errorf("GetMountedDatasetNamespacedName() len = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetFromMounts(tt.virtualDataset.Spec.Mounts); len(got) != tt.want {
t.Errorf("GetPhysicalDatasetFromMounts() len = %v, want %v", got, tt.want)
}
}
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/ddc/thin/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
return nil, fmt.Errorf("engine %s is failed due to type conversion", ctx.Name)
}

isRef, err := CheckReferenceDatasetRuntime(ctx.Client, runtime)
isRef, err := CheckReferenceDatasetRuntime(ctx, runtime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in future we can simply check len(runtime.profileName) == 0 to indicate whether it is a VirtualRuntime or ThinRuntime instead of checking all the dataset mounts.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,13 +114,23 @@ func Precheck(client client.Client, key types.NamespacedName) (found bool, err e
}

// CheckReferenceDatasetRuntime judge if this runtime is used for handling dataset mounting another dataset.
func CheckReferenceDatasetRuntime(client client.Client, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(client, runtime.Name, runtime.Namespace)
if err != nil {
func CheckReferenceDatasetRuntime(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(ctx.Client, runtime.Name, runtime.Namespace)
if err != nil && utils.IgnoreNotFound(err) != nil {
// ignore dataset not found err and try to get mounted dataset from runtime
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment should be added below? This is the case which does not ignore error.

return false, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not return every error here because in cases where len(runtime.Status.Mounts) == 0 && Dataset not found, the func will return error to keep engine building failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, CheckReferenceDatasetRuntime can't judge whether this dataset is a reference dataset, so we raise the error now. And this case will be repaired by next PR.

}

mounted := base.GetMountedDatasetNamespacedName(dataset)
var mounted []types.NamespacedName
if dataset != nil {
// getMountedDataset from dataset first
ctx.Log.V(1).Info("Get physical dataset from virtual dataset mounts")
mounted = base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding more logging info for debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if runtime.Status.Mounts != nil && len(runtime.Status.Mounts) != 0 {
// then try to getMountedDataset from runtime
ctx.Log.V(1).Info("Virtual dataset not found, try to get physical dataset from runtime mounts")
mounted = base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if dataset is not found and the length of runtime mounts is 0? How will the user handle this situation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case will be protected by checking existence of reference datasets before removing any physical dataset. This can be done in the next PR.

// not mount other datasets
if len(mounted) == 0 {
return false, nil
Expand Down
43 changes: 38 additions & 5 deletions pkg/ddc/thin/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestBuild(t *testing.T) {
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
name string
dataset *datav1alpha1.Dataset
runtime *datav1alpha1.ThinRuntime
client client.Client
ctx cruntime.ReconcileRequestContext
want bool
wantErr bool
}{
Expand Down Expand Up @@ -306,14 +305,48 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
},
},
want: false,
wantErr: true,
wantErr: false,
},
{
name: "dataset-not-exist-but-get-physical-dataset-from-runtime",
dataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase-no-use",
Namespace: "fluid",
},
},
runtime: &datav1alpha1.ThinRuntime{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase",
Namespace: "fluid",
},
Spec: datav1alpha1.ThinRuntimeSpec{
ThinRuntimeProfileName: "1",
},
Status: datav1alpha1.RuntimeStatus{
Mounts: []datav1alpha1.Mount{{
MountPoint: "dataset://ns-a/n-a",
}},
},
},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(testScheme, tt.dataset, tt.runtime)

isRef, err := CheckReferenceDatasetRuntime(fakeClient, tt.runtime)
var ctx = cruntime.ReconcileRequestContext{
NamespacedName: types.NamespacedName{
Name: "hbase",
Namespace: "fluid",
},
Client: fakeClient,
Log: fake.NullLogger(),
RuntimeType: "thin",
Runtime: tt.runtime,
}
isRef, err := CheckReferenceDatasetRuntime(ctx, tt.runtime)

if (err != nil) != tt.wantErr {
t.Errorf("expect has error %t, but get error %v", tt.wantErr, err)
Expand Down
36 changes: 23 additions & 13 deletions pkg/ddc/thin/referencedataset/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package referencedataset
import (
"context"
"fmt"
"time"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
"github.com/fluid-cloudnative/fluid/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -131,7 +132,7 @@ func (e *ReferenceDatasetEngine) Setup(ctx cruntime.ReconcileRequestContext) (re
// 1. get the physical datasets according the virtual dataset
dataset := ctx.Dataset

physicalNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
physicalNameSpacedNames := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
if len(physicalNameSpacedNames) != 1 {
return false, fmt.Errorf("ThinEngine can only handle dataset only mounting one dataset")
}
Expand Down Expand Up @@ -193,29 +194,38 @@ func (e *ReferenceDatasetEngine) Shutdown() (err error) {
if err != nil {
return err
}
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if mountedRuntimeInfo != nil {
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if err != nil {
return err
}
}
}
return
}

func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
// not found dataset error indicates the runtime is deleting, pass checkDatasetMountSupport
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("The dataset is not found, pass checkDatasetMountSupport because runtime is deleting")
return nil
} else {
return err
}
}

mountedNamespacedName := base.GetMountedDatasetNamespacedName(dataset)
mountedNamespacedName := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
mountedSize := len(mountedNamespacedName)

// currently only support dataset mounting only one dataset
Expand All @@ -234,7 +244,7 @@ func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
}

// currently not support mounted dataset mounting another dataset
if len(base.GetMountedDatasetNamespacedName(mountedDataset)) != 0 {
if len(base.GetPhysicalDatasetFromMounts(mountedDataset.Spec.Mounts)) != 0 {
return fmt.Errorf("ThinRuntime with no profile name can only handle dataset only mounting one dataset")
}

Expand Down
28 changes: 26 additions & 2 deletions pkg/ddc/thin/referencedataset/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (e *ReferenceDatasetEngine) getMountedDatasetRuntimeStatus() (status *datav
return status, err
}

// if mountedRuntimeInfo is nil and no err, the runtime is deleting.
if mountedRuntimeInfo == nil {
return nil, nil
}

return base.GetRuntimeStatus(e.Client, mountedRuntimeInfo.GetRuntimeType(),
mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
}
Expand Down Expand Up @@ -91,20 +96,39 @@ func (e *ReferenceDatasetEngine) getRuntimeInfo() (base.RuntimeInfoInterface, er
e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())

return e.runtimeInfo, nil

}

// getMountedRuntimeInfo get mountedRuntimeInfo from dataset.
// If could not get dataset, getMountedRuntimeInfo try to get mountedRuntimeInfo from runtime status.
// And if dataset is deleted and no status.mounts, it returns nil, nil to continue runtimeDeletion.
func (e *ReferenceDatasetEngine) getMountedRuntimeInfo() (base.RuntimeInfoInterface, error) {
if e.mountedRuntimeInfo != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment: // If already have mountedRuntimeInfo, return it directly

return e.mountedRuntimeInfo, nil
}

dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
// if err is not found, try to get mountedRuntimeInfo from runtime status, don't return here.
if err != nil && utils.IgnoreNotFound(err) != nil {
return e.mountedRuntimeInfo, err
}

runtime, err := e.getRuntime()
if err != nil {
return e.mountedRuntimeInfo, err
}

mountedNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
var mountedNameSpacedNames []types.NamespacedName
if dataset != nil {
// get mountedRuntimeInfo from dataset first
mountedNameSpacedNames = base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
} else if runtime.Status.Mounts != nil && len(runtime.Status.Mounts) != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to check runtime.Status.Mounts != nil here because len(nil) == 0. We can remove runtime.Status.Mounts != nil to avoid code redundancy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// then try to get mountedRuntimeInfo from runtime status
mountedNameSpacedNames = base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
} else {
// The dataset is not found and no status.mounts, in this case, the runtime is deleting, return nil, nil
e.Log.Info("The dataset is deleted and no runtime.Status.Mounts, in this case, the runtime is deleting, return nil, nil")
return nil, nil
}
if len(mountedNameSpacedNames) != 1 {
return e.mountedRuntimeInfo, fmt.Errorf("ThinEngine with no profile name can only handle dataset only mounting one dataset")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/ddc/thin/referencedataset/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ func (e *ReferenceDatasetEngine) Sync(ctx cruntime.ReconcileRequestContext) (err
if err != nil {
return err
}
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
if mountedRuntimeStatus != nil {
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
}
// update status.mounts to dataset mounts
runtimeToUpdate.Status.Mounts = virtualDatasetToUpdate.Spec.Mounts

if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), runtimeToUpdate)
Expand Down
Loading