bugfix: fix csi plugin concurrency issue on FuseRecovery and NodeUnpublishVolume #3448

Merged
19 changes: 11 additions & 8 deletions cmd/csi/app/csi.go
@@ -30,6 +30,7 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/csi"
"github.com/fluid-cloudnative/fluid/pkg/csi/config"
"github.com/fluid-cloudnative/fluid/pkg/utils"
utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature"
"github.com/golang/glog"
"github.com/spf13/cobra"
@@ -110,15 +111,17 @@ func handle() {
panic(fmt.Sprintf("csi: unable to create controller manager due to error %v", err))
}

config := config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
runningContext := config.RunningContext{
Config: config.Config{
NodeId: nodeID,
Endpoint: endpoint,
PruneFs: pruneFs,
PrunePath: prunePath,
KubeletConfigPath: kubeletKubeConfigPath,
},
VolumeLocks: utils.NewVolumeLocks(),
}

if err = csi.SetupWithManager(mgr, config); err != nil {
if err = csi.SetupWithManager(mgr, runningContext); err != nil {
panic(fmt.Sprintf("unable to set up manager due to error %v", err))
}

7 changes: 7 additions & 0 deletions pkg/csi/config/config.go
@@ -16,6 +16,13 @@ limitations under the License.

package config

import "github.com/fluid-cloudnative/fluid/pkg/utils"

type RunningContext struct {
Config
VolumeLocks *utils.VolumeLocks
}

type Config struct {
NodeId string
Endpoint string
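The new RunningContext carries a single *utils.VolumeLocks instance so that every CSI component created from it serializes work on the same set of paths. For readers unfamiliar with the type, below is a minimal sketch of how such a volume-lock set is commonly implemented (a mutex-guarded set keyed by path); it is an assumption for illustration, not necessarily the actual code in pkg/utils.

// Minimal sketch of a volume-lock set like utils.VolumeLocks (assumed shape,
// the real implementation in pkg/utils may differ): a mutex-guarded set of
// paths where TryAcquire fails fast if the path is already being worked on.
package utils

import "sync"

type VolumeLocks struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

func NewVolumeLocks() *VolumeLocks {
	return &VolumeLocks{locks: map[string]struct{}{}}
}

// TryAcquire returns false if another goroutine already holds the lock for path.
func (vl *VolumeLocks) TryAcquire(path string) bool {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	if _, ok := vl.locks[path]; ok {
		return false
	}
	vl.locks[path] = struct{}{}
	return true
}

// Release frees the lock for path so a later TryAcquire can succeed.
func (vl *VolumeLocks) Release(path string) {
	vl.mu.Lock()
	defer vl.mu.Unlock()
	delete(vl.locks, path)
}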
7 changes: 5 additions & 2 deletions pkg/csi/plugins/driver.go
@@ -44,11 +44,13 @@ type driver struct {
nodeAuthorizedClient *kubernetes.Clientset
csiDriver *csicommon.CSIDriver
nodeId, endpoint string

locks *utils.VolumeLocks
}

var _ manager.Runnable = &driver{}

func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset) *driver {
func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.Reader, nodeAuthorizedClient *kubernetes.Clientset, locks *utils.VolumeLocks) *driver {
glog.Infof("Driver: %v version: %v", driverName, version)

proto, addr := utils.SplitSchemaAddr(endpoint)
@@ -76,6 +78,7 @@ func NewDriver(nodeID, endpoint string, client client.Client, apiReader client.R
client: client,
nodeAuthorizedClient: nodeAuthorizedClient,
apiReader: apiReader,
locks: locks,
}
}

@@ -92,7 +95,7 @@ func (d *driver) newNodeServer() *nodeServer {
client: d.client,
apiReader: d.apiReader,
nodeAuthorizedClient: d.nodeAuthorizedClient,
locks: utils.NewVolumeLocks(),
locks: d.locks,
}
}

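The key change in driver.go is that the node server no longer builds its own lock set in newNodeServer(); the driver receives one through NewDriver and hands that same instance down. A small, hedged illustration of why that matters, reusing the VolumeLocks sketch above (the path and the wiring are made up for the example):

// Two independent lock sets cannot serialize work on the same targetPath,
// which is the pre-fix situation where each component called NewVolumeLocks()
// on its own. A single shared instance makes the second caller back off.
func demoSharedLocks() {
	targetPath := "/var/lib/kubelet/pods/<uid>/volumes/kubernetes.io~csi/pvc/mount" // hypothetical path

	// Before: separate lock sets, both acquisitions succeed, operations race.
	recoveryLocks, nodeServerLocks := NewVolumeLocks(), NewVolumeLocks()
	_ = recoveryLocks.TryAcquire(targetPath)   // true
	_ = nodeServerLocks.TryAcquire(targetPath) // also true -- no mutual exclusion

	// After: one instance created in cmd/csi/app/csi.go and shared via RunningContext.
	shared := NewVolumeLocks()
	_ = shared.TryAcquire(targetPath) // true
	_ = shared.TryAcquire(targetPath) // false -- caller returns codes.Aborted and retries
}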
73 changes: 38 additions & 35 deletions pkg/csi/plugins/nodeserver.go
@@ -82,7 +82,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
if err := os.MkdirAll(targetPath, 0750); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.Infof("MkdirAll successful. %v", targetPath)
glog.Infof("NodePublishVolume: MkdirAll successful on %v", targetPath)
}
//isMount = true
} else {
@@ -91,21 +91,20 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}

if isMount {
glog.Infof("It's already mounted to %v", targetPath)
glog.Infof("NodePublishVolume: already mounted to %v, do nothing", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
} else {
glog.Infof("Try to mount to %v", targetPath)
}

glog.Infof("NodePublishVolume: start mounting staging path to %v", targetPath)
// 0. check if read only
readOnly := false
if req.GetVolumeCapability() == nil {
glog.Infoln("Volume Capability is nil")
glog.Infoln("NodePublishVolume: found volume capability is nil")
} else {
mode := req.GetVolumeCapability().GetAccessMode().GetMode()
if mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
glog.Infof("Set the mount option readonly=%v", readOnly)
glog.Infof("NodePublishVolume: set the mount option readonly=%v", readOnly)
}
}

@@ -155,7 +154,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
command := exec.Command("mount", args...)

glog.V(4).Infoln(command)
glog.V(3).Infof("NodePublishVolume: exec command %v", command)
stdoutStderr, err := command.CombinedOutput()
glog.V(4).Infoln(string(stdoutStderr))
if err != nil {
@@ -167,66 +166,70 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
return nil, status.Error(codes.Internal, err.Error())
} else {
glog.V(4).Infof("Succeed in binding %s to %s", mountPath, targetPath)
glog.V(3).Infof("NodePublishVolume: succeed in binding %s to %s", mountPath, targetPath)
}

return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume umounts every mounted file systems on the given req.GetTargetPath() until it's cleaned up.
// If anything unexpected happened during the umount process, it returns error and wait for retries.
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath()
// check targetpath validity
if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume operation requires targetPath but is not provided")
}

// The lock is to avoid race condition
// The lock is to avoid race conditions, making sure only one goroutine (including the FUSE Recovery goroutine) is handling the targetPath
if lock := ns.locks.TryAcquire(targetPath); !lock {
return nil, status.Errorf(codes.Aborted, "NodeUnpublishVolume operation on targetPath %s already exists", targetPath)
}
defer ns.locks.Release(targetPath)

// check path existence
_, err := os.Stat(targetPath)
// No need to unmount non-existing targetPath
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if err != nil && !errors.Is(err, syscall.ENOTCONN) {
// if error is "transport endpoint is not connected", ignore it and do umount
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
}

// targetPath may be mount bind many times when mount point recovered.
// targetPath may be bind mount many times when mount point recovered.
// umount until it's not mounted.
mounter := mount.New("")
for {
notMount, err := mounter.IsLikelyNotMountPoint(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up, so it doesn't need to be unmounted", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
if corrupted := mount.IsCorruptedMnt(err); !corrupted {
return nil, errors.Wrapf(err, "NodeUnpublishVolume: stat targetPath %s error %v", targetPath, err)
if !mount.IsCorruptedMnt(err) {
// stat targetPath with unexpected error
glog.Errorf("NodeUnpublishVolume: stat targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: stat targetPath %s: %v", targetPath, err)
} else {
// targetPath is corrupted
glog.V(3).Infof("NodeUnpublishVolume: detected corrupted mountpoint on path %s with error %v", targetPath, err)
}
}

if notMount {
glog.V(3).Infof("umount:%s success", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: umount %s success", targetPath)
break
}

glog.V(3).Infof("umount:%s", targetPath)
glog.V(3).Infof("NodeUnpublishVolume: exec umount %s", targetPath)
err = mounter.Unmount(targetPath)
if os.IsNotExist(err) {
glog.V(3).Infof("NodeUnpublishVolume: targetPath %s has been cleaned up when umounting it", targetPath)
break
}
if err != nil {
glog.V(3).Infoln(err)
return nil, errors.Wrapf(err, "NodeUnpublishVolume: umount targetPath %s error %v", targetPath, err)
glog.Errorf("NodeUnpublishVolume: umount targetPath %s with error: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: umount targetPath %s: %v", targetPath, err)
}
}

err = mount.CleanupMountPoint(req.GetTargetPath(), mount.New(""), false)
err := mount.CleanupMountPoint(targetPath, mounter, false)
if err != nil {
glog.V(3).Infoln(err)
glog.Errorf("NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
return nil, status.Errorf(codes.Internal, "NodeUnpublishVolume: failed when cleanupMountPoint on path %s: %v", targetPath, err)
} else {
glog.V(4).Infof("Succeed in umounting %s", targetPath)
glog.V(4).Infof("NodeUnpublishVolume: succeed in umounting %s", targetPath)
}

return &csi.NodeUnpublishVolumeResponse{}, nil
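The other half of the race is the FUSE recovery controller, which rebinds broken mount points and is not shown in this hunk. Presumably it guards each targetPath with the same shared lock set before recovering it; the sketch below is hypothetical (the type, field, and function names are assumptions) and only illustrates the intended interplay with NodeUnpublishVolume above.

// Hypothetical sketch of the recovery-side guard; not taken from this diff.
func (r *FuseRecover) recoverBrokenMount(targetPath string) error {
	// If NodeUnpublishVolume (or another recovery pass) already holds the
	// lock for this path, skip it and let the next reconcile retry, instead
	// of re-binding a mount point that is concurrently being torn down.
	if ok := r.locks.TryAcquire(targetPath); !ok {
		return nil
	}
	defer r.locks.Release(targetPath)

	// ... check the mount point and re-execute the bind mount if needed ...
	return nil
}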
@@ -272,14 +275,14 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag

var shouldCleanFuse bool
cleanPolicy := runtimeInfo.GetFuseCleanPolicy()
glog.Infof("Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
glog.Infof("NodeUnstageVolume: Using %s clean policy for runtime %s in namespace %s", cleanPolicy, runtimeInfo.GetName(), runtimeInfo.GetNamespace())
switch cleanPolicy {
case v1alpha1.OnDemandCleanPolicy:
shouldCleanFuse = true
case v1alpha1.OnRuntimeDeletedCleanPolicy:
shouldCleanFuse = false
default:
return nil, errors.Errorf("Unknown Fuse clean policy: %s", cleanPolicy)
return nil, errors.Errorf("NodeUnstageVolume: unknown Fuse clean policy: %s", cleanPolicy)
}

if !shouldCleanFuse {
@@ -344,7 +347,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// 2. clean up broken mount point
fluidPath := req.GetVolumeContext()[common.VolumeAttrFluidPath]
if ignoredErr := cleanUpBrokenMountPoint(fluidPath); ignoredErr != nil {
glog.Warningf("Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
glog.Warningf("NodeStageVolume: Ignoring error when cleaning up broken mount point %v: %v", fluidPath, ignoredErr)
}

// 3. get runtime namespace and name
6 changes: 3 additions & 3 deletions pkg/csi/plugins/register.go
@@ -23,13 +23,13 @@ import (
)

// Register initializes the csi driver and registers it to the controller manager.
func Register(mgr manager.Manager, cfg config.Config) error {
client, err := kubelet.InitNodeAuthorizedClient(cfg.KubeletConfigPath)
func Register(mgr manager.Manager, ctx config.RunningContext) error {
client, err := kubelet.InitNodeAuthorizedClient(ctx.KubeletConfigPath)
if err != nil {
return err
}

csiDriver := NewDriver(cfg.NodeId, cfg.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client)
csiDriver := NewDriver(ctx.NodeId, ctx.Endpoint, mgr.GetClient(), mgr.GetAPIReader(), client, ctx.VolumeLocks)

if err := mgr.Add(csiDriver); err != nil {
return err