Cherry-pick to 0.9: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume (#3448) (#3453)

* Bugfix: ignore not connected error in NodeUnpublishVolume (#3445)

* ignore not connected error in NodeUnpublishVolume

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* fix check nil error

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* simplify error judgment

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

---------

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>

* bugfix: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume (#3448)

* Add comments for NodeUnpublishVolume

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Refactor NodeUnpublishVolume code

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* FuseRecovery uses volume locks to avoid race conditions

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Refactor node server with codes.Internal error code

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Rename CSI Config to RunningContext

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Fix github actions checks

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Fix lock release

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

* Refactor recover logic

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

---------

Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>

---------

Signed-off-by: wangshulin <wangshulin@smail.nju.edu.cn>
Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
Co-authored-by: wangshulin <89928606+wangshli@users.noreply.github.com>
TrafalgarZZZ and wangshli committed Sep 13, 2023
1 parent 6acbbcb commit 7b01a4c
Showing 10 changed files with 141 additions and 76 deletions.
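
For context on the diff below: NodeUnpublishVolume and the FUSE recovery loop now serialize all work on a target path through one shared set of per-path volume locks. The following is a minimal sketch of what such a lock set can look like (a path set guarded by a mutex). It mirrors the NewVolumeLocks/TryAcquire/Release API visible in this diff, but the internals of Fluid's actual utils.VolumeLocks may differ.

// Illustrative sketch only, not Fluid's pkg/utils source.
package utils

import "sync"

// VolumeLocks tracks volume paths that are currently being operated on.
type VolumeLocks struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

func NewVolumeLocks() *VolumeLocks {
	return &VolumeLocks{locks: map[string]struct{}{}}
}

// TryAcquire reserves path without blocking. Callers that get false should
// back off (e.g. return an Aborted error) and let the caller retry later.
func (vl *VolumeLocks) TryAcquire(path string) bool {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	if _, held := vl.locks[path]; held {
		return false
	}
	vl.locks[path] = struct{}{}
	return true
}

// Release frees path so the next TryAcquire on it can succeed.
func (vl *VolumeLocks) Release(path string) {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	delete(vl.locks, path)
}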
19 changes: 11 additions & 8 deletions cmd/csi/app/csi.go
@@ -30,6 +30,7 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/csi"
"github.com/fluid-cloudnative/fluid/pkg/csi/config"
"github.com/fluid-cloudnative/fluid/pkg/utils"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
"github.com/golang/glog"
"github.com/spf13/cobra"
@@ -110,15 +111,17 @@ func handle() {
panic(fmt.Sprintf("csi: unable to create controller manager due to error %v", err))
}

config := config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
runningContext := config.RunningContext{
Config: config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
},
VolumeLocks: utils.NewVolumeLocks(),
}

if err = csi.SetupWithManager(mgr, config); err != nil {
if err = csi.SetupWithManager(mgr, runningContext); err != nil {
panic(fmt.Sprintf("unable to set up manager due to error %v", err))
}

7 changes: 7 additions & 0 deletions pkg/csi/config/config.go
@@ -16,6 +16,13 @@ limitations under the License.

package config

import "github.com/fluid-cloudnative/fluid/pkg/utils"

type RunningContext struct {
Config
VolumeLocks *utils.VolumeLocks
}

type Config struct {
NodeId string
Endpoint string
7 changes: 5 additions & 2 deletions pkg/csi/plugins/driver.go
@@ -44,11 +44,13 @@ type driver struct {
nodeAuthorizedClient *kubernetes.Clientset
csiDriver *csicommon.CSIDriver
nodeId, endpoint string

locks *utils.VolumeLocks
}

var _ manager.Runnable = &driver{}

func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset, locks *utils.VolumeLocks) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)

proto, addr := utils.SplitSchemaAddr(endpoint)
@@ -76,6 +78,7 @@ func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.R
client: client,
nodeAuthorizedClient: nodeAuthorizedClient,
apiReader: apiReader,
locks: locks,
}
}

@@ -92,7 +95,7 @@ func (d *driver) newNodeServer() *nodeServer {
client: d.client,
apiReader: d.apiReader,
nodeAuthorizedClient: d.nodeAuthorizedClient,
locks: utils.NewVolumeLocks(),
locks: d.locks,
}
}

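
The driver change above is the crux of the concurrency fix: newNodeServer previously built its own VolumeLocks, so the node server and the FUSE recovery goroutine locked against different instances and never actually excluded each other on the same targetPath. The small illustrative program below (not Fluid's test code; the path is a placeholder) shows the difference:

package main

import (
	"fmt"

	"github.com/fluid-cloudnative/fluid/pkg/utils"
)

func main() {
	target := "/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~csi/pvc-demo/mount"

	// Before the fix: two independent lock sets, one per component.
	nodeServerLocks := utils.NewVolumeLocks()
	recoverLocks := utils.NewVolumeLocks()
	fmt.Println(nodeServerLocks.TryAcquire(target)) // true
	fmt.Println(recoverLocks.TryAcquire(target))    // also true: no mutual exclusion

	// After the fix: one shared instance wired through config.RunningContext.
	shared := utils.NewVolumeLocks()
	fmt.Println(shared.TryAcquire(target)) // true
	fmt.Println(shared.TryAcquire(target)) // false: second caller must back off and retry
	shared.Release(target)
}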
72 changes: 38 additions & 34 deletions pkg/csi/plugins/nodeserver.go
@@ -82,7 +82,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err := os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.Infof("MkdirAll successful. %v", targetPath)
glog.Infof("NodePublishVolume: MkdirAll successful on %v", targetPath)
}
//isMount = true
} else {
@@ -91,21 +91,20 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}

if isMount {
glog.Infof("It's already mounted to %v", targetPath)
glog.Infof("NodePublishVolume: already mounted to %v, do nothing", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
} else {
glog.Infof("Try to mount to %v", targetPath)
}

glog.Infof("NodePublishVolume: start mounting staging path to %v", targetPath)
// 0. check if read only
readOnly := false
if req.GetVolumeCapability() == nil {
glog.Infoln("Volume Capability is nil")
glog.Infoln("NodePublishVolume: found volume capability is nil")
} else {
mode := req.GetVolumeCapability().GetAccessMode().GetMode()
if mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
glog.Infof("Set the mount option readonly=%v", readOnly)
glog.Infof("NodePublishVolume: set the mount option readonly=%v", readOnly)
}
}

@@ -155,7 +154,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
command := exec.Command("mount", args...)

glog.V(4).Infoln(command)
glog.V(3).Infof("NodePublishVolume: exec command %v", command)
stdoutStderr, err := command.CombinedOutput()
glog.V(4).Infoln(string(stdoutStderr))
if err != nil {
@@ -167,65 +166,70 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.V(4).Infof("Succeed in binding %s to %s", mountPath, targetPath)
glog.V(3).Infof("NodePublishVolume: succeed in binding %s to %s", mountPath, targetPath)
}

return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume umounts every mounted file systems on the given req.GetTargetPath() until it's cleaned up.
// If anything unexpected happened during the umount process, it returns error and wait for retries.
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
// check targetpath validity
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume operation requires targetPath but is not provided")
}

// The lock is to avoid race condition
// The lock is to avoid race condition, make sure only one goroutine(including the FUSE Recovery goroutine) is handling the targetPath
if lock := ns.locks.TryAcquire(targetPath); !lock {
return nil, status.Errorf(codes.Aborted, "NodeUnpublishVolume operation on targetPath %s already exists", targetPath)
}
defer ns.locks.Release(targetPath)

// check path existence
_, err := os.Stat(targetPath)
// No need to unmount non-existing targetPath
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if err != nil {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
}

// targetPath may be mount bind many times when mount point recovered.
// targetPath may be bind mount many times when mount point recovered.
// umount until it's not mounted.
mounter := mount.New("")
for {
notMount, err := mounter.IsLikelyNotMountPoint(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
if corrupted := mount.IsCorruptedMnt(err); !corrupted {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
if !mount.IsCorruptedMnt(err) {
// stat targetPath with unexpected error
glog.Errorf("NodeUnpublishVolume: stat targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: stat targetPath %s: %v", targetPath, err)
} else {
// targetPath is corrupted
glog.V(3).Infof("NodeUnpublishVolume: detected corrupted mountpoint on path %s with error %v", targetPath, err)
}
}

if notMount {
glog.V(3).Infof("umount:%s success", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: umount %s success", targetPath)
break
}

glog.V(3).Infof("umount:%s", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: exec umount %s", targetPath)
err = mounter.Unmount(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up when umounting it", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
return nil, errors.Wrapf(err, "NodeUnpublishVolume: umount targetPath %s error %v", targetPath, err)
glog.Errorf("NodeUnpublishVolume: umount targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: umount targetPath %s: %v", targetPath, err)
}
}

err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
err := mount.CleanupMountPoint(targetPath, mounter, false)
if err != nil {
glog.V(3).Infoln(err)
glog.Errorf("NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
} else {
glog.V(4).Infof("Succeed in umounting %s", targetPath)
glog.V(4).Infof("NodeUnpublishVolume: succeed in umounting %s", targetPath)
}

return &csi.NodeUnpublishVolumeResponse{}, nil
@@ -271,14 +275,14 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag

var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
default:
return nil, errors.Errorf("Unknown Fuse clean policy: %s", cleanPolicy)
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}

if !shouldCleanFuse {
@@ -343,7 +347,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// 2. clean up broken mount point
fluidPath := req.GetVolumeContext()[common.VolumeAttrFluidPath]
if ignoredErr := cleanUpBrokenMountPoint(fluidPath); ignoredErr != nil {
glog.Warningf("Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
glog.Warningf("NodeStageVolume: Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
}

// 3. get runtime namespace and name
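
A note on the corrupted-mount handling in NodeUnpublishVolume above (and the earlier "ignore not connected error" fix, #3445): when a FUSE daemon exits, stat on its mountpoint typically fails with ENOTCONN ("transport endpoint is not connected"). mount.IsCorruptedMnt recognizes this class of errors, so the unmount loop keeps going instead of returning an Internal error on every retry. The sketch below is a simplified stand-in for that check; the real helper matches additional errno values (for example ESTALE and EIO), and the path used here is purely illustrative.

package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// isNotConnected is a simplified stand-in for mount.IsCorruptedMnt: it only
// checks ENOTCONN, which is what a dead FUSE daemon usually surfaces.
func isNotConnected(err error) bool {
	return errors.Is(err, syscall.ENOTCONN)
}

func main() {
	// Illustrative path of a FUSE mountpoint whose daemon has exited.
	_, err := os.Stat("/runtime-mnt/dataset/default/demo/fuse")
	if err != nil && isNotConnected(err) {
		fmt.Println("corrupted mountpoint detected; proceed to umount instead of failing")
	}
}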
6 changes: 3 additions & 3 deletions pkg/csi/plugins/register.go
@@ -23,13 +23,13 @@ import (
)

// Register initializes the csi driver and registers it to the controller manager.
func Register(mgr manager.Manager, cfg config.Config) error {
client, err := kubelet.InitNodeAuthorizedClient(cfg.KubeletConfigPath)
func Register(mgr manager.Manager, ctx config.RunningContext) error {
client, err := kubelet.InitNodeAuthorizedClient(ctx.KubeletConfigPath)
if err != nil {
return err
}

csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client)
csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client, ctx.VolumeLocks)

if err := mgr.Add(csiDriver); err != nil {
return err
80 changes: 59 additions & 21 deletions pkg/csi/recover/recover.go
@@ -60,6 +60,8 @@ type FuseRecover struct {

recoverFusePeriod time.Duration
recoverWarningThreshold int

locks *utils.VolumeLocks
}

func initializeKubeletClient() (*kubelet.KubeletClient, error) {
@@ -102,7 +104,7 @@ func initializeKubeletClient() (*kubelet.KubeletClient, error) {
return kubeletClient, nil
}

func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader) (*FuseRecover, error) {
func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader, locks *utils.VolumeLocks) (*FuseRecover, error) {
glog.V(3).Infoln("start csi recover")
mountRoot, err := utils.GetMountRoot()
if err != nil {
@@ -129,6 +131,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api
Recorder: recorder,
recoverFusePeriod: recoverFusePeriod,
recoverWarningThreshold: recoverWarningThreshold,
locks: locks,
}, nil
}

Expand All @@ -151,42 +154,28 @@ func (r *FuseRecover) runOnce() {
r.recover()
}

func (r FuseRecover) recover() {
func (r *FuseRecover) recover() {
brokenMounts, err := mountinfo.GetBrokenMountPoints()
if err != nil {
glog.Error(err)
return
}

for _, point := range brokenMounts {
glog.V(4).Infof("Get broken mount point: %v", point)
// if app container restart, umount duplicate mount may lead to recover successed but can not access data
// so we only umountDuplicate when it has mounted more than the recoverWarningThreshold
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
continue
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
r.doRecover(point)
}
}

func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error) {
glog.V(3).Infof("Start recovery: [%s], source path: [%s]", point.MountPath, point.SourcePath)
// recovery for each bind mount path
mountOption := []string{"bind"}
if point.ReadOnly {
mountOption = append(mountOption, "ro")
}

glog.V(3).Infof("Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
glog.V(3).Infof("FuseRecovery: Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
if err := r.Mount(point.SourcePath, point.MountPath, "none", mountOption); err != nil {
glog.Errorf("exec cmd: mount -o bind %s %s err :%v", point.SourcePath, point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: mount -o bind %s %s with err :%v", point.SourcePath, point.MountPath, err)
}
return
}
@@ -196,9 +185,9 @@ func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error)
// don't umount all item, 'mountPropagation' will lose efficacy.
func (r *FuseRecover) umountDuplicate(point mountinfo.MountPoint) {
for i := point.Count; i > 1; i-- {
glog.V(3).Infof("count: %d, start exec cmd: umount %s", i, point.MountPath)
glog.V(3).Infof("FuseRecovery: count: %d, start exec cmd: umount %s", i, point.MountPath)
if err := r.Unmount(point.MountPath); err != nil {
glog.Errorf("exec cmd: umount %s err: %v", point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: umount %s with err: %v", point.MountPath, err)
}
}
}
@@ -231,3 +220,52 @@ func (r *FuseRecover) eventRecord(point mountinfo.MountPoint, eventType, eventRe
r.Recorder.Eventf(dataset, eventType, eventReason, "Mountpoint %s has been mounted %v times, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count)
}
}

func (r *FuseRecover) shouldRecover(mountPath string) (should bool, err error) {
mounter := mount.New("")
notMount, err := mounter.IsLikelyNotMountPoint(mountPath)
if os.IsNotExist(err) || (err == nil && notMount) {
// Perhaps the mountPath has been cleaned up in other goroutine
return false, nil
}
if err != nil && !mount.IsCorruptedMnt(err) {
// unexpected error
return false, err
}

return true, nil
}

func (r *FuseRecover) doRecover(point mountinfo.MountPoint) {
if lock := r.locks.TryAcquire(point.MountPath); !lock {
glog.V(4).Infof("FuseRecovery: fail to acquire lock on path %s, skip recovering it", point.MountPath)
return
}
defer r.locks.Release(point.MountPath)

should, err := r.shouldRecover(point.MountPath)
if err != nil {
glog.Warningf("FuseRecovery: found path %s which is unable to recover due to error %v, skip it", point.MountPath, err)
return
}

if !should {
glog.V(3).Infof("FuseRecovery: path %s has already been cleaned up, skip recovering it", point.MountPath)
return
}

glog.V(3).Infof("FuseRecovery: recovering broken mount point: %v", point)
// if app container restart, umount duplicate mount may lead to recover successed but can not access data
// so we only umountDuplicate when it has mounted more than the recoverWarningThreshold
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("FuseRecovery: Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potentially make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
return
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
}
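
The remaining changed files (not expanded on this page) include the place where the recovery side is handed the same lock set. Conceptually, the wiring after this change looks roughly like the sketch below. Only the identifiers already visible in this diff (config.RunningContext, plugins.Register, recover.NewFuseRecover, VolumeLocks) are real; the import paths for the plugins and recover packages are inferred from the file paths above, and everything else (the wiring function, the recorder name, the assumption that FuseRecover can be added to the manager as a Runnable) is a placeholder, not Fluid's actual registration code.

// Hypothetical wiring sketch, not Fluid's actual registration code.
package csiwiring

import (
	"github.com/fluid-cloudnative/fluid/pkg/csi/config"
	"github.com/fluid-cloudnative/fluid/pkg/csi/plugins"
	fuserecover "github.com/fluid-cloudnative/fluid/pkg/csi/recover"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// setupCSIComponents fans one VolumeLocks instance out to both the CSI node
// server and the FUSE recovery loop, so NodeUnpublishVolume and doRecover
// never operate on the same targetPath at the same time.
func setupCSIComponents(mgr manager.Manager, runningContext config.RunningContext) error {
	// Node server side: Register passes runningContext.VolumeLocks into NewDriver.
	if err := plugins.Register(mgr, runningContext); err != nil {
		return err
	}

	// Recovery side: NewFuseRecover receives the very same lock set.
	r, err := fuserecover.NewFuseRecover(
		mgr.GetClient(),
		mgr.GetEventRecorderFor("FuseRecover"), // recorder name is a placeholder
		mgr.GetAPIReader(),
		runningContext.VolumeLocks,
	)
	if err != nil {
		return err
	}

	// Assumes FuseRecover satisfies manager.Runnable, as the csi driver does.
	return mgr.Add(r)
}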