Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: reconcile thinruntime failed when dataset is deleted #3300

Merged
merged 13 commits into from
Jun 29, 2023
2 changes: 1 addition & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset)
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

if len(mounted) > 0 && ctx.RuntimeType != common.ThinRuntime {
return false, "dataset mounting another dataset can only use thin runtime"
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddc/base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func GetDatasetRefName(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []types.NamespacedName {
func GetPhysicalDatasetFromMounts(mounts []datav1alpha1.Mount) []types.NamespacedName {
// virtual dataset can only mount dataset
var physicalNameSpacedName []types.NamespacedName
for _, mount := range virtualDataset.Spec.Mounts {
for _, mount := range mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
datasetPath := strings.TrimPrefix(mount.MountPoint, string(common.RefSchema))
namespaceAndName := strings.Split(datasetPath, "/")
Expand All @@ -47,7 +47,7 @@ func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []typ
return physicalNameSpacedName
}

func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
func GetPhysicalDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
var paths []string
for _, mount := range virtualDataset.Spec.Mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
Expand All @@ -62,7 +62,7 @@ func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
}

func CheckReferenceDataset(dataset *datav1alpha1.Dataset) (check bool, err error) {
mounts := len(GetMountedDatasetNamespacedName(dataset))
mounts := len(GetPhysicalDatasetFromMounts(dataset.Spec.Mounts))
totalMounts := len(dataset.Spec.Mounts)
switch {
case mounts == 1:
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddc/base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetMountedDatasetNamespacedName(t *testing.T) {
func TestGetPhysicalDatasetFromMounts(t *testing.T) {
tests := []struct {
virtualDataset *datav1alpha1.Dataset
want int
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestGetMountedDatasetNamespacedName(t *testing.T) {
},
}
for _, tt := range tests {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset); len(got) != tt.want {
t.Errorf("GetMountedDatasetNamespacedName() len = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetFromMounts(tt.virtualDataset.Spec.Mounts); len(got) != tt.want {
t.Errorf("GetPhysicalDatasetFromMounts() len = %v, want %v", got, tt.want)
}
}
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestCheckReferenceDataset(t *testing.T) {
}
}

func TestGetMountedDatasetSubPath(t *testing.T) {
func TestGetPhysicalDatasetSubPath(t *testing.T) {
type args struct {
dataset *datav1alpha1.Dataset
}
Expand Down Expand Up @@ -260,8 +260,8 @@ func TestGetMountedDatasetSubPath(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetMountedDatasetSubPath(tt.args.dataset); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetMountedDatasetSubPath() = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetSubPath(tt.args.dataset); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetPhysicalDatasetSubPath() = %v, want %v", got, tt.want)
}
})
}
Expand Down
27 changes: 18 additions & 9 deletions pkg/ddc/thin/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
return nil, fmt.Errorf("engine %s is failed due to type conversion", ctx.Name)
}

isRef, err := CheckReferenceDatasetRuntime(ctx.Client, runtime)
isRef, err := CheckReferenceDatasetRuntime(ctx, runtime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in the future we can simply check len(runtime.profileName) == 0 to tell whether it is a VirtualRuntime or a ThinRuntime, instead of checking all the dataset mounts.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,17 +114,26 @@ func Precheck(client client.Client, key types.NamespacedName) (found bool, err e
}

// CheckReferenceDatasetRuntime judge if this runtime is used for handling dataset mounting another dataset.
func CheckReferenceDatasetRuntime(client client.Client, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(client, runtime.Name, runtime.Namespace)
func CheckReferenceDatasetRuntime(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.ThinRuntime) (bool, error) {
if len(runtime.Status.Mounts) != 0 {
// get physical dataset from runtime mounts
ctx.Log.V(1).Info("Get physical dataset from runtime mounts")
physicalDataset := base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
if len(physicalDataset) != 0 {
return true, nil
}
}

dataset, err := utils.GetDataset(ctx.Client, runtime.Name, runtime.Namespace)
if err != nil {
return false, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not return every error here: when len(runtime.Status.Mounts) == 0 and the Dataset is not found, the func returns an error, so building the engine keeps failing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, CheckReferenceDatasetRuntime can't determine whether this dataset is a reference dataset, so we return the error for now. This case will be fixed in the next PR.

}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the dataset is not found and the length of the runtime mounts is 0? How will the user handle this situation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case will be guarded against by checking for the existence of reference datasets before removing any physical dataset. This can be done in the next PR.


mounted := base.GetMountedDatasetNamespacedName(dataset)
// not mount other datasets
if len(mounted) == 0 {
return false, nil
// get physicalDataset from dataset
ctx.Log.V(1).Info("Get physical dataset from virtual dataset mounts")
physicalDataset := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
if len(physicalDataset) != 0 {
return true, nil
}

return true, nil
return false, nil
}
41 changes: 37 additions & 4 deletions pkg/ddc/thin/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestBuild(t *testing.T) {
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
name string
dataset *datav1alpha1.Dataset
runtime *datav1alpha1.ThinRuntime
client client.Client
ctx cruntime.ReconcileRequestContext
want bool
wantErr bool
}{
Expand Down Expand Up @@ -308,12 +307,46 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
want: false,
wantErr: true,
},
{
name: "dataset-not-exist-but-get-physical-dataset-from-runtime",
dataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase-no-use",
Namespace: "fluid",
},
},
runtime: &datav1alpha1.ThinRuntime{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase",
Namespace: "fluid",
},
Spec: datav1alpha1.ThinRuntimeSpec{
ThinRuntimeProfileName: "1",
},
Status: datav1alpha1.RuntimeStatus{
Mounts: []datav1alpha1.Mount{{
MountPoint: "dataset://ns-a/n-a",
}},
},
},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(testScheme, tt.dataset, tt.runtime)

isRef, err := CheckReferenceDatasetRuntime(fakeClient, tt.runtime)
var ctx = cruntime.ReconcileRequestContext{
NamespacedName: types.NamespacedName{
Name: "hbase",
Namespace: "fluid",
},
Client: fakeClient,
Log: fake.NullLogger(),
RuntimeType: "thin",
Runtime: tt.runtime,
}
isRef, err := CheckReferenceDatasetRuntime(ctx, tt.runtime)

if (err != nil) != tt.wantErr {
t.Errorf("expect has error %t, but get error %v", tt.wantErr, err)
Expand Down
48 changes: 24 additions & 24 deletions pkg/ddc/thin/referencedataset/cm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, mountedRuntime base.RuntimeInfoInterface) error {
func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error {
var fuseName string
switch mountedRuntime.GetRuntimeType() {
switch physicalRuntimeInfo.GetRuntimeType() {
case common.JindoRuntime:
fuseName = mountedRuntime.GetName() + "-" + common.JindoChartName + "-fuse"
fuseName = physicalRuntimeInfo.GetName() + "-" + common.JindoChartName + "-fuse"
default:
fuseName = mountedRuntime.GetName() + "-fuse"
fuseName = physicalRuntimeInfo.GetName() + "-fuse"
}
ds, err := kubeclient.GetDaemonset(client, fuseName, mountedRuntime.GetNamespace())
ds, err := kubeclient.GetDaemonset(client, fuseName, physicalRuntimeInfo.GetNamespace())
if err != nil {
return err
}
Expand Down Expand Up @@ -71,10 +71,10 @@ func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alph
return nil
}

func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, mountedRuntime base.RuntimeInfoInterface) error {
mountedRuntimeType := mountedRuntime.GetRuntimeType()
mountedRuntimeName := mountedRuntime.GetName()
mountedRuntimeNamespace := mountedRuntime.GetNamespace()
func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error {
physicalRuntimeType := physicalRuntimeInfo.GetRuntimeType()
physicalRuntimeName := physicalRuntimeInfo.GetName()
physicalRuntimeNamespace := physicalRuntimeInfo.GetNamespace()

refNameSpace := refDataset.GetNamespace()

Expand All @@ -92,56 +92,56 @@ func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Clie
// Note: values configmap is not needed for fuse sidecar container.

// TODO: decoupling the switch-case, too fragile
switch mountedRuntimeType {
switch physicalRuntimeType {
// TODO: currently the dst configmap name is the same as src configmap name to avoid modify the fuse init container filed,
// but duplicated name error can occurs if the dst namespace has same named runtime.
case common.AlluxioRuntime:
configMapName := mountedRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.JuiceFSRuntime:
fuseScriptConfigMapName := mountedRuntimeName + "-fuse-script"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: mountedRuntimeNamespace},
fuseScriptConfigMapName := physicalRuntimeName + "-fuse-script"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.GooseFSRuntime:
configMapName := mountedRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.JindoRuntime:
clientConfigMapName := mountedRuntimeName + "-jindofs-client-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: clientConfigMapName, Namespace: mountedRuntimeNamespace},
clientConfigMapName := physicalRuntimeName + "-jindofs-client-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: clientConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: clientConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
configMapName := mountedRuntimeName + "-jindofs-config"
err = kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-jindofs-config"
err = kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.EFCRuntime:
// TODO: EFCRuntime needs worker-endpoint configmap which should be synced timely for ECI mode.
// Currently EFCRuntime only supports CSI mode, so do nothing here.
e.Log.Info("Skip createConfigMapForRefDataset because the mountedRuntimeType=EFC", "name", e.name, "namespace", e.namespace)
e.Log.Info("Skip createConfigMapForRefDataset because the physicalRuntimeType=EFC", "name", e.name, "namespace", e.namespace)
case common.ThinRuntime:
runtimesetConfigMapName := mountedRuntimeName + "-runtimeset"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: runtimesetConfigMapName, Namespace: mountedRuntimeNamespace},
runtimesetConfigMapName := physicalRuntimeName + "-runtimeset"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: runtimesetConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: runtimesetConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
default:
err := fmt.Errorf("fail to get configmap for runtime type: %s", mountedRuntimeType)
err := fmt.Errorf("fail to get configmap for runtime type: %s", physicalRuntimeType)
return err
}

Expand Down
Loading
Loading