Cherry-pick to 0.9: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume (#3448) #3453

Merged
19 changes: 11 additions & 8 deletions cmd/csi/app/csi.go
@@ -30,6 +30,7 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/csi"
"github.com/fluid-cloudnative/fluid/pkg/csi/config"
"github.com/fluid-cloudnative/fluid/pkg/utils"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
"github.com/golang/glog"
"github.com/spf13/cobra"
@@ -110,15 +111,17 @@ func handle() {
panic(fmt.Sprintf("csi: unable to create controller manager due to error %v", err))
}

config := config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
runningContext := config.RunningContext{
Config: config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
},
VolumeLocks: utils.NewVolumeLocks(),
}

if err = csi.SetupWithManager(mgr, config); err != nil {
if err = csi.SetupWithManager(mgr, runningContext); err != nil {
panic(fmt.Sprintf("unable to set up manager due to error %v", err))
}

7 changes: 7 additions & 0 deletions pkg/csi/config/config.go
@@ -16,6 +16,13 @@ limitations under the License.

package config

import "github.com/fluid-cloudnative/fluid/pkg/utils"

type RunningContext struct {
Config
VolumeLocks *utils.VolumeLocks
}

type Config struct {
NodeId string
Endpoint string
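The shared `VolumeLocks` embedded in `RunningContext` above is what both the CSI node server and the FUSE recoverer will contend on. Its implementation lives in `pkg/utils` and is not part of this diff; the following is a minimal sketch of what such a per-path try-lock could look like, assuming a mutex-guarded set (the real implementation may differ):

```go
// Sketch only: assumed shape of a per-path try-lock, not the actual pkg/utils code.
package utils

import "sync"

// VolumeLocks tracks which volume paths are currently being operated on.
type VolumeLocks struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

func NewVolumeLocks() *VolumeLocks {
	return &VolumeLocks{locks: map[string]struct{}{}}
}

// TryAcquire reports whether the lock on volumeID was free and, if so, marks it held.
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	if _, held := vl.locks[volumeID]; held {
		return false
	}
	vl.locks[volumeID] = struct{}{}
	return true
}

// Release frees the lock on volumeID so other goroutines can acquire it.
func (vl *VolumeLocks) Release(volumeID string) {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	delete(vl.locks, volumeID)
}
```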
7 changes: 5 additions & 2 deletions pkg/csi/plugins/driver.go
@@ -44,11 +44,13 @@ type driver struct {
nodeAuthorizedClient *kubernetes.Clientset
csiDriver *csicommon.CSIDriver
nodeId, endpoint string

locks *utils.VolumeLocks
}

var _ manager.Runnable = &driver{}

func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset, locks *utils.VolumeLocks) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)

proto, addr := utils.SplitSchemaAddr(endpoint)
@@ -76,6 +78,7 @@ func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.R
client: client,
nodeAuthorizedClient: nodeAuthorizedClient,
apiReader: apiReader,
locks: locks,
}
}

@@ -92,7 +95,7 @@ func (d *driver) newNodeServer() *nodeServer {
client: d.client,
apiReader: d.apiReader,
nodeAuthorizedClient: d.nodeAuthorizedClient,
locks: utils.NewVolumeLocks(),
locks: d.locks,
}
}

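The driver change above is the core of the fix: `newNodeServer` previously built its own `utils.NewVolumeLocks()`, so the node server's locks were invisible to the FUSE recoverer. A toy snippet illustrating why one shared instance matters; the path is illustrative, while the `TryAcquire`/`Release` calls follow the signatures used in this diff:

```go
package main

import (
	"fmt"

	"github.com/fluid-cloudnative/fluid/pkg/utils"
)

func main() {
	target := "/runtime-mnt/dataset/default/demo/fuse" // illustrative path

	// Before: node server and FUSE recoverer each held a private lock set,
	// so both TryAcquire calls succeed and the two code paths can race.
	serverLocks := utils.NewVolumeLocks()
	recoverLocks := utils.NewVolumeLocks()
	fmt.Println(serverLocks.TryAcquire(target), recoverLocks.TryAcquire(target)) // true true

	// After: a single shared set rejects the second caller until Release.
	shared := utils.NewVolumeLocks()
	fmt.Println(shared.TryAcquire(target), shared.TryAcquire(target)) // true false
	shared.Release(target)
}
```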
72 changes: 38 additions & 34 deletions pkg/csi/plugins/nodeserver.go
@@ -82,7 +82,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err := os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.Infof("MkdirAll successful. %v", targetPath)
glog.Infof("NodePublishVolume: MkdirAll successful on %v", targetPath)
}
//isMount = true
} else {
@@ -91,21 +91,20 @@
}

if isMount {
glog.Infof("It's already mounted to %v", targetPath)
glog.Infof("NodePublishVolume: already mounted to %v, do nothing", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
} else {
glog.Infof("Try to mount to %v", targetPath)
}

glog.Infof("NodePublishVolume: start mounting staging path to %v", targetPath)
// 0. check if read only
readOnly := false
if req.GetVolumeCapability() == nil {
glog.Infoln("Volume Capability is nil")
glog.Infoln("NodePublishVolume: found volume capability is nil")
} else {
mode := req.GetVolumeCapability().GetAccessMode().GetMode()
if mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
glog.Infof("Set the mount option readonly=%v", readOnly)
glog.Infof("NodePublishVolume: set the mount option readonly=%v", readOnly)
}
}

@@ -155,7 +154,7 @@
}
command := exec.Command("mount", args...)

glog.V(4).Infoln(command)
glog.V(3).Infof("NodePublishVolume: exec command %v", command)
stdoutStderr, err := command.CombinedOutput()
glog.V(4).Infoln(string(stdoutStderr))
if err != nil {
@@ -167,65 +166,70 @@
}
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.V(4).Infof("Succeed in binding %s to %s", mountPath, targetPath)
glog.V(3).Infof("NodePublishVolume: succeed in binding %s to %s", mountPath, targetPath)
}

return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume unmounts every mounted file system on the given req.GetTargetPath() until it's cleaned up.
// If anything unexpected happens during the umount process, it returns an error and waits for retries.
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
// check targetpath validity
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume operation requires targetPath but is not provided")
}

// The lock is to avoid race condition
// The lock is to avoid race conditions, making sure only one goroutine (including the FUSE recovery goroutine) is handling the targetPath
if lock := ns.locks.TryAcquire(targetPath); !lock {
return nil, status.Errorf(codes.Aborted, "NodeUnpublishVolume operation on targetPath %s already exists", targetPath)
}
defer ns.locks.Release(targetPath)

// check path existence
_, err := os.Stat(targetPath)
// No need to unmount non-existing targetPath
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if err != nil {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
}

// targetPath may be mount bind many times when mount point recovered.
// targetPath may be bind mounted many times when the mount point is recovered.
// umount until it's not mounted.
mounter := mount.New("")
for {
notMount, err := mounter.IsLikelyNotMountPoint(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
if corrupted := mount.IsCorruptedMnt(err); !corrupted {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
if !mount.IsCorruptedMnt(err) {
// stat targetPath with unexpected error
glog.Errorf("NodeUnpublishVolume: stat targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: stat targetPath %s: %v", targetPath, err)
} else {
// targetPath is corrupted
glog.V(3).Infof("NodeUnpublishVolume: detected corrupted mountpoint on path %s with error %v", targetPath, err)
}
}

if notMount {
glog.V(3).Infof("umount:%s success", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: umount %s success", targetPath)
break
}

glog.V(3).Infof("umount:%s", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: exec umount %s", targetPath)
err = mounter.Unmount(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up when umounting it", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
return nil, errors.Wrapf(err, "NodeUnpublishVolume: umount targetPath %s error %v", targetPath, err)
glog.Errorf("NodeUnpublishVolume: umount targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: umount targetPath %s: %v", targetPath, err)
}
}

err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
err := mount.CleanupMountPoint(targetPath, mounter, false)
if err != nil {
glog.V(3).Infoln(err)
glog.Errorf("NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
} else {
glog.V(4).Infof("Succeed in umounting %s", targetPath)
glog.V(4).Infof("NodeUnpublishVolume: succeed in umounting %s", targetPath)
}

return &csi.NodeUnpublishVolumeResponse{}, nil
@@ -271,14 +275,14 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag

var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
default:
return nil, errors.Errorf("Unknown Fuse clean policy: %s", cleanPolicy)
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}

if !shouldCleanFuse {
@@ -343,7 +347,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// 2. clean up broken mount point
fluidPath := req.GetVolumeContext()[common.VolumeAttrFluidPath]
if ignoredErr := cleanUpBrokenMountPoint(fluidPath); ignoredErr != nil {
glog.Warningf("Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
glog.Warningf("NodeStageVolume: Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
}

// 3. get runtime namespace and name
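Pulled out of the diff noise, the new NodeUnpublishVolume body boils down to: take the per-path try-lock, then unmount in a loop until the target is no longer a mount point (treating a corrupted mount left by a dead FUSE daemon as still mounted), then clean up the directory. A condensed sketch of that loop, assuming the `k8s.io/mount-utils` package (the repo may alias an equivalent mount library); error wrapping and logging from the actual nodeserver.go are omitted:

```go
package example

import (
	"os"

	mount "k8s.io/mount-utils"
)

func unmountUntilClean(targetPath string) error {
	mounter := mount.New("")
	for {
		notMount, err := mounter.IsLikelyNotMountPoint(targetPath)
		if os.IsNotExist(err) {
			return nil // already cleaned up by someone else
		}
		if err != nil && !mount.IsCorruptedMnt(err) {
			return err // unexpected stat error, let the caller retry
		}
		if err == nil && notMount {
			break // nothing mounted any more
		}
		// Either still mounted or corrupted: unmount one layer of the bind stack.
		if err := mounter.Unmount(targetPath); err != nil {
			if os.IsNotExist(err) {
				return nil
			}
			return err
		}
	}
	// Remove the now-empty target directory so kubelet sees a clean path.
	return mount.CleanupMountPoint(targetPath, mounter, false)
}
```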
6 changes: 3 additions & 3 deletions pkg/csi/plugins/register.go
@@ -23,13 +23,13 @@ import (
)

// Register initializes the csi driver and registers it to the controller manager.
func Register(mgr manager.Manager, cfg config.Config) error {
client, err := kubelet.InitNodeAuthorizedClient(cfg.KubeletConfigPath)
func Register(mgr manager.Manager, ctx config.RunningContext) error {
client, err := kubelet.InitNodeAuthorizedClient(ctx.KubeletConfigPath)
if err != nil {
return err
}

csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client)
csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client, ctx.VolumeLocks)

if err := mgr.Add(csiDriver); err != nil {
return err
80 changes: 59 additions & 21 deletions pkg/csi/recover/recover.go
@@ -60,6 +60,8 @@ type FuseRecover struct {

recoverFusePeriod time.Duration
recoverWarningThreshold int

locks *utils.VolumeLocks
}

func initializeKubeletClient() (*kubelet.KubeletClient, error) {
Expand Down Expand Up @@ -102,7 +104,7 @@ func initializeKubeletClient() (*kubelet.KubeletClient, error) {
return kubeletClient, nil
}

func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader) (*FuseRecover, error) {
func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, apiReader client.Reader, locks *utils.VolumeLocks) (*FuseRecover, error) {
glog.V(3).Infoln("start csi recover")
mountRoot, err := utils.GetMountRoot()
if err != nil {
Expand All @@ -129,6 +131,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api
Recorder: recorder,
recoverFusePeriod: recoverFusePeriod,
recoverWarningThreshold: recoverWarningThreshold,
locks: locks,
}, nil
}

Expand All @@ -151,42 +154,28 @@ func (r *FuseRecover) runOnce() {
r.recover()
}

func (r FuseRecover) recover() {
func (r *FuseRecover) recover() {
brokenMounts, err := mountinfo.GetBrokenMountPoints()
if err != nil {
glog.Error(err)
return
}

for _, point := range brokenMounts {
glog.V(4).Infof("Get broken mount point: %v", point)
// if app container restart, umount duplicate mount may lead to recover successed but can not access data
// so we only umountDuplicate when it has mounted more than the recoverWarningThreshold
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
continue
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
r.doRecover(point)
}
}

func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error) {
glog.V(3).Infof("Start recovery: [%s], source path: [%s]", point.MountPath, point.SourcePath)
// recovery for each bind mount path
mountOption := []string{"bind"}
if point.ReadOnly {
mountOption = append(mountOption, "ro")
}

glog.V(3).Infof("Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
glog.V(3).Infof("FuseRecovery: Start exec cmd: mount %s %s -o %v \n", point.SourcePath, point.MountPath, mountOption)
if err := r.Mount(point.SourcePath, point.MountPath, "none", mountOption); err != nil {
glog.Errorf("exec cmd: mount -o bind %s %s err :%v", point.SourcePath, point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: mount -o bind %s %s with err :%v", point.SourcePath, point.MountPath, err)
}
return
}
@@ -196,9 +185,9 @@ func (r *FuseRecover) recoverBrokenMount(point mountinfo.MountPoint) (err error)
// don't umount all item, 'mountPropagation' will lose efficacy.
func (r *FuseRecover) umountDuplicate(point mountinfo.MountPoint) {
for i := point.Count; i > 1; i-- {
glog.V(3).Infof("count: %d, start exec cmd: umount %s", i, point.MountPath)
glog.V(3).Infof("FuseRecovery: count: %d, start exec cmd: umount %s", i, point.MountPath)
if err := r.Unmount(point.MountPath); err != nil {
glog.Errorf("exec cmd: umount %s err: %v", point.MountPath, err)
glog.Errorf("FuseRecovery: exec cmd: umount %s with err: %v", point.MountPath, err)
}
}
}
@@ -231,3 +220,52 @@ func (r *FuseRecover) eventRecord(point mountinfo.MountPoint, eventType, eventRe
r.Recorder.Eventf(dataset, eventType, eventReason, "Mountpoint %s has been mounted %v times, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potential make data access connection broken", point.MountPath, point.Count)
}
}

func (r *FuseRecover) shouldRecover(mountPath string) (should bool, err error) {
mounter := mount.New("")
notMount, err := mounter.IsLikelyNotMountPoint(mountPath)
if os.IsNotExist(err) || (err == nil && notMount) {
// Perhaps the mountPath has been cleaned up in other goroutine
return false, nil
}
if err != nil && !mount.IsCorruptedMnt(err) {
// unexpected error
return false, err
}

return true, nil
}

func (r *FuseRecover) doRecover(point mountinfo.MountPoint) {
if lock := r.locks.TryAcquire(point.MountPath); !lock {
glog.V(4).Infof("FuseRecovery: fail to acquire lock on path %s, skip recovering it", point.MountPath)
return
}
defer r.locks.Release(point.MountPath)

should, err := r.shouldRecover(point.MountPath)
if err != nil {
glog.Warningf("FuseRecovery: found path %s which is unable to recover due to error %v, skip it", point.MountPath, err)
return
}

if !should {
glog.V(3).Infof("FuseRecovery: path %s has already been cleaned up, skip recovering it", point.MountPath)
return
}

glog.V(3).Infof("FuseRecovery: recovering broken mount point: %v", point)
// if the app container restarts, umounting duplicate mounts may lead to a successful recovery but inaccessible data,
// so we only call umountDuplicate when the path has been mounted more than recoverWarningThreshold times
// please refer to https://github.com/fluid-cloudnative/fluid/issues/3399 for more information
if point.Count > r.recoverWarningThreshold {
glog.Warningf("FuseRecovery: Mountpoint %s has been mounted %v times, exceeding the recoveryWarningThreshold %v, unmount duplicate mountpoint to avoid large /proc/self/mountinfo file, this may potentially make data access connection broken", point.MountPath, point.Count, r.recoverWarningThreshold)
r.eventRecord(point, corev1.EventTypeWarning, common.FuseUmountDuplicate)
r.umountDuplicate(point)
}
if err := r.recoverBrokenMount(point); err != nil {
r.eventRecord(point, corev1.EventTypeWarning, common.FuseRecoverFailed)
return
}
r.eventRecord(point, corev1.EventTypeNormal, common.FuseRecoverSucceed)
}
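To see the new behavior end to end: whichever of NodeUnpublishVolume and the recovery loop wins `TryAcquire` on a target path proceeds, while the other backs off (an Aborted gRPC error or a skipped recovery tick) and retries later. A toy driver for that interplay, using only the `utils.VolumeLocks` API shown in this diff; the target path is illustrative:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/fluid-cloudnative/fluid/pkg/utils"
)

func main() {
	locks := utils.NewVolumeLocks()
	targetPath := "/var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~csi/demo/mount" // illustrative

	var wg sync.WaitGroup
	for _, caller := range []string{"NodeUnpublishVolume", "FuseRecovery.doRecover"} {
		caller := caller
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !locks.TryAcquire(targetPath) {
				// The loser backs off: kubelet retries the RPC, the recoverer
				// waits for its next recovery period.
				fmt.Printf("%s: %s is busy, retry later\n", caller, targetPath)
				return
			}
			defer locks.Release(targetPath)
			fmt.Printf("%s: holds the lock, safe to mount/umount\n", caller)
		}()
	}
	wg.Wait()
}
```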