Skip to content

Commit

Permalink
Modify CephFs provisioner to use the ceph mgr commands
Browse files Browse the repository at this point in the history
Currently CephFs provisioner mounts the ceph filesystem
and creates a subdirectory as a part of provisioning the
volume. Ceph now supports commands to provision fs subvolumes,
hance modify the provisioner to use ceph mgr commands to
(de)provision fs subvolumes.

Signed-off-by: Poornima G <pgurusid@redhat.com>
  • Loading branch information
Poornima G committed Jun 8, 2019
1 parent 0e61098 commit 7e9f509
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 125 deletions.
8 changes: 4 additions & 4 deletions pkg/cephfs/controllerserver.go
Expand Up @@ -53,13 +53,13 @@ func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *
return status.Error(codes.InvalidArgument, err.Error())
}

if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
if err = createVolume(volOptions, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil {
if errDefer := purgeVolume(volumeID(vID.FsSubvolName), volOptions); errDefer != nil {
klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer)
}
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
mtxControllerVolumeID.LockKey(string(volID))
defer mustUnlock(mtxControllerVolumeID, string(volID))

if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
if err = purgeVolume(volID, &ce.VolOptions); err != nil {
klog.Errorf("failed to delete volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
mtxControllerVolumeName.LockKey(volOptions.RequestName)
defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName)

if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
if err = purgeVolume(volumeID(vID.FsSubvolName), volOptions); err != nil {
klog.Errorf("failed to delete volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/cephfs/util.go
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -94,11 +93,6 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil
}

func pathExists(p string) bool {
_, err := os.Stat(p)
return err == nil
}

// Controller service request validation
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
Expand Down
136 changes: 21 additions & 115 deletions pkg/cephfs/volume.go
Expand Up @@ -17,27 +17,17 @@ limitations under the License.
package cephfs

import (
"fmt"
"os"
"path"

"k8s.io/klog"
"github.com/ceph/ceph-csi/pkg/util"
)

const (
cephVolumesRoot = "csi-volumes"

namespacePrefix = "ns-"
namespacePrefix = "fsvolumens-"
)

func getCephRootPathLocal(volID volumeID) string {
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
}

func getCephRootVolumePathLocal(volID volumeID) string {
return path.Join(getCephRootPathLocal(volID), cephVolumesRoot, string(volID))
}

func getVolumeRootPathCeph(volID volumeID) string {
return path.Join("/", cephVolumesRoot, string(volID))
}
Expand All @@ -46,111 +36,27 @@ func getVolumeNamespace(volID volumeID) string {
return namespacePrefix + string(volID)
}

func setVolumeAttribute(root, attrName, attrValue string) error {
return execCommandErr("setfattr", "-n", attrName, "-v", attrValue, root)
}

func createVolume(volOptions *volumeOptions, adminCr *credentials, volID volumeID, bytesQuota int64) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootCreating = volRoot + "-creating"
)

if pathExists(volRoot) {
klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
return nil
}

if err := createMountPoint(volRootCreating); err != nil {
return err
}

if bytesQuota > 0 {
if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
return err
}
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
return err
}

if err := os.Rename(volRootCreating, volRoot); err != nil {
return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
}

return nil
}

func purgeVolume(volID volumeID, adminCr *credentials, volOptions *volumeOptions) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootDeleting = volRoot + "-deleting"
)

if pathExists(volRoot) {
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
}
} else {
if !pathExists(volRootDeleting) {
klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
return nil
}
}

if err := os.RemoveAll(volRootDeleting); err != nil {
return fmt.Errorf("failed to delete volume %s: %v", volID, err)
}

return nil
}

func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *credentials) error {
cephRoot := getCephRootPathLocal(volID)

// Root path is not set for dynamically provisioned volumes
// Access to cephfs's / is required
volOptions.RootPath = "/"

if err := createMountPoint(cephRoot); err != nil {
return err
}

m, err := newMounter(volOptions)
if err != nil {
return fmt.Errorf("failed to create mounter: %v", err)
}

if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
func createVolume(volOptions *volumeOptions, volID volumeID, bytesQuota int64) error {
_, _, err := util.ExecCommand(
"ceph",
"fs",
"subvolume",
"create",
volOptions.FsName,
string(volID),
string(bytesQuota))

return nil
return err
}

func unmountCephRoot(volID volumeID) {
cephRoot := getCephRootPathLocal(volID)
func purgeVolume(volID volumeID, volOptions *volumeOptions) error {
_, _, err := util.ExecCommand(
"ceph",
"fs",
"subvolume",
"rm",
volOptions.FsName,
string(volID))

if err := unmountVolume(cephRoot); err != nil {
klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
} else {
if err := os.Remove(cephRoot); err != nil {
klog.Errorf("failed to remove %s with error %s", cephRoot, err)
}
}
return err
}

0 comments on commit 7e9f509

Please sign in to comment.