Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hammer uclient checking #4629

Merged
3 commits merged into from May 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/client/Client.cc
Expand Up @@ -2677,6 +2677,10 @@ void Client::put_cap_ref(Inode *in, int cap)

int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
{
int r = check_pool_perm(in, need);
if (r < 0)
return r;

while (1) {
if (!in->is_any_caps())
return -ESTALE;
Expand Down Expand Up @@ -10608,6 +10612,107 @@ bool Client::is_quota_bytes_approaching(Inode *in)
return false;
}

enum {
POOL_CHECKED = 1,
POOL_CHECKING = 2,
POOL_READ = 4,
POOL_WRITE = 8,
};

int Client::check_pool_perm(Inode *in, int need)
{
if (!cct->_conf->client_check_pool_perm)
return 0;

int64_t pool = in->layout.fl_pg_pool;
int have = 0;
while (true) {
std::map<int64_t, int>::iterator it = pool_perms.find(pool);
if (it == pool_perms.end())
break;
if (it->second == POOL_CHECKING) {
// avoid concurrent checkings
wait_on_list(waiting_for_pool_perm);
} else {
have = it->second;
assert(have & POOL_CHECKED);
break;
}
}

if (!have) {
pool_perms[pool] = POOL_CHECKING;

char oid_buf[32];
snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (unsigned long long)in->ino);
object_t oid = oid_buf;

C_SaferCond rd_cond;
ObjectOperation rd_op;
rd_op.stat(NULL, (utime_t*)NULL, NULL);

objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
&rd_cond, NULL);

C_SaferCond wr_cond;
ObjectOperation wr_op;
wr_op.create(true);

objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op,
in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0,
&wr_cond, NULL);

client_lock.Unlock();
int rd_ret = rd_cond.wait();
int wr_ret = wr_cond.wait();
client_lock.Lock();

bool errored = false;

if (rd_ret == 0 || rd_ret == -ENOENT)
have |= POOL_READ;
else if (rd_ret != -EPERM) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
errored = true;
}

if (wr_ret == 0 || wr_ret == -EEXIST)
have |= POOL_WRITE;
else if (wr_ret != -EPERM) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl;
errored = true;
}

if (errored) {
// Indeterminate: erase CHECKING state so that subsequent calls re-check.
// Raise EIO because actual error code might be misleading for
// userspace filesystem user.
pool_perms.erase(pool);
signal_cond_list(waiting_for_pool_perm);
return -EIO;
}

pool_perms[pool] = have | POOL_CHECKED;
signal_cond_list(waiting_for_pool_perm);
}

if ((need & CEPH_CAP_FILE_RD) && !(have & POOL_READ)) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " need " << ccap_string(need) << ", but no read perm" << dendl;
return -EPERM;
}
if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE)) {
ldout(cct, 10) << "check_pool_perm on pool " << pool
<< " need " << ccap_string(need) << ", but no write perm" << dendl;
return -EPERM;
}

return 0;
}

void Client::set_filer_flags(int flags)
{
Mutex::Locker l(client_lock);
Expand Down
4 changes: 4 additions & 0 deletions src/client/Client.h
Expand Up @@ -480,6 +480,10 @@ class Client : public Dispatcher {
bool is_quota_bytes_exceeded(Inode *in, int64_t new_bytes);
bool is_quota_bytes_approaching(Inode *in);

std::map<int64_t, int> pool_perms;
list<Cond*> waiting_for_pool_perm;
int check_pool_perm(Inode *in, int need);

public:
void set_filer_flags(int flags);
void clear_filer_flags(int flags);
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -329,6 +329,7 @@ OPTION(fuse_debug, OPT_BOOL, false)
OPTION(fuse_multithreaded, OPT_BOOL, true)
OPTION(client_try_dentry_invalidate, OPT_BOOL, true) // the client should try to use dentry invaldation instead of remounting, on kernels it believes that will work for
OPTION(client_die_on_failed_remount, OPT_BOOL, true)
OPTION(client_check_pool_perm, OPT_BOOL, true)

OPTION(crush_location, OPT_STR, "") // whitespace-separated list of key=value pairs describing crush location

Expand Down