Skip to content

Commit

Permalink
bugfix: reconcile thinruntime failed when dataset is deleted (#3300)
Browse files Browse the repository at this point in the history
* bugfix: reconcile thinruntime failed when dataset is deleted

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* fix checkDatasetMountSupport return statements

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* recover reconcileInternal order

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* rename GetMountedDatasetNamespacedName to GetPhysicalDatasetFromMounts

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* fix CheckReferenceDatasetRuntime

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* add logs in CheckReferenceDatasetRuntime

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* fix getMountedRuntimeInfo return nil pointer

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* refactor getPhysicalRuntimeInfo

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* rename mounted dataset/runtime to physical dataset/runtime for referencedataset

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* add ut

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* refactor CheckReferenceDatasetRuntime

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* Add logs

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* update log

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

---------

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>
  • Loading branch information
wangshli committed Jun 29, 2023
1 parent 9d460aa commit b56249f
Show file tree
Hide file tree
Showing 12 changed files with 494 additions and 116 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d
}

func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) {
mounted := base.GetMountedDatasetNamespacedName(ctx.Dataset)
mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)

if len(mounted) > 0 && ctx.RuntimeType != common.ThinRuntime {
return false, "dataset mounting another dataset can only use thin runtime"
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddc/base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func GetDatasetRefName(name, namespace string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []types.NamespacedName {
func GetPhysicalDatasetFromMounts(mounts []datav1alpha1.Mount) []types.NamespacedName {
// virtual dataset can only mount dataset
var physicalNameSpacedName []types.NamespacedName
for _, mount := range virtualDataset.Spec.Mounts {
for _, mount := range mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
datasetPath := strings.TrimPrefix(mount.MountPoint, string(common.RefSchema))
namespaceAndName := strings.Split(datasetPath, "/")
Expand All @@ -47,7 +47,7 @@ func GetMountedDatasetNamespacedName(virtualDataset *datav1alpha1.Dataset) []typ
return physicalNameSpacedName
}

func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
func GetPhysicalDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
var paths []string
for _, mount := range virtualDataset.Spec.Mounts {
if common.IsFluidRefSchema(mount.MountPoint) {
Expand All @@ -62,7 +62,7 @@ func GetMountedDatasetSubPath(virtualDataset *datav1alpha1.Dataset) []string {
}

func CheckReferenceDataset(dataset *datav1alpha1.Dataset) (check bool, err error) {
mounts := len(GetMountedDatasetNamespacedName(dataset))
mounts := len(GetPhysicalDatasetFromMounts(dataset.Spec.Mounts))
totalMounts := len(dataset.Spec.Mounts)
switch {
case mounts == 1:
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddc/base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestGetMountedDatasetNamespacedName(t *testing.T) {
func TestGetPhysicalDatasetFromMounts(t *testing.T) {
tests := []struct {
virtualDataset *datav1alpha1.Dataset
want int
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestGetMountedDatasetNamespacedName(t *testing.T) {
},
}
for _, tt := range tests {
if got := GetMountedDatasetNamespacedName(tt.virtualDataset); len(got) != tt.want {
t.Errorf("GetMountedDatasetNamespacedName() len = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetFromMounts(tt.virtualDataset.Spec.Mounts); len(got) != tt.want {
t.Errorf("GetPhysicalDatasetFromMounts() len = %v, want %v", got, tt.want)
}
}
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestCheckReferenceDataset(t *testing.T) {
}
}

func TestGetMountedDatasetSubPath(t *testing.T) {
func TestGetPhysicalDatasetSubPath(t *testing.T) {
type args struct {
dataset *datav1alpha1.Dataset
}
Expand Down Expand Up @@ -260,8 +260,8 @@ func TestGetMountedDatasetSubPath(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetMountedDatasetSubPath(tt.args.dataset); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetMountedDatasetSubPath() = %v, want %v", got, tt.want)
if got := GetPhysicalDatasetSubPath(tt.args.dataset); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetPhysicalDatasetSubPath() = %v, want %v", got, tt.want)
}
})
}
Expand Down
27 changes: 18 additions & 9 deletions pkg/ddc/thin/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Build(id string, ctx cruntime.ReconcileRequestContext) (base.Engine, error)
return nil, fmt.Errorf("engine %s is failed due to type conversion", ctx.Name)
}

isRef, err := CheckReferenceDatasetRuntime(ctx.Client, runtime)
isRef, err := CheckReferenceDatasetRuntime(ctx, runtime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,17 +114,26 @@ func Precheck(client client.Client, key types.NamespacedName) (found bool, err e
}

// CheckReferenceDatasetRuntime judge if this runtime is used for handling dataset mounting another dataset.
func CheckReferenceDatasetRuntime(client client.Client, runtime *datav1alpha1.ThinRuntime) (bool, error) {
dataset, err := utils.GetDataset(client, runtime.Name, runtime.Namespace)
func CheckReferenceDatasetRuntime(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.ThinRuntime) (bool, error) {
if len(runtime.Status.Mounts) != 0 {
// get physical dataset from runtime mounts
ctx.Log.V(1).Info("Get physical dataset from runtime mounts")
physicalDataset := base.GetPhysicalDatasetFromMounts(runtime.Status.Mounts)
if len(physicalDataset) != 0 {
return true, nil
}
}

dataset, err := utils.GetDataset(ctx.Client, runtime.Name, runtime.Namespace)
if err != nil {
return false, err
}

mounted := base.GetMountedDatasetNamespacedName(dataset)
// not mount other datasets
if len(mounted) == 0 {
return false, nil
// get physicalDataset from dataset
ctx.Log.V(1).Info("Get physical dataset from virtual dataset mounts")
physicalDataset := base.GetPhysicalDatasetFromMounts(dataset.Spec.Mounts)
if len(physicalDataset) != 0 {
return true, nil
}

return true, nil
return false, nil
}
41 changes: 37 additions & 4 deletions pkg/ddc/thin/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestBuild(t *testing.T) {
Expand Down Expand Up @@ -239,7 +238,7 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
name string
dataset *datav1alpha1.Dataset
runtime *datav1alpha1.ThinRuntime
client client.Client
ctx cruntime.ReconcileRequestContext
want bool
wantErr bool
}{
Expand Down Expand Up @@ -308,12 +307,46 @@ func TestCheckReferenceDatasetRuntime(t *testing.T) {
want: false,
wantErr: true,
},
{
name: "dataset-not-exist-but-get-physical-dataset-from-runtime",
dataset: &datav1alpha1.Dataset{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase-no-use",
Namespace: "fluid",
},
},
runtime: &datav1alpha1.ThinRuntime{
ObjectMeta: metav1.ObjectMeta{
Name: "hbase",
Namespace: "fluid",
},
Spec: datav1alpha1.ThinRuntimeSpec{
ThinRuntimeProfileName: "1",
},
Status: datav1alpha1.RuntimeStatus{
Mounts: []datav1alpha1.Mount{{
MountPoint: "dataset://ns-a/n-a",
}},
},
},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewFakeClientWithScheme(testScheme, tt.dataset, tt.runtime)

isRef, err := CheckReferenceDatasetRuntime(fakeClient, tt.runtime)
var ctx = cruntime.ReconcileRequestContext{
NamespacedName: types.NamespacedName{
Name: "hbase",
Namespace: "fluid",
},
Client: fakeClient,
Log: fake.NullLogger(),
RuntimeType: "thin",
Runtime: tt.runtime,
}
isRef, err := CheckReferenceDatasetRuntime(ctx, tt.runtime)

if (err != nil) != tt.wantErr {
t.Errorf("expect has error %t, but get error %v", tt.wantErr, err)
Expand Down
48 changes: 24 additions & 24 deletions pkg/ddc/thin/referencedataset/cm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
)

func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, mountedRuntime base.RuntimeInfoInterface) error {
func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error {
var fuseName string
switch mountedRuntime.GetRuntimeType() {
switch physicalRuntimeInfo.GetRuntimeType() {
case common.JindoRuntime:
fuseName = mountedRuntime.GetName() + "-" + common.JindoChartName + "-fuse"
fuseName = physicalRuntimeInfo.GetName() + "-" + common.JindoChartName + "-fuse"
default:
fuseName = mountedRuntime.GetName() + "-fuse"
fuseName = physicalRuntimeInfo.GetName() + "-fuse"
}
ds, err := kubeclient.GetDaemonset(client, fuseName, mountedRuntime.GetNamespace())
ds, err := kubeclient.GetDaemonset(client, fuseName, physicalRuntimeInfo.GetNamespace())
if err != nil {
return err
}
Expand Down Expand Up @@ -71,10 +71,10 @@ func copyFuseDaemonSetForRefDataset(client client.Client, refDataset *datav1alph
return nil
}

func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, mountedRuntime base.RuntimeInfoInterface) error {
mountedRuntimeType := mountedRuntime.GetRuntimeType()
mountedRuntimeName := mountedRuntime.GetName()
mountedRuntimeNamespace := mountedRuntime.GetNamespace()
func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Client, refDataset *datav1alpha1.Dataset, physicalRuntimeInfo base.RuntimeInfoInterface) error {
physicalRuntimeType := physicalRuntimeInfo.GetRuntimeType()
physicalRuntimeName := physicalRuntimeInfo.GetName()
physicalRuntimeNamespace := physicalRuntimeInfo.GetNamespace()

refNameSpace := refDataset.GetNamespace()

Expand All @@ -92,56 +92,56 @@ func (e *ReferenceDatasetEngine) createConfigMapForRefDataset(client client.Clie
// Note: values configmap is not needed for fuse sidecar container.

// TODO: decoupling the switch-case, too fragile
switch mountedRuntimeType {
switch physicalRuntimeType {
// TODO: currently the dst configmap name is the same as src configmap name to avoid modify the fuse init container filed,
// but duplicated name error can occurs if the dst namespace has same named runtime.
case common.AlluxioRuntime:
configMapName := mountedRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.JuiceFSRuntime:
fuseScriptConfigMapName := mountedRuntimeName + "-fuse-script"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: mountedRuntimeNamespace},
fuseScriptConfigMapName := physicalRuntimeName + "-fuse-script"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: fuseScriptConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.GooseFSRuntime:
configMapName := mountedRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.JindoRuntime:
clientConfigMapName := mountedRuntimeName + "-jindofs-client-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: clientConfigMapName, Namespace: mountedRuntimeNamespace},
clientConfigMapName := physicalRuntimeName + "-jindofs-client-config"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: clientConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: clientConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
configMapName := mountedRuntimeName + "-jindofs-config"
err = kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: mountedRuntimeNamespace},
configMapName := physicalRuntimeName + "-jindofs-config"
err = kubeclient.CopyConfigMap(client, types.NamespacedName{Name: configMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: configMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
case common.EFCRuntime:
// TODO: EFCRuntime needs worker-endpoint configmap which should be synced timely for ECI mode.
// Currently EFCRuntime only supports CSI mode, so do nothing here.
e.Log.Info("Skip createConfigMapForRefDataset because the mountedRuntimeType=EFC", "name", e.name, "namespace", e.namespace)
e.Log.Info("Skip createConfigMapForRefDataset because the physicalRuntimeType=EFC", "name", e.name, "namespace", e.namespace)
case common.ThinRuntime:
runtimesetConfigMapName := mountedRuntimeName + "-runtimeset"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: runtimesetConfigMapName, Namespace: mountedRuntimeNamespace},
runtimesetConfigMapName := physicalRuntimeName + "-runtimeset"
err := kubeclient.CopyConfigMap(client, types.NamespacedName{Name: runtimesetConfigMapName, Namespace: physicalRuntimeNamespace},
types.NamespacedName{Name: runtimesetConfigMapName, Namespace: refNameSpace}, ownerReference)
if err != nil {
return err
}
default:
err := fmt.Errorf("fail to get configmap for runtime type: %s", mountedRuntimeType)
err := fmt.Errorf("fail to get configmap for runtime type: %s", physicalRuntimeType)
return err
}

Expand Down
Loading

0 comments on commit b56249f

Please sign in to comment.