Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: reconcile thinruntime failed when dataset is deleted #3300

Merged
merged 13 commits into from
Jun 29, 2023
2 changes: 1 addition & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset)
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

if len(mounted) > 0 && ctx.RuntimeType != common.ThinRuntime {
return false, "dataset mounting another dataset can only use thin runtime"
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddc/base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func GetDatasetRefName(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []types.NamespacedName {
func GetPhysicalDatasetFromMounts(mounts []datav1alpha1.Mount) []types.NamespacedName {
// virtual dataset can only mount dataset
var physicalNameSpacedName []types.NamespacedName
for _, mount := range virtualDataset.Spec.Mounts {
for _, mount := range mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
datasetPath := strings.TrimPrefix(mount.MountPoint, string(common.RefSchema))
namespaceAndName := strings.Split(datasetPath, "/")
Expand Down Expand Up @@ -62,7 +62,7 @@ func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
}

func CheckReferenceDataset(dataset *datav1alpha1.Dataset) (check bool, err error) {
mounts := len(GetMountedDatasetNamespacedName(dataset))
mounts := len(GetPhysicalDatasetFromMounts(dataset.Spec.Mounts))
totalMounts := len(dataset.Spec.Mounts)
switch {
case mounts == 1:
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddc/base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetMountedDatasetNamespacedName(t *testing.T) {
func TestGetPhysicalDatasetFromMounts(t *testing.T) {
tests := []struct {
virtualDataset *datav1alpha1.Dataset
want int
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestGetMountedDatasetNamespacedName(t *testing.T) {
},
}
for _, tt := range tests {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset); len(got) != tt.want {
t.Errorf("GetMountedDatasetNamespacedName() len = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetFromMounts(tt.virtualDataset.Spec.Mounts); len(got) != tt.want {
t.Errorf("GetPhysicalDatasetFromMounts() len = %v, want %v", got, tt.want)
}
}
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/ddc/thin/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
return nil, fmt.Errorf("engine %s is failed due to type conversion", ctx.Name)
}

isRef, err := CheckReferenceDatasetRuntime(ctx.Client, runtime)
isRef, err := CheckReferenceDatasetRuntime(ctx, runtime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in the future we can simply check len(runtime.profileName) == 0 to tell whether it is a VirtualRuntime or a ThinRuntime, instead of checking all the dataset mounts.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,13 +114,23 @@ func Precheck(client client.Client, key types.NamespacedName) (found bool, err e
}

// CheckReferenceDatasetRuntime judge if this runtime is used for handling dataset mounting another dataset.
func CheckReferenceDatasetRuntime(client client.Client, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(client, runtime.Name, runtime.Namespace)
if err != nil {
func CheckReferenceDatasetRuntime(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(ctx.Client, runtime.Name, runtime.Namespace)
if err != nil && utils.IgnoreNotFound(err) != nil {
// return if it is not a not-found error
return false, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not return every error here: when len(runtime.Status.Mounts) == 0 and the Dataset is not found, the func returns an error, which keeps the engine build failing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, CheckReferenceDatasetRuntime can't judge whether this dataset is a reference dataset, so we raise the error for now. This case will be fixed in the next PR.

}

mounted := base.GetMountedDatasetNamespacedName(dataset)
var mounted []types.NamespacedName
if dataset != nil {
// getMountedDataset from dataset first
ctx.Log.V(1).Info("Get physical dataset from virtual dataset mounts")
mounted = base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding more logging info for debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} else if len(runtime.Status.Mounts) != 0 {
// Virtual dataset not found, try to getMountedDataset from runtime
ctx.Log.V(1).Info("Virtual dataset not found, try to get physical dataset from runtime mounts")
mounted = base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if dataset is not found and the length of runtime mounts is 0? How will the user handle this situation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case will be guarded against by checking for the existence of reference datasets before removing any physical dataset. This can be done in the next PR.

// not mount other datasets
if len(mounted) == 0 {
return false, nil
Expand Down
43 changes: 38 additions & 5 deletions pkg/ddc/thin/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestBuild(t *testing.T) {
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
name string
dataset *datav1alpha1.Dataset
runtime *datav1alpha1.ThinRuntime
client client.Client
ctx cruntime.ReconcileRequestContext
want bool
wantErr bool
}{
Expand Down Expand Up @@ -306,14 +305,48 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
},
},
want: false,
wantErr: true,
wantErr: false,
},
{
name: "dataset-not-exist-but-get-physical-dataset-from-runtime",
dataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase-no-use",
Namespace: "fluid",
},
},
runtime: &datav1alpha1.ThinRuntime{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase",
Namespace: "fluid",
},
Spec: datav1alpha1.ThinRuntimeSpec{
ThinRuntimeProfileName: "1",
},
Status: datav1alpha1.RuntimeStatus{
Mounts: []datav1alpha1.Mount{{
MountPoint: "dataset://ns-a/n-a",
}},
},
},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(testScheme, tt.dataset, tt.runtime)

isRef, err := CheckReferenceDatasetRuntime(fakeClient, tt.runtime)
var ctx = cruntime.ReconcileRequestContext{
NamespacedName: types.NamespacedName{
Name: "hbase",
Namespace: "fluid",
},
Client: fakeClient,
Log: fake.NullLogger(),
RuntimeType: "thin",
Runtime: tt.runtime,
}
isRef, err := CheckReferenceDatasetRuntime(ctx, tt.runtime)

if (err != nil) != tt.wantErr {
t.Errorf("expect has error %t, but get error %v", tt.wantErr, err)
Expand Down
43 changes: 28 additions & 15 deletions pkg/ddc/thin/referencedataset/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package referencedataset
import (
"context"
"fmt"
"time"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
"github.com/fluid-cloudnative/fluid/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
ctrl "sigs.k8s.io/controller-runtime"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -121,7 +122,10 @@ func BuildReferenceDatasetThinEngine(id string, ctx cruntime.ReconcileRequestCon
// get the mountedRuntimeInfo
_, err = engine.getMountedRuntimeInfo()
if err != nil {
return nil, fmt.Errorf("engine %s failed to get mounted dataset's runtime info", ctx.Name)
// return err if the runtime is running or error is not not-found
if utils.IgnoreNotFound(err) != nil || ctx.Runtime.GetDeletionTimestamp().IsZero() {
return nil, fmt.Errorf("engine %s failed to get mounted dataset's runtime info", ctx.Name)
}
}

return engine, err
Expand All @@ -131,7 +135,7 @@ func (e *ReferenceDatasetEngine) Setup(ctx cruntime.ReconcileRequestContext) (re
// 1. get the physical datasets according the virtual dataset
dataset := ctx.Dataset

physicalNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
physicalNameSpacedNames := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
if len(physicalNameSpacedNames) != 1 {
return false, fmt.Errorf("ThinEngine can only handle dataset only mounting one dataset")
}
Expand Down Expand Up @@ -190,32 +194,41 @@ func (e *ReferenceDatasetEngine) Shutdown() (err error) {
datasetRefName := base.GetDatasetRefName(e.name, e.namespace)

mountedRuntimeInfo, err := e.getMountedRuntimeInfo()
if err != nil {
return err
}
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
if err != nil && utils.IgnoreNotFound(err) != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if mountedRuntimeInfo != nil {
mountedDataset, err := utils.GetDataset(e.Client, mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
if err != nil {
return err
}

if utils.ContainsString(mountedDataset.Status.DatasetRef, datasetRefName) {
newDataset := mountedDataset.DeepCopy()
newDataset.Status.DatasetRef = utils.RemoveString(newDataset.Status.DatasetRef, datasetRefName)
err := e.Client.Status().Update(context.TODO(), newDataset)
if err != nil {
return err
}
}
}
return
}

func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
return err
// not found dataset error indicates the runtime is deleting, pass checkDatasetMountSupport
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("The dataset is not found, pass checkDatasetMountSupport because runtime is deleting")
return nil
} else {
return err
}
}

mountedNamespacedName := base.GetMountedDatasetNamespacedName(dataset)
mountedNamespacedName := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
mountedSize := len(mountedNamespacedName)

// currently only support dataset mounting only one dataset
Expand All @@ -234,7 +247,7 @@ func (e *ReferenceDatasetEngine) checkDatasetMountSupport() error {
}

// currently not support mounted dataset mounting another dataset
if len(base.GetMountedDatasetNamespacedName(mountedDataset)) != 0 {
if len(base.GetPhysicalDatasetFromMounts(mountedDataset.Spec.Mounts)) != 0 {
return fmt.Errorf("ThinRuntime with no profile name can only handle dataset only mounting one dataset")
}

Expand Down
31 changes: 27 additions & 4 deletions pkg/ddc/thin/referencedataset/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (e *ReferenceDatasetEngine) getMountedDatasetRuntimeStatus() (status *datav
return status, err
}

// if mountedRuntimeInfo is nil and no err, the runtime is deleting.
if mountedRuntimeInfo == nil {
return nil, nil
}

return base.GetRuntimeStatus(e.Client, mountedRuntimeInfo.GetRuntimeType(),
mountedRuntimeInfo.GetName(), mountedRuntimeInfo.GetNamespace())
}
Expand Down Expand Up @@ -91,22 +96,40 @@ func (e *ReferenceDatasetEngine) getRuntimeInfo() (base.RuntimeInfoInterface, er
e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())

return e.runtimeInfo, nil

}

// getMountedRuntimeInfo get mountedRuntimeInfo from dataset.
// If could not get dataset, getMountedRuntimeInfo try to get mountedRuntimeInfo from runtime status.
func (e *ReferenceDatasetEngine) getMountedRuntimeInfo() (base.RuntimeInfoInterface, error) {
if e.mountedRuntimeInfo != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment: // If mountedRuntimeInfo is already set, return it directly

return e.mountedRuntimeInfo, nil
}

dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
runtime, err := e.getRuntime()
if err != nil {
return e.mountedRuntimeInfo, err
}

mountedNameSpacedNames := base.GetMountedDatasetNamespacedName(dataset)
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil && utils.IgnoreNotFound(err) != nil {
// return if it is not a not-found error
return e.mountedRuntimeInfo, err
}

var mountedNameSpacedNames []types.NamespacedName
if dataset != nil {
// get mountedRuntimeInfo from dataset first
mountedNameSpacedNames = base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
} else if len(runtime.Status.Mounts) != 0 {
// then try to get mountedRuntimeInfo from runtime status
mountedNameSpacedNames = base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
} else {
// err can only be not-found error
return e.mountedRuntimeInfo, err
}

if len(mountedNameSpacedNames) != 1 {
return e.mountedRuntimeInfo, fmt.Errorf("ThinEngine with no profile name can only handle dataset only mounting one dataset")
return e.mountedRuntimeInfo, fmt.Errorf("ThinEngine with no profile name can only handle dataset only mounting one dataset but get %v", len(mountedNameSpacedNames))
}
namespacedName := mountedNameSpacedNames[0]

Expand Down
10 changes: 7 additions & 3 deletions pkg/ddc/thin/referencedataset/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ func (e *ReferenceDatasetEngine) Sync(ctx cruntime.ReconcileRequestContext) (err
if err != nil {
return err
}
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
if mountedRuntimeStatus != nil {
// status copy, include cacheStates, conditions, selector, valueFile, current*, desired*, fuse*, master*, worker* ...
// TODO: Are there some fields should not copy?
runtimeToUpdate.Status = *mountedRuntimeStatus.DeepCopy()
}
// update status.mounts to dataset mounts
runtimeToUpdate.Status.Mounts = virtualDatasetToUpdate.Spec.Mounts

if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
err = e.Client.Status().Update(context.TODO(), runtimeToUpdate)
Expand Down
Loading