From 74e76b7284363d28801f4859694c74f0b833e68f Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 8 Oct 2015 14:22:31 +0100
Subject: [PATCH] pybind: add ceph_volume_client

This is for FSaaS interfaces like Manila.

Signed-off-by: John Spray
---
 src/pybind/ceph_volume_client.py | 690 +++++++++++++++++++++++++++++++
 1 file changed, 690 insertions(+)
 create mode 100644 src/pybind/ceph_volume_client.py

diff --git a/src/pybind/ceph_volume_client.py b/src/pybind/ceph_volume_client.py
new file mode 100644
index 0000000000000..6734e4072aa61
--- /dev/null
+++ b/src/pybind/ceph_volume_client.py
@@ -0,0 +1,690 @@
+"""
+Copyright (C) 2015 Red Hat, Inc.
+
+LGPL2. See file COPYING.
+"""
+
+import json
+import logging
+import os
+import threading
+import errno
+import time
+
+import rados
+import cephfs
+from ceph_argparse import json_command
+
+# Generate missing lib errors at load time, rather than the
+# first time someone tries to use the FS
+try:
+    cephfs.load_libcephfs()
+except EnvironmentError as e:
+    raise ImportError(e.__str__())
+
+
+class RadosError(Exception):
+    """
+    Something went wrong talking to Ceph with librados
+    """
+    pass
+
+
+RADOS_TIMEOUT = 10
+SNAP_DIR = ".snap"
+
+log = logging.getLogger(__name__)
+
+
+# Reserved volume group name which we use in paths for volumes
+# that are not in any group (i.e. created with group_id=None)
+NO_GROUP_NAME = "_nogroup"
+
+
+class VolumePath(object):
+    """
+    Identify a volume's path as group->volume
+    The Volume ID is a unique identifier, but this is a much more
+    helpful thing to pass around.
+    """
+    def __init__(self, group_id, volume_id):
+        self.group_id = group_id
+        self.volume_id = volume_id
+        assert self.group_id != NO_GROUP_NAME
+
+    def __str__(self):
+        return "{0}/{1}".format(self.group_id, self.volume_id)
+
+
+class ClusterTimeout(Exception):
+    """
+    Exception indicating that we timed out trying to talk to the Ceph cluster,
+    either to the mons, or to any individual daemon that the mons indicate ought
+    to be up but isn't responding to us.
+    """
+    pass
+
+
+class ClusterError(Exception):
+    """
+    Exception indicating that the cluster returned an error to a command that
+    we thought should be successful based on our last knowledge of the cluster
+    state.
+    """
+    def __init__(self, action, result_code, result_str):
+        self._action = action
+        self._result_code = result_code
+        self._result_str = result_str
+
+    def __str__(self):
+        return "Error {0} (\"{1}\") while {2}".format(
+            self._result_code, self._result_str, self._action)
+
+
+class RankEvicter(threading.Thread):
+    """
+    Thread for evicting client(s) from a particular MDS daemon instance.
+
+    This is more complex than simply sending a command, because we have to
+    handle cases where MDS daemons might not be fully up yet, and/or might
+    be transiently unresponsive to commands.
+    """
+    class GidGone(Exception):
+        pass
+
+    POLL_PERIOD = 5
+
+    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
+        """
+        :param client_spec: list of strings, used as filter arguments to "session evict";
+            pass ["id=123"] to evict a single client with session id 123.
+        """
+        self.rank = rank
+        self.gid = gid
+        self._mds_map = mds_map
+        self._client_spec = client_spec
+        self._volume_client = volume_client
+        self._ready_timeout = ready_timeout
+        self._ready_waited = 0
+
+        self.success = False
+        self.exception = None
+
+        super(RankEvicter, self).__init__()
+
+    def _ready_to_evict(self):
+        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
+            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
+                self._client_spec, self.rank, self.gid
+            ))
+            raise RankEvicter.GidGone()
+
+        info = self._mds_map['info']["gid_{0}".format(self.gid)]
+        log.debug("_ready_to_evict: state={0}".format(info['state']))
+        return info['state'] in ["up:active", "up:clientreplay"]
+
+    def _wait_for_ready(self):
+        """
+        Wait for this MDS rank to reach an active or clientreplay state, and
+        not be laggy.
+        """
+        while not self._ready_to_evict():
+            if self._ready_waited > self._ready_timeout:
+                raise ClusterTimeout()
+
+            time.sleep(self.POLL_PERIOD)
+            self._ready_waited += self.POLL_PERIOD
+
+            self._mds_map = self._volume_client._rados_command("mds dump", {})
+
+    def _evict(self):
+        """
+        Run the eviction procedure.  Return True on success, raise
+        ClusterTimeout or ClusterError on failure.
+        """
+
+        # Wait until the MDS is believed by the mon to be available for commands
+        try:
+            self._wait_for_ready()
+        except self.GidGone:
+            return True
+
+        # Then send it an evict
+        ret = errno.ETIMEDOUT
+        while ret == errno.ETIMEDOUT:
+            log.debug("mds_command: {0}, {1}".format(
+                "%s" % self.gid, ["session", "evict"] + self._client_spec
+            ))
+            ret, outb, outs = self._volume_client.fs.mds_command(
+                "%s" % self.gid,
+                [json.dumps({
+                    "prefix": "session evict",
+                    "filters": self._client_spec
+                })], "")
+            log.debug("mds_command: complete {0} {1}".format(ret, outs))
+
+            # If we get a clean response, great, it's gone from that rank.
+            if ret == 0:
+                return True
+            elif ret == errno.ETIMEDOUT:
+                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
+                self._mds_map = self._volume_client._rados_command("mds dump", {})
+                try:
+                    self._wait_for_ready()
+                except self.GidGone:
+                    return True
+            else:
+                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
+
+    def run(self):
+        try:
+            self._evict()
+        except Exception as e:
+            self.success = False
+            self.exception = e
+        else:
+            self.success = True
+
+
+class EvictionError(Exception):
+    pass
+
+
+class CephFSVolumeClient(object):
+    """
+    Combine libcephfs and librados interfaces to implement a 'Volume'
+    concept: a cephfs directory plus client capabilities which restrict
+    mount access to that directory.
+
+    Additionally, volumes may be in a 'Group'.  Conveniently,
+    volumes are a lot like manila shares, and groups are a lot
+    like manila consistency groups.
+
+    Refer to volumes with VolumePath, which specifies the
+    volume and group IDs (both strings).  The group ID may
+    be None.
+
+    In general, functions in this class are allowed to raise rados.Error
+    or cephfs.Error exceptions in unexpected situations.
+    """
+
+    # Where shall we create our volumes?
+    VOLUME_PREFIX = "/volumes"
+    POOL_PREFIX = "fsvolume_"
+
+    def __init__(self, auth_id, conf_path, cluster_name):
+        self.fs = None
+        self.rados = None
+        self.connected = False
+        self.conf_path = conf_path
+        self.cluster_name = cluster_name
+        self.auth_id = auth_id
+
+    def evict(self, auth_id, timeout=30):
+        """
+        Evict all clients using this authorization ID.  Assumes that the
+        authorization key has been revoked prior to calling this function.
+
+        This operation can throw an exception if the mon cluster is unresponsive, or
+        any individual MDS daemon is unresponsive for longer than the timeout passed in.
+        """
+
+        log.info("evict: {0}".format(auth_id))
+
+        mds_map = self._rados_command("mds dump", {})
+
+        up = {}
+        for name, gid in mds_map['up'].items():
+            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
+            assert name.startswith("mds_")
+            up[int(name[4:])] = gid
+
+        # For all MDS ranks held by a daemon, evict in parallel.  Do the
+        # parallelism in python instead of using "tell mds.*", because
+        # the latter doesn't give us per-mds output
+        threads = []
+        for rank, gid in up.items():
+            thread = RankEvicter(self, ["auth_name={0}".format(auth_id)], rank, gid,
+                                 mds_map, timeout)
+            thread.start()
+            threads.append(thread)
+
+        for t in threads:
+            t.join()
+
+        log.info("evict: joined all")
+
+        for t in threads:
+            if not t.success:
+                msg = "Failed to evict client {0} from mds {1}/{2}: {3}".format(
+                    auth_id, t.rank, t.gid, t.exception
+                )
+                log.error(msg)
+                raise EvictionError(msg)
+
+    def _get_path(self, volume_path):
+        """
+        Determine the path within CephFS where this volume will live
+        :return: absolute path (string)
+        """
+        return os.path.join(
+            self.VOLUME_PREFIX,
+            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
+            volume_path.volume_id)
+
+    def _get_group_path(self, group_id):
+        return os.path.join(
+            self.VOLUME_PREFIX,
+            group_id
+        )
+
+    def connect(self, premount_evict=None):
+        """
+        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
+            may want to use this to specify their own auth ID if they expect
+            to be a unique instance and don't want to wait for caps to time
+            out after failure of another instance of themselves.
+        """
+        log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
+        self.rados = rados.Rados(
+            name="client.{0}".format(self.auth_id),
+            clustername=self.cluster_name,
+            conffile=self.conf_path,
+            conf={}
+        )
+        self.rados.connect()
+
+        log.debug("Connection to RADOS complete")
+
+        log.debug("Connecting to cephfs...")
+        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
+        log.debug("CephFS initializing...")
+        self.fs.init()
+        if premount_evict is not None:
+            log.debug("Premount eviction of {0} starting".format(premount_evict))
+            self.evict(premount_evict)
+            log.debug("Premount eviction of {0} completes".format(premount_evict))
+        log.debug("CephFS mounting...")
+        self.fs.mount()
+        log.debug("Connection to cephfs complete")
+
+    def get_mon_addrs(self):
+        log.info("get_mon_addrs")
+        result = []
+        mon_map = self._rados_command("mon dump")
+        for mon in mon_map['mons']:
+            # mon['addr'] is like "10.0.0.1:6789/0"; strip the trailing nonce
+            ip_port = mon['addr'].split("/")[0]
+            result.append(ip_port)
+
+        return result
+
+    def disconnect(self):
+        log.info("disconnect")
+        if self.fs:
+            log.debug("Disconnecting cephfs...")
+            self.fs.shutdown()
+            self.fs = None
+            log.debug("Disconnecting cephfs complete")
+
+        if self.rados:
+            log.debug("Disconnecting rados...")
+            self.rados.shutdown()
+            self.rados = None
+            log.debug("Disconnecting rados complete")
+
+    def __del__(self):
+        self.disconnect()
+
+    def _get_pool_id(self, osd_map, pool_name):
+        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
+        # like this are needed.
+        for pool in osd_map['pools']:
+            if pool['pool_name'] == pool_name:
+                return pool['pool']
+
+        return None
+
+    def _create_volume_pool(self, pool_name):
+        """
+        Idempotently create a pool for use as a CephFS data pool, with the given name
+
+        :return: The ID of the created pool
+        """
+        osd_map = self._rados_command('osd dump', {})
+
+        existing_id = self._get_pool_id(osd_map, pool_name)
+        if existing_id is not None:
+            log.info("Pool {0} already exists".format(pool_name))
+            return existing_id
+
+        osd_count = len(osd_map['osds'])
+
+        # We can't query the actual cluster config remotely, but since this is
+        # just a heuristic we'll assume that the ceph.conf we have locally reflects
+        # that in use in the rest of the cluster.
+        pg_warn_max_per_osd = int(self.rados.conf_get('mon_pg_warn_max_per_osd'))
+
+        other_pgs = 0
+        for pool in osd_map['pools']:
+            if not pool['pool_name'].startswith(self.POOL_PREFIX):
+                other_pgs += pool['pg_num']
+
+        # A basic heuristic for picking pg_num: work out the max number of
+        # PGs we can have without tripping a warning, then subtract the number
+        # of PGs already created by non-manila pools, then divide by ten.  That'll
+        # give you a reasonable result on a system where you have "a few" manila
+        # shares.  For example, with mon_pg_warn_max_per_osd=300, 10 OSDs and
+        # 1000 existing PGs, this gives (300 * 10 - 1000) / 10 = 200.
+        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
+        # TODO Alternatively, respect an override set by the user.
+
+        self._rados_command(
+            'osd pool create',
+            {
+                'pool': pool_name,
+                'pg_num': pg_num
+            }
+        )
+
+        osd_map = self._rados_command('osd dump', {})
+        pool_id = self._get_pool_id(osd_map, pool_name)
+
+        if pool_id is None:
+            # If the pool isn't there, that's either a ceph bug, or it's some outside
+            # influence removing it right after we created it.
+            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
+                pool_name, json.dumps(osd_map, indent=2)
+            ))
+            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
+        else:
+            return pool_id
+
+    def create_group(self, group_id):
+        path = self._get_group_path(group_id)
+        self._mkdir_p(path)
+
+    def destroy_group(self, group_id):
+        path = self._get_group_path(group_id)
+        try:
+            self.fs.stat(self.VOLUME_PREFIX)
+        except cephfs.ObjectNotFound:
+            pass
+        else:
+            self.fs.rmdir(path)
+
+    def _mkdir_p(self, path):
+        try:
+            self.fs.stat(path)
+        except cephfs.ObjectNotFound:
+            pass
+        else:
+            return
+
+        # Split on the path separator so that each ancestor directory is
+        # created in turn (os.path.split only separates the final component)
+        parts = path.split(os.path.sep)
+
+        for i in range(1, len(parts) + 1):
+            subpath = os.path.sep.join(parts[0:i])
+            if not subpath:
+                # Skip the empty component before the leading "/"
+                continue
+            try:
+                self.fs.stat(subpath)
+            except cephfs.ObjectNotFound:
+                self.fs.mkdir(subpath, 0755)
+
+    def create_volume(self, volume_path, size=None, data_isolated=False):
+        """
+        Set up metadata, pools and auth for a volume.
+
+        This function is idempotent.  It is safe to call this again
+        for an already-created volume, even if it is in use.
+
+        :param volume_path: VolumePath instance
+        :param size: In bytes, or None for no size limit
+        :param data_isolated: If true, create a separate OSD pool for this volume
+        :return:
+        """
+        log.info("create_volume: {0}".format(volume_path))
+        path = self._get_path(volume_path)
+
+        self._mkdir_p(path)
+
+        if size is not None:
+            self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
+
+        if data_isolated:
+            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
+            pool_id = self._create_volume_pool(pool_name)
+            mds_map = self._rados_command("mds dump", {})
+            if pool_id not in mds_map['data_pools']:
+                self._rados_command("mds add_data_pool", {
+                    'pool': pool_name
+                })
+            self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
+
+        return {
+            'mount_path': path
+        }
+
+    def delete_volume(self, volume_path, data_isolated=False):
+        """
+        Remove all trace of a volume from the Ceph cluster.  This function is
+        idempotent.
+
+        :param volume_path: Same identifier used in create_volume
+        :return:
+        """
+
+        log.info("delete_volume: {0}".format(volume_path))
+
+        # Create the trash folder if it doesn't already exist
+        trash = os.path.join(self.VOLUME_PREFIX, "_deleting")
+        self._mkdir_p(trash)
+
+        # We'll move it to here
+        trashed_volume = os.path.join(trash, volume_path.volume_id)
+
+        # Move the volume's data to the trash folder
+        path = self._get_path(volume_path)
+        try:
+            self.fs.stat(path)
+        except cephfs.ObjectNotFound:
+            log.warning("Trying to delete volume '{0}' but it's already gone".format(
+                path))
+        else:
+            self.fs.rename(path, trashed_volume)
+
+        try:
+            self.fs.stat(trashed_volume)
+        except cephfs.ObjectNotFound:
+            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
+                trashed_volume))
+            return
+
+        def rmtree(root_path):
+            log.debug("rmtree {0}".format(root_path))
+            dir_handle = self.fs.opendir(root_path)
+            d = self.fs.readdir(dir_handle)
+            while d:
+                if d.d_name not in [".", ".."]:
+                    d_full = os.path.join(root_path, d.d_name)
+                    if d.is_dir():
+                        rmtree(d_full)
+                    else:
+                        self.fs.unlink(d_full)
+
+                d = self.fs.readdir(dir_handle)
+            self.fs.closedir(dir_handle)
+
+            self.fs.rmdir(root_path)
+
+        rmtree(trashed_volume)
+
+        if data_isolated:
+            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
+            osd_map = self._rados_command("osd dump", {})
+            pool_id = self._get_pool_id(osd_map, pool_name)
+            mds_map = self._rados_command("mds dump", {})
+            if pool_id in mds_map['data_pools']:
+                self._rados_command("mds remove_data_pool", {
+                    'pool': pool_name
+                })
+            self._rados_command("osd pool delete",
+                                {
+                                    "pool": pool_name,
+                                    "pool2": pool_name,
+                                    "sure": "--yes-i-really-really-mean-it"
+                                })
+
+    def authorize(self, volume_path, auth_id):
+        """
+        Get-or-create a Ceph auth identity for `auth_id` and grant it access
+        to the volume's directory within CephFS.
+
+        :param volume_path:
+        :param auth_id:
+        :return:
+        """
+
+        path = self._get_path(volume_path)
+        client_entity = "client.{0}".format(auth_id)
+        caps = self._rados_command(
+            'auth get-or-create',
+            {
+                'entity': client_entity,
+                'caps': [
+                    'mds', 'allow rw path={0}'.format(path),
+                    'osd', 'allow rw',
+                    'mon', 'allow r']
+            })
+        # Result expected like this:
+        # [
+        #     {
+        #         "entity": "client.foobar",
+        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
+        #         "caps": {
+        #             "mds": "allow *",
+        #             "mon": "allow *"
+        #         }
+        #     }
+        # ]
+
+        assert len(caps) == 1
+        assert caps[0]['entity'] == client_entity
+        key = caps[0]['key']
+
+        return key
+
+    def deauthorize(self, volume_path, auth_id):
+        client_entity = "client.{0}".format(auth_id)
+
+        # Remove the auth key for this volume
+        self._rados_command('auth del', {'entity': client_entity}, decode=False)
+
+    def _rados_command(self, prefix, args=None, decode=True):
+        """
+        Safer wrapper for ceph_argparse.json_command, which raises
+        Error exception instead of relying on caller to check return
+        codes.
+
+        Error exception can result from:
+        * Timeout
+        * Actual legitimate errors
+        * Malformed JSON output
+
+        :return: Decoded object from ceph, or None if empty string returned.
+            If decode is False, return a string (the data returned by
+            ceph command)
+        """
+        if args is None:
+            args = {}
+
+        argdict = args.copy()
+        argdict['format'] = 'json'
+
+        ret, outbuf, outs = json_command(self.rados,
+                                         prefix=prefix,
+                                         argdict=argdict,
+                                         timeout=RADOS_TIMEOUT)
+        if ret != 0:
+            raise rados.Error(outs)
+        else:
+            if decode:
+                if outbuf:
+                    try:
+                        return json.loads(outbuf)
+                    except (ValueError, TypeError):
+                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
+                else:
+                    return None
+            else:
+                return outbuf
+
+    def get_used_bytes(self, volume_path):
+        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
+
+    def set_max_bytes(self, volume_path, max_bytes):
+        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
+                         max_bytes.__str__() if max_bytes is not None else "0",
+                         0)
+
+    def _snapshot_path(self, dir_path, snapshot_name):
+        return os.path.join(
+            dir_path, SNAP_DIR, snapshot_name
+        )
+
+    def _snapshot_create(self, dir_path, snapshot_name):
+        # TODO: raise intelligible exception for clusters where snaps are disabled
+        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0755)
+
+    def _snapshot_destroy(self, dir_path, snapshot_name):
+        """
+        Remove a snapshot, or do nothing if it already doesn't exist.
+        """
+        try:
+            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
+        except cephfs.ObjectNotFound:
+            log.warn("Snapshot was already gone: {0}".format(snapshot_name))
+
+    def create_snapshot_volume(self, volume_path, snapshot_name):
+        self._snapshot_create(self._get_path(volume_path), snapshot_name)
+
+    def destroy_snapshot_volume(self, volume_path, snapshot_name):
+        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
+
+    def create_snapshot_group(self, group_id, snapshot_name):
+        if group_id is None:
+            raise RuntimeError("Group ID may not be None")
+
+        return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
+
+    def destroy_snapshot_group(self, group_id, snapshot_name):
+        if group_id is None:
+            raise RuntimeError("Group ID may not be None")
+        if snapshot_name is None:
+            raise RuntimeError("Snapshot name may not be None")
+
+        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
+
+    def _cp_r(self, src, dst):
+        # TODO
+        raise NotImplementedError()
+
+    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
+        dest_fs_path = self._get_path(dest_volume_path)
+        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
+
+        self._cp_r(src_snapshot_path, dest_fs_path)
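
A minimal usage sketch of the interface added above, for reviewers; it is illustrative
only and not part of the applied file. It assumes a pre-created "client.manila"
identity whose key is available via the local keyring, a readable /etc/ceph/ceph.conf,
and the default "ceph" cluster name; the group/volume/tenant names are made up and
error handling is omitted.

    from ceph_volume_client import CephFSVolumeClient, VolumePath

    # Connect as the hypothetical "manila" driver identity, evicting any stale
    # instance of ourselves so its caps don't have to time out.
    vc = CephFSVolumeClient(auth_id="manila", conf_path="/etc/ceph/ceph.conf",
                            cluster_name="ceph")
    vc.connect(premount_evict="manila")

    # Create a 1 GiB volume in group "grp" and authorize a tenant-specific ID.
    vp = VolumePath("grp", "vol1")
    volume = vc.create_volume(vp, size=1024 * 1024 * 1024)
    key = vc.authorize(vp, "tenant1")
    print volume['mount_path'], key   # handed to whoever mounts the share

    # Later: revoke access, kick any remaining client sessions, and clean up.
    vc.deauthorize(vp, "tenant1")
    vc.evict("tenant1")
    vc.delete_volume(vp)
    vc.disconnect()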