Skip to content

Commit

Permalink
Merge pull request #10386 from ceph/wip-jlayton-nlink
Browse files Browse the repository at this point in the history
Fix attribute handling at lookup time

Reviewed-by: Greg Farnum <gfarnum@redhat.com>
  • Loading branch information
John Spray committed Aug 2, 2016
2 parents af113a4 + 14ee7bc commit 01cd578
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 30 deletions.
94 changes: 94 additions & 0 deletions doc/cephfs/capabilities.rst
@@ -0,0 +1,94 @@
======================
Capabilities in CephFS
======================
When a client wants to operate on an inode, it will query the MDS in various
ways, which will then grant the client a set of **capabilities**. These
grant the client permissions to operate on the inode in various ways. One
of the major differences from other network filesystems (e.g. NFS or SMB) is
that the capabilities granted are quite granular, and it's possible that
multiple clients can hold different capabilities on the same inodes.

Types of Capabilities
---------------------
There are several "generic" capability bits. These denote what sort of ability
the capability grants.

::
/* generic cap bits */
#define CEPH_CAP_GSHARED 1 /* client can read (s) */
#define CEPH_CAP_GEXCL 2 /* client can read and update (x) */
#define CEPH_CAP_GCACHE 4 /* (file) client can cache reads (c) */
#define CEPH_CAP_GRD 8 /* (file) client can read (r) */
#define CEPH_CAP_GWR 16 /* (file) client can write (w) */
#define CEPH_CAP_GBUFFER 32 /* (file) client can buffer writes (b) */
#define CEPH_CAP_GWREXTEND 64 /* (file) client can extend EOF (a) */
#define CEPH_CAP_GLAZYIO 128 /* (file) client can perform lazy io (l) */

These are then shifted by a particular number of bits. These denote a part of
the inode's data or metadata on which the capability is being granted:

::
/* per-lock shift */
#define CEPH_CAP_SAUTH 2 /* A */
#define CEPH_CAP_SLINK 4 /* L */
#define CEPH_CAP_SXATTR 6 /* X */
#define CEPH_CAP_SFILE 8 /* F */

Only certain generic cap types are ever granted for some of those "shifts",
however. In particular, only the FILE shift ever has more than the first two
bits.

::
| AUTH | LINK | XATTR | FILE
2      4      6       8

From the above, we get a number of constants that are generated by taking
each bit value and shifting it to the correct bit in the word:

::
#define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH)

These bits can then be or'ed together to make a bitmask denoting a set of
capabilities.

There is one exception:

::
#define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */

The "pin" just pins the inode into memory, without granting any other caps.

Abilities granted by each cap:
------------------------------
While that is how capabilities are granted (and communicated), the important
bit is what they actually allow the client to do:

* PIN: this just pins the inode into memory. This is sufficient to allow the
client to get to the inode number, as well as other immutable things like
major or minor numbers in a device inode, or symlink contents.

* AUTH: this grants the ability to get to the authentication-related metadata,
in particular the owner, group and mode. Note that doing a full permission
check may require getting at ACLs as well, which are stored in xattrs.

* LINK: the link count of the inode

* XATTR: ability to access or manipulate xattrs. Note that since ACLs are
stored in xattrs, it's also sometimes necessary to access them when checking
permissions.

* FILE: this is the big one. It allows the client to access and manipulate
file data. It also covers certain metadata relating to file data -- the
size, mtime, atime and ctime, in particular.

Shorthand:
----------
Note that the client logging can also present a compact representation of the
capabilities. For example:

::
pAsLsXsFs

The 'p' represents the pin. Each capital letter corresponds to the shift
values, and the lowercase letters after each shift are for the actual
capabilities granted in each shift.
47 changes: 25 additions & 22 deletions src/client/Client.cc
Expand Up @@ -1550,7 +1550,7 @@ int Client::verify_reply_trace(int r,
<< " got_ino " << got_created_ino
<< " ino " << created_ino
<< dendl;
r = _do_lookup(d->dir->parent_inode, d->name, &target, uid, gid);
r = _do_lookup(d->dir->parent_inode, d->name, request->regetattr_mask, &target, uid, gid);
} else {
// if the dentry is not linked, just do our best. see #5021.
assert(0 == "how did this happen? i want logs!");
Expand Down Expand Up @@ -5208,7 +5208,7 @@ int Client::may_delete(Inode *dir, const char *name, int uid, int gid)
/* 'name == NULL' means rmsnap */
if (uid != 0 && name && (dir->mode & S_ISVTX)) {
InodeRef otherin;
r = _lookup(dir, name, &otherin, uid, gid);
r = _lookup(dir, name, CEPH_CAP_AUTH_SHARED, &otherin, uid, gid);
if (r < 0)
goto out;
if (dir->uid != (uid_t)uid && otherin->uid != (uid_t)uid)
Expand Down Expand Up @@ -5876,7 +5876,7 @@ void Client::renew_caps(MetaSession *session)
// ===============================================================
// high level (POSIXy) interface

int Client::_do_lookup(Inode *dir, const string& name, InodeRef *target,
int Client::_do_lookup(Inode *dir, const string& name, int mask, InodeRef *target,
int uid, int gid)
{
int op = dir->snapid == CEPH_SNAPDIR ? CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
Expand All @@ -5887,9 +5887,8 @@ int Client::_do_lookup(Inode *dir, const string& name, InodeRef *target,
req->set_filepath(path);
req->set_inode(dir);
if (cct->_conf->client_debug_getattr_caps && op == CEPH_MDS_OP_LOOKUP)
req->head.args.getattr.mask = DEBUG_GETATTR_CAPS;
else
req->head.args.getattr.mask = 0;
mask |= DEBUG_GETATTR_CAPS;
req->head.args.getattr.mask = mask;

ldout(cct, 10) << "_do_lookup on " << path << dendl;

Expand All @@ -5898,8 +5897,8 @@ int Client::_do_lookup(Inode *dir, const string& name, InodeRef *target,
return r;
}

int Client::_lookup(Inode *dir, const string& dname, InodeRef *target,
int uid, int gid)
int Client::_lookup(Inode *dir, const string& dname, int mask,
InodeRef *target, int uid, int gid)
{
int r = 0;
Dentry *dn = NULL;
Expand Down Expand Up @@ -5941,7 +5940,7 @@ int Client::_lookup(Inode *dir, const string& dname, InodeRef *target,
<< " seq " << dn->lease_seq
<< dendl;

if (!dn->inode || dn->inode->is_any_caps()) {
if (!dn->inode || dn->inode->caps_issued_mask(mask)) {
// is dn lease valid?
utime_t now = ceph_clock_now(cct);
if (dn->lease_mds >= 0 &&
Expand All @@ -5960,8 +5959,9 @@ int Client::_lookup(Inode *dir, const string& dname, InodeRef *target,
}
// dir lease?
if (dir->caps_issued_mask(CEPH_CAP_FILE_SHARED)) {
if (dn->cap_shared_gen == dir->shared_gen)
goto hit_dn;
if (dn->cap_shared_gen == dir->shared_gen &&
(!dn->inode || dn->inode->caps_issued_mask(mask)))
goto hit_dn;
if (!dn->inode && (dir->flags & I_COMPLETE)) {
ldout(cct, 10) << "_lookup concluded ENOENT locally for "
<< *dir << " dn '" << dname << "'" << dendl;
Expand All @@ -5980,7 +5980,7 @@ int Client::_lookup(Inode *dir, const string& dname, InodeRef *target,
}
}

r = _do_lookup(dir, dname, target, uid, gid);
r = _do_lookup(dir, dname, mask, target, uid, gid);
goto done;

hit_dn:
Expand Down Expand Up @@ -6050,6 +6050,7 @@ int Client::path_walk(const filepath& origpath, InodeRef *end, bool followsym,
ldout(cct, 10) << "path_walk " << path << dendl;

int symlinks = 0;
int caps = 0;

unsigned i=0;
while (i < path.depth() && cur) {
Expand All @@ -6061,8 +6062,9 @@ int Client::path_walk(const filepath& origpath, InodeRef *end, bool followsym,
int r = may_lookup(cur.get(), uid, gid);
if (r < 0)
return r;
caps = CEPH_CAP_AUTH_SHARED;
}
int r = _lookup(cur.get(), dname, &next, uid, gid);
int r = _lookup(cur.get(), dname, caps, &next, uid, gid);
if (r < 0)
return r;
// only follow trailing symlink if followsym. always follow
Expand Down Expand Up @@ -6246,16 +6248,17 @@ int Client::mkdirs(const char *relpath, mode_t mode)
//get through existing parts of path
filepath path(relpath);
unsigned int i;
int r=0;
int r = 0, caps = 0;
InodeRef cur, next;
cur = cwd;
for (i=0; i<path.depth(); ++i) {
if (cct->_conf->client_permissions) {
r = may_lookup(cur.get(), uid, gid);
if (r < 0)
break;
caps = CEPH_CAP_AUTH_SHARED;
}
r = _lookup(cur.get(), path[i].c_str(), &next, uid, gid);
r = _lookup(cur.get(), path[i].c_str(), caps, &next, uid, gid);
if (r < 0)
break;
cur.swap(next);
Expand Down Expand Up @@ -6678,7 +6681,7 @@ int Client::fill_stat(Inode *in, struct stat *st, frag_info_t *dirstat, nest_inf
st->st_nlink = in->nlink;
st->st_uid = in->uid;
st->st_gid = in->gid;
if (in->ctime.sec() > in->mtime.sec()) {
if (in->ctime > in->mtime) {
stat_set_ctime_sec(st, in->ctime.sec());
stat_set_ctime_nsec(st, in->ctime.nsec());
} else {
Expand Down Expand Up @@ -9523,7 +9526,7 @@ int Client::ll_lookup(Inode *parent, const char *name, struct stat *attr,
string dname(name);
InodeRef in;

r = _lookup(parent, dname, &in, uid, gid);
r = _lookup(parent, dname, CEPH_STAT_CAP_INODE_ALL, &in, uid, gid);
if (r < 0) {
attr->st_ino = 0;
goto out;
Expand Down Expand Up @@ -10851,7 +10854,7 @@ int Client::_unlink(Inode *dir, const char *name, int uid, int gid)
req->dentry_drop = CEPH_CAP_FILE_SHARED;
req->dentry_unless = CEPH_CAP_FILE_EXCL;

res = _lookup(dir, name, &otherin, uid, gid);
res = _lookup(dir, name, 0, &otherin, uid, gid);
if (res < 0)
goto fail;
req->set_other_inode(otherin.get());
Expand Down Expand Up @@ -10914,7 +10917,7 @@ int Client::_rmdir(Inode *dir, const char *name, int uid, int gid)
int res = get_or_create(dir, name, &de);
if (res < 0)
goto fail;
res = _lookup(dir, name, &in, uid, gid);
res = _lookup(dir, name, 0, &in, uid, gid);
if (res < 0)
goto fail;
if (req->get_op() == CEPH_MDS_OP_RMDIR) {
Expand Down Expand Up @@ -11011,13 +11014,13 @@ int Client::_rename(Inode *fromdir, const char *fromname, Inode *todir, const ch
req->dentry_unless = CEPH_CAP_FILE_EXCL;

InodeRef oldin, otherin;
res = _lookup(fromdir, fromname, &oldin, uid, gid);
res = _lookup(fromdir, fromname, 0, &oldin, uid, gid);
if (res < 0)
goto fail;
req->set_old_inode(oldin.get());
req->old_inode_drop = CEPH_CAP_LINK_SHARED;

res = _lookup(todir, toname, &otherin, uid, gid);
res = _lookup(todir, toname, 0, &otherin, uid, gid);
if (res != 0 && res != -ENOENT) {
goto fail;
} else if (res == 0) {
Expand Down Expand Up @@ -11353,7 +11356,7 @@ int Client::ll_create(Inode *parent, const char *name, mode_t mode,

bool created = false;
InodeRef in;
int r = _lookup(parent, name, &in, uid, gid);
int r = _lookup(parent, name, CEPH_STAT_CAP_INODE_ALL, &in, uid, gid);

if (r == 0 && (flags & O_CREAT) && (flags & O_EXCL))
return -EEXIST;
Expand Down
4 changes: 2 additions & 2 deletions src/client/Client.h
Expand Up @@ -750,8 +750,8 @@ class Client : public Dispatcher, public md_config_obs_t {

// internal interface
// call these with client_lock held!
int _do_lookup(Inode *dir, const string& name, InodeRef *target, int uid, int gid);
int _lookup(Inode *dir, const string& dname, InodeRef *target, int uid, int gid);
int _do_lookup(Inode *dir, const string& name, int mask, InodeRef *target, int uid, int gid);
int _lookup(Inode *dir, const string& dname, int mask, InodeRef *target, int uid, int gid);

int _link(Inode *in, Inode *dir, const char *name, int uid=-1, int gid=-1, InodeRef *inp = 0);
int _unlink(Inode *dir, const char *name, int uid=-1, int gid=-1);
Expand Down
6 changes: 0 additions & 6 deletions src/include/ceph_fs.h
Expand Up @@ -580,9 +580,6 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_SLINK 4
#define CEPH_CAP_SXATTR 6
#define CEPH_CAP_SFILE 8
#define CEPH_CAP_SFLOCK 20

#define CEPH_CAP_BITS 22

/* composed values */
#define CEPH_CAP_AUTH_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SAUTH)
Expand All @@ -600,9 +597,6 @@ int ceph_flags_to_mode(int flags);
#define CEPH_CAP_FILE_BUFFER (CEPH_CAP_GBUFFER << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_WREXTEND (CEPH_CAP_GWREXTEND << CEPH_CAP_SFILE)
#define CEPH_CAP_FILE_LAZYIO (CEPH_CAP_GLAZYIO << CEPH_CAP_SFILE)
#define CEPH_CAP_FLOCK_SHARED (CEPH_CAP_GSHARED << CEPH_CAP_SFLOCK)
#define CEPH_CAP_FLOCK_EXCL (CEPH_CAP_GEXCL << CEPH_CAP_SFLOCK)


/* cap masks (for getattr) */
#define CEPH_STAT_CAP_INODE CEPH_CAP_PIN
Expand Down
34 changes: 34 additions & 0 deletions src/test/libcephfs/test.cc
Expand Up @@ -1374,3 +1374,37 @@ TEST(LibCephFS, OpenNoClose) {
// shutdown should force close opened file/dir
ceph_shutdown(cmount);
}

// Checks that the st_nlink value returned through the low-level (ll) API
// follows hard-link creation and removal: it must be 1 right after the file
// is created, 2 after a second link is added, and 1 again when the file is
// looked up after the extra link has been unlinked.
TEST(LibCephFS, Nlink) {
  struct ceph_mount_info *cmount;
  ASSERT_EQ(ceph_create(&cmount, NULL), 0);
  ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
  ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
  ASSERT_EQ(ceph_mount(cmount, "/"), 0);

  Inode *root, *dir, *file;

  ASSERT_EQ(ceph_ll_lookup_root(cmount, &root), 0);

  // Entry names carry the pid in hex (presumably so that concurrent runs of
  // this test do not collide). snprintf bounds every write to its buffer.
  char dirname[32], filename[32], linkname[32];
  snprintf(dirname, sizeof(dirname), "nlinkdir%x", getpid());
  snprintf(filename, sizeof(filename), "nlinkorig%x", getpid());
  snprintf(linkname, sizeof(linkname), "nlinklink%x", getpid());

  struct stat st;
  Fh *fh;

  // Create a directory and a brand-new file in it; a fresh file has one link.
  ASSERT_EQ(ceph_ll_mkdir(cmount, root, dirname, 0755, &st, &dir, getuid(), getgid()), 0);
  ASSERT_EQ(ceph_ll_create(cmount, dir, filename, 0666, O_RDWR|O_CREAT|O_EXCL,
			   &st, &file, &fh, getuid(), getgid()), 0);
  ASSERT_EQ(st.st_nlink, (nlink_t)1);

  // Adding a hard link must be reflected in the attributes handed back.
  ASSERT_EQ(ceph_ll_link(cmount, file, dir, linkname, &st, getuid(), getgid()), 0);
  ASSERT_EQ(st.st_nlink, (nlink_t)2);

  // Remove the extra link, then look the original name up again: the stat
  // filled in by the lookup must show the updated link count.
  ASSERT_EQ(ceph_ll_unlink(cmount, dir, linkname, getuid(), getgid()), 0);
  ASSERT_EQ(ceph_ll_lookup(cmount, dir, filename, &st, &file, getuid(), getgid()), 0);
  ASSERT_EQ(st.st_nlink, (nlink_t)1);

  // NOTE(review): fh is never explicitly closed and the directory/file made
  // above are not removed; this relies on ceph_shutdown() -- confirm intended.
  ceph_shutdown(cmount);
}

0 comments on commit 01cd578

Please sign in to comment.