Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify CephFs provisioner to use the ceph mgr commands #400

Merged
merged 3 commits into from Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions e2e/deploy-rook.go
Expand Up @@ -69,8 +69,6 @@ func deployOperator(c kubernetes.Interface) {

_, err := framework.RunKubectl("create", "-f", opPath)
Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout)
Expand All @@ -80,10 +78,12 @@ func deployOperator(c kubernetes.Interface) {
func deployCluster(c kubernetes.Interface) {
opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml")
framework.RunKubectlOrDie("create", "-f", opPath)
err := waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
opt := &metav1.ListOptions{
LabelSelector: "app=rook-ceph-mon",
}
err := checkCephPods(rookNS, c, 1, deployTimeout, opt)
err = checkCephPods(rookNS, c, 1, deployTimeout, opt)
Expect(err).Should(BeNil())
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/cephfs/cephuser.go
Expand Up @@ -82,6 +82,10 @@ func getCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID vol

func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
adminID, userID := genUserIDs(adminCr, volID)
volRootPath, err := getVolumeRootPathCeph(volOptions, adminCr, volID)
if err != nil {
return nil, err
}

return getSingleCephEntity(
"-m", volOptions.Monitors,
Expand All @@ -91,7 +95,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID
"-f", "json",
"auth", "get-or-create", userID,
// User capabilities
"mds", fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volID)),
"mds", fmt.Sprintf("allow rw path=%s", volRootPath),
"mon", "allow r",
"osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
)
Expand Down
7 changes: 6 additions & 1 deletion pkg/cephfs/mountcache.go
Expand Up @@ -95,11 +95,16 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
volID := vid.VolumeID

if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}

volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return err
}

var entity *cephEntity
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
Expand Down
6 changes: 0 additions & 6 deletions pkg/cephfs/util.go
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -87,11 +86,6 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil
}

// pathExists reports whether a file or directory exists at path p.
// Any stat error (including permission errors) is treated as "does not exist".
func pathExists(p string) bool {
	if _, err := os.Stat(p); err != nil {
		return false
	}
	return true
}

// Controller service request validation
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
Expand Down
192 changes: 84 additions & 108 deletions pkg/cephfs/volume.go
Expand Up @@ -17,142 +17,118 @@ limitations under the License.
package cephfs

import (
"fmt"
"os"
"path"
"strconv"
"strings"

"github.com/ceph/ceph-csi/pkg/util"

"k8s.io/klog"
)

const (
cephVolumesRoot = "csi-volumes"

namespacePrefix = "ns-"
namespacePrefix = "fsvolumens_"
csiSubvolumeGroup = "csi"
)

// getCephRootPathLocal returns the node-local staging directory used as the
// mount point for the ceph root while provisioning the given volume.
func getCephRootPathLocal(volID volumeID) string {
	return PluginFolder + "/controller/volumes/root-" + string(volID)
}
var (
// cephfsInit tracks whether the "csi" subvolume group has been created, so it is
// only created the first time the csi plugin loads. Subvolume group create would
// otherwise be called every time the plugin loads; although that does not result
// in an error, it is unnecessary.
cephfsInit = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment for this one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

)

// getCephRootVolumePathLocal returns the local path of the volume's own
// directory beneath the staged ceph root mount for volID.
func getCephRootVolumePathLocal(volID volumeID) string {
	root := getCephRootPathLocal(volID)
	return path.Join(root, cephVolumesRoot, string(volID))
}
func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
stdout, _, err := util.ExecCommand(
poornimag marked this conversation as resolved.
Show resolved Hide resolved
"ceph",
"fs",
"subvolume",
"getpath",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting a getbasepath command, rather than a getpath command, such that we need to call it only once and can construct the subvolume name as basepath + '/' + volID. It would optimize a call to Ceph to retrieve this data for each Create/Delete/Mount as needed. @ajarr something to think about for the future, or not a viable alternative?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can do this, ceph fs subvolumegroup getpath. Added a tracker ticket,
http://tracker.ceph.com/issues/40617

Copy link
Contributor

@ajarr ajarr Jul 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we breaking the abstraction if we let CSI construct subvolume path based on the subvolumegroup path?
@ShyamsundarR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I look at it the VolID that we pass is the subvolume name, hence subvolbasepath + subvolname (that we set) is the subvolume path. As a result I do not see this as an abstraction break, but maybe I am reading it this way because I want to :)

My intention for the base path is to avoid repeated calls to Ceph to fetch the same, as a result improving per operation performance by that much.

volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)

func getVolumeRootPathCeph(volID volumeID) string {
return path.Join("/", cephVolumesRoot, string(volID))
if err != nil {
klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err)
return "", err
}
return strings.TrimSuffix(string(stdout), "\n"), nil
}

// getVolumeNamespace derives the RADOS pool namespace for a volume by
// prepending namespacePrefix to its ID.
func getVolumeNamespace(volID volumeID) string {
	return fmt.Sprintf("%s%s", namespacePrefix, volID)
}

// setVolumeAttribute sets the extended attribute attrName to attrValue on the
// directory root by shelling out to the setfattr utility.
func setVolumeAttribute(root, attrName, attrValue string) error {
	args := []string{"-n", attrName, "-v", attrValue, root}
	return execCommandErr("setfattr", args...)
}

func createVolume(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID, bytesQuota int64) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootCreating = volRoot + "-creating"
)

if pathExists(volRoot) {
klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
return nil
}

if err := createMountPoint(volRootCreating); err != nil {
return err
}

if bytesQuota > 0 {
if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
//TODO: When we support multiple filesystems, we need to handle subvolume group creation for all of them
if !cephfsInit {
err := execCommandErr(
"ceph",
"fs",
"subvolumegroup",
"create",
volOptions.FsName,
csiSubvolumeGroup,
"--mode",
"777",
"--pool_layout",
volOptions.Pool,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err)
return err
}
klog.V(4).Infof("cephfs: created subvolume group csi")
cephfsInit = true
poornimag marked this conversation as resolved.
Show resolved Hide resolved
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
poornimag marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
}

if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
return err
}

if err := os.Rename(volRootCreating, volRoot); err != nil {
return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
}

return nil
}

func purgeVolume(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
err := execCommandErr(
"ceph",
"fs",
"subvolume",
"create",
poornimag marked this conversation as resolved.
Show resolved Hide resolved
volOptions.FsName,
string(volID),
strconv.FormatInt(bytesQuota, 10),
"--group_name",
csiSubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
poornimag marked this conversation as resolved.
Show resolved Hide resolved
return err
}
defer unmountCephRoot(volID)

var (
volRoot = getCephRootVolumePathLocal(volID)
volRootDeleting = volRoot + "-deleting"
)

if pathExists(volRoot) {
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
}
} else {
if !pathExists(volRootDeleting) {
klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
return nil
}
}

if err := os.RemoveAll(volRootDeleting); err != nil {
return fmt.Errorf("failed to delete volume %s: %v", volID, err)
}

return nil
}

func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
cephRoot := getCephRootPathLocal(volID)

// Root path is not set for dynamically provisioned volumes
// Access to cephfs's / is required
volOptions.RootPath = "/"

if err := createMountPoint(cephRoot); err != nil {
return err
}

m, err := newMounter(volOptions)
func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
err := execCommandErr(
"ceph",
"fs",
"subvolume",
"rm",
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
"--force",
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
return fmt.Errorf("failed to create mounter: %v", err)
}

if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
klog.Errorf("failed to purge subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid volID to string conversion use %v for printing

return err
}

poornimag marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// unmountCephRoot unmounts the staged ceph root mount for volID and then
// removes the (now empty) mount-point directory. Failures are logged rather
// than returned, as this is best-effort cleanup; the directory is only
// removed when the unmount succeeded.
func unmountCephRoot(volID volumeID) {
	cephRoot := getCephRootPathLocal(volID)

	if err := unmountVolume(cephRoot); err != nil {
		klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
		return
	}
	if err := os.Remove(cephRoot); err != nil {
		klog.Errorf("failed to remove %s with error %s", cephRoot, err)
	}
}
9 changes: 7 additions & 2 deletions pkg/cephfs/volumeoptions.go
Expand Up @@ -18,6 +18,7 @@ package cephfs

import (
"fmt"
"path"
"strconv"

"github.com/pkg/errors"
Expand Down Expand Up @@ -225,7 +226,11 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
}
}

volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return nil, nil, err
}

volOptions.ProvisionVolume = true

return &volOptions, &vid, nil
Expand Down Expand Up @@ -267,7 +272,7 @@ func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[stri
return nil, nil, err
}

opts.RootPath = getVolumeRootPathCeph(volumeID(volID))
opts.RootPath = path.Join("/csi-volumes", string(volumeID(volID)))
poornimag marked this conversation as resolved.
Show resolved Hide resolved
} else {
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/cephcmds.go
Expand Up @@ -31,17 +31,18 @@ import (
// ExecCommand executes passed in program with args and returns separate stdout and stderr streams
func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var (
cmd = exec.Command(program, args...) // nolint: gosec
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
cmd = exec.Command(program, args...) // nolint: gosec
sanitizedArgs = StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)

cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf

if err := cmd.Run(); err != nil {
return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
" occurred while running %s", err, program)
" occurred while running %s args: %v", err, program, sanitizedArgs)
}

return stdoutBuf.Bytes(), nil, nil
Expand Down
2 changes: 1 addition & 1 deletion scripts/travis-functest.sh
Expand Up @@ -11,6 +11,6 @@ sudo scripts/minikube.sh k8s-sidecar
sudo chown -R travis: "$HOME"/.minikube /usr/local/bin/kubectl
# functional tests

go test github.com/ceph/ceph-csi/e2e --rook-version=v1.0.1 --deploy-rook=true --deploy-timeout=10 -timeout=30m -v
go test github.com/ceph/ceph-csi/e2e --rook-version=master --deploy-rook=true --deploy-timeout=10 -timeout=30m -v

sudo scripts/minikube.sh clean