Skip to content

Commit

Permalink
client: check OSD caps before read/write
Browse files Browse the repository at this point in the history
Signed-off-by: Yan, Zheng <zyan@redhat.com>
  • Loading branch information
ukernel committed Apr 27, 2015
1 parent 66e90d0 commit 3c4028e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/client/Client.cc
Expand Up @@ -2719,6 +2719,10 @@ void Client::put_cap_ref(Inode *in, int cap)

int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
{
int r = check_pool_perm(in, need);
if (r < 0)
return r;

while (1) {
if (!in->is_any_caps())
return -ESTALE;
Expand Down Expand Up @@ -10740,6 +10744,102 @@ bool Client::is_quota_bytes_approaching(Inode *in)
return false;
}

enum {
POOL_CHECKED = 1,
POOL_CHECKING = 2,
POOL_READ = 4,
POOL_WRITE = 8,
};

int Client::check_pool_perm(Inode *in, int need)
{
if (!cct->_conf->client_check_pool_perm)
return 0;

int64_t pool = in->layout.fl_pg_pool;
int have = 0;
while (true) {
std::map<int64_t, int>::iterator it = pool_perms.find(pool);
if (it == pool_perms.end())
break;
if (it->second == POOL_CHECKING) {
// avoid concurrent checkings
wait_on_list(waiting_for_pool_perm);
} else {
have = it->second;
assert(have & POOL_CHECKED);
break;
}
}

if (!have) {
pool_perms[pool] = POOL_CHECKING;

char oid_buf[32];
snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (unsigned long long)in->ino);
object_t oid = oid_buf;

Mutex lock("Client::check_pool_perm::lock");
Cond cond;
bool done[2] = {false, false};
int rd_ret;
int wr_ret;

ObjectOperation rd_op;
rd_op.stat(NULL, (utime_t*)NULL, NULL);

objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
new C_SafeCond(&lock, &cond, &done[0], &rd_ret), NULL);

ObjectOperation wr_op;
wr_op.create(true);

objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
new C_SafeCond(&lock, &cond, &done[1], &wr_ret), NULL);

client_lock.Unlock();
lock.Lock();
while (!done[0] || !done[1])
cond.Wait(lock);
lock.Unlock();
client_lock.Lock();

if (rd_ret == 0 || rd_ret == -ENOENT)
have |= POOL_READ;
else if (rd_ret != -EPERM) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
return rd_ret;
}

if (wr_ret == 0 || wr_ret == -EEXIST)
have |= POOL_WRITE;
else if (wr_ret != -EPERM) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
return wr_ret;
}

pool_perms[pool] = have | POOL_CHECKED;
signal_cond_list(waiting_for_pool_perm);
}

if ((need & CEPH_CAP_FILE_RD) && !(have & POOL_READ)) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " need " << ccap_string(need) << ", but no read perm" << dendl;
return -EPERM;
}
if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE)) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " need " << ccap_string(need) << ", but no write perm" << dendl;
return -EPERM;
}

return 0;
}

void Client::set_filer_flags(int flags)
{
Mutex::Locker l(client_lock);
Expand Down
4 changes: 4 additions & 0 deletions src/client/Client.h
Expand Up @@ -478,6 +478,10 @@ class Client : public Dispatcher {
bool is_quota_bytes_exceeded(Inode *in, int64_t new_bytes);
bool is_quota_bytes_approaching(Inode *in);

std::map<int64_t, int> pool_perms;
list<Cond*> waiting_for_pool_perm;
int check_pool_perm(Inode *in, int need);

public:
void set_filer_flags(int flags);
void clear_filer_flags(int flags);
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -331,6 +331,7 @@ OPTION(fuse_debug, OPT_BOOL, false)
OPTION(fuse_multithreaded, OPT_BOOL, true)
OPTION(client_try_dentry_invalidate, OPT_BOOL, true) // the client should try to use dentry invaldation instead of remounting, on kernels it believes that will work for
OPTION(client_die_on_failed_remount, OPT_BOOL, true)
OPTION(client_check_pool_perm, OPT_BOOL, true)

OPTION(crush_location, OPT_STR, "") // whitespace-separated list of key=value pairs describing crush location

Expand Down

0 comments on commit 3c4028e

Please sign in to comment.