diff --git a/src/client/Client.cc b/src/client/Client.cc index 4e6eb8452caf2..1a2a4e6202b6d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -2719,6 +2719,10 @@ void Client::put_cap_ref(Inode *in, int cap) int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff) { + int r = check_pool_perm(in, need); + if (r < 0) + return r; + while (1) { if (!in->is_any_caps()) return -ESTALE; @@ -10740,6 +10744,102 @@ bool Client::is_quota_bytes_approaching(Inode *in) return false; } +enum { + POOL_CHECKED = 1, + POOL_CHECKING = 2, + POOL_READ = 4, + POOL_WRITE = 8, +}; + +int Client::check_pool_perm(Inode *in, int need) +{ + if (!cct->_conf->client_check_pool_perm) + return 0; + + int64_t pool = in->layout.fl_pg_pool; + int have = 0; + while (true) { + std::map::iterator it = pool_perms.find(pool); + if (it == pool_perms.end()) + break; + if (it->second == POOL_CHECKING) { + // avoid concurrent checkings + wait_on_list(waiting_for_pool_perm); + } else { + have = it->second; + assert(have & POOL_CHECKED); + break; + } + } + + if (!have) { + pool_perms[pool] = POOL_CHECKING; + + char oid_buf[32]; + snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (unsigned long long)in->ino); + object_t oid = oid_buf; + + Mutex lock("Client::check_pool_perm::lock"); + Cond cond; + bool done[2] = {false, false}; + int rd_ret; + int wr_ret; + + ObjectOperation rd_op; + rd_op.stat(NULL, (utime_t*)NULL, NULL); + + objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op, + in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0, + new C_SafeCond(&lock, &cond, &done[0], &rd_ret), NULL); + + ObjectOperation wr_op; + wr_op.create(true); + + objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), wr_op, + in->snaprealm->get_snap_context(), ceph_clock_now(cct), 0, + new C_SafeCond(&lock, &cond, &done[1], &wr_ret), NULL); + + client_lock.Unlock(); + lock.Lock(); + while (!done[0] || !done[1]) + cond.Wait(lock); + lock.Unlock(); + client_lock.Lock(); + + if (rd_ret == 0 || rd_ret == -ENOENT) + have |= POOL_READ; + else if (rd_ret != -EPERM) { + ldout(cct, 10) << "check_pool_perm on pool " << pool + << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl; + return rd_ret; + } + + if (wr_ret == 0 || wr_ret == -EEXIST) + have |= POOL_WRITE; + else if (wr_ret != -EPERM) { + ldout(cct, 10) << "check_pool_perm on pool " << pool + << " rd_err = " << rd_ret << " wr_err = " << wr_ret << dendl; + return wr_ret; + } + + pool_perms[pool] = have | POOL_CHECKED; + signal_cond_list(waiting_for_pool_perm); + } + + if ((need & CEPH_CAP_FILE_RD) && !(have & POOL_READ)) { + ldout(cct, 10) << "check_pool_perm on pool " << pool + << " need " << ccap_string(need) << ", but no read perm" << dendl; + return -EPERM; + } + if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE)) { + ldout(cct, 10) << "check_pool_perm on pool " << pool + << " need " << ccap_string(need) << ", but no write perm" << dendl; + return -EPERM; + } + + return 0; +} + void Client::set_filer_flags(int flags) { Mutex::Locker l(client_lock); diff --git a/src/client/Client.h b/src/client/Client.h index ab24153bc4b2b..4430e565faca7 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -478,6 +478,10 @@ class Client : public Dispatcher { bool is_quota_bytes_exceeded(Inode *in, int64_t new_bytes); bool is_quota_bytes_approaching(Inode *in); + std::map pool_perms; + list waiting_for_pool_perm; + int check_pool_perm(Inode *in, int need); + public: void set_filer_flags(int flags); void clear_filer_flags(int flags); diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 97cd83ed2228a..573f9e88383ec 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -331,6 +331,7 @@ OPTION(fuse_debug, OPT_BOOL, false) OPTION(fuse_multithreaded, OPT_BOOL, true) OPTION(client_try_dentry_invalidate, OPT_BOOL, true) // the client should try to use dentry invaldation instead of remounting, on kernels it believes that will work for OPTION(client_die_on_failed_remount, OPT_BOOL, true) +OPTION(client_check_pool_perm, OPT_BOOL, true) OPTION(crush_location, OPT_STR, "") // whitespace-separated list of key=value pairs describing crush location