Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: unlock client_lock when copying data and do more check for the client_lock #36730

Merged
merged 4 commits into from Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
100 changes: 64 additions & 36 deletions src/client/Client.cc
Expand Up @@ -335,11 +335,8 @@ Client::~Client()
void Client::tear_down_cache()
{
// fd's
for (ceph::unordered_map<int, Fh*>::iterator it = fd_map.begin();
it != fd_map.end();
++it) {
Fh *fh = it->second;
ldout(cct, 1) << __func__ << " forcing close of fh " << it->first << " ino " << fh->inode->ino << dendl;
for (auto &[fd, fh] : fd_map) {
ldout(cct, 1) << __func__ << " forcing close of fh " << fd << " ino " << fh->inode->ino << dendl;
batrick marked this conversation as resolved.
Show resolved Hide resolved
_release_fh(fh);
}
fd_map.clear();
Expand Down Expand Up @@ -5831,6 +5828,8 @@ int Client::authenticate()

int Client::fetch_fsmap(bool user)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
batrick marked this conversation as resolved.
Show resolved Hide resolved

// Retrieve FSMap to enable looking up daemon addresses. We need FSMap
// rather than MDSMap because no one MDSMap contains all the daemons, and
// a `tell` can address any daemon.
Expand Down Expand Up @@ -8417,7 +8416,7 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
if (!mref_reader.is_state_satisfied())
return -ENOTCONN;

std::scoped_lock lock(client_lock);
std::unique_lock cl(client_lock);

dir_result_t *dirp = static_cast<dir_result_t*>(d);

Expand Down Expand Up @@ -8454,9 +8453,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
_ll_get(inode);
}

client_lock.unlock();
cl.unlock();
r = cb(p, &de, &stx, next_off, inode);
client_lock.lock();
cl.lock();
if (r < 0)
return r;

Expand Down Expand Up @@ -8487,9 +8486,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
_ll_get(inode);
}

client_lock.unlock();
cl.unlock();
r = cb(p, &de, &stx, next_off, inode);
client_lock.lock();
cl.lock();
if (r < 0)
return r;

Expand Down Expand Up @@ -8554,9 +8553,9 @@ int Client::readdir_r_cb(dir_result_t *d, add_dirent_cb_t cb, void *p,
_ll_get(inode);
}

client_lock.unlock();
cl.unlock();
r = cb(p, &de, &stx, next_off, inode); // _next_ offset
client_lock.lock();
cl.lock();
batrick marked this conversation as resolved.
Show resolved Hide resolved

ldout(cct, 15) << " de " << de.d_name << " off " << hex << next_off - 1 << dec
<< " = " << r << dendl;
Expand Down Expand Up @@ -9435,6 +9434,8 @@ int Client::preadv(int fd, const struct iovec *iov, int iovcnt, loff_t offset)

int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

int want, have = 0;
bool movepos = false;
std::unique_ptr<C_SaferCond> onuninline;
Expand Down Expand Up @@ -9591,6 +9592,8 @@ void Client::C_Readahead::finish(int r) {

int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

const auto& conf = cct->_conf;
Inode *in = f->inode.get();

Expand Down Expand Up @@ -9647,31 +9650,25 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
bool *checkeof)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

Inode *in = f->inode.get();
uint64_t pos = off;
int left = len;
int read = 0;

ldout(cct, 10) << __func__ << " " << *in << " " << off << "~" << len << dendl;

while (left > 0) {
C_SaferCond onfinish("Client::_read_sync flock");
bufferlist tbl;

int wanted = left;
filer->read_trunc(in->ino, &in->layout, in->snapid,
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
&onfinish);
client_lock.unlock();
// Returns 0 when the read is finished, 1 when the caller should keep reading, and < 0 on error.
auto wait_and_copy = [&](C_SaferCond &onfinish, bufferlist &tbl, int wanted) {
int r = onfinish.wait();
client_lock.lock();

// if we get ENOENT from OSD, assume 0 bytes returned
if (r == -ENOENT)
r = 0;
if (r < 0)
return r;

if (tbl.length()) {
r = tbl.length();

Expand All @@ -9694,12 +9691,31 @@ int Client::_read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
pos += some;
left -= some;
if (left == 0)
return read;
return 0;
}

*checkeof = true;
return read;
return 0;
}
return 1;
};

while (left > 0) {
C_SaferCond onfinish("Client::_read_sync flock");
bufferlist tbl;

int wanted = left;
filer->read_trunc(in->ino, &in->layout, in->snapid,
pos, left, &tbl, 0,
in->truncate_size, in->truncate_seq,
&onfinish);
client_lock.unlock();
int r = wait_and_copy(onfinish, tbl, wanted);
client_lock.lock();
if (!r)
return read;
if (r < 0)
return r;
}
return read;
}
Expand Down Expand Up @@ -9739,7 +9755,7 @@ int Client::pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset)

int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset, bool write,
bool clamp_to_int)
bool clamp_to_int, std::unique_lock<ceph::mutex> &cl)
{
#if defined(__linux__) && defined(O_PATH)
if (fh->flags & O_PATH)
Expand Down Expand Up @@ -9769,18 +9785,20 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
if (r <= 0)
return r;

cl.unlock();
auto iter = bl.cbegin();
for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
/*
* This piece of code aims to handle the case that bufferlist does not have enough data
* to fill in the iov
* This piece of code aims to handle the case that bufferlist
* does not have enough data to fill in the iov
*/
const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
resid -= round_size;
/* iter is self-updating */
}
return r;
cl.lock();
return r;
batrick marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -9793,16 +9811,18 @@ int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, in
tout(cct) << fd << std::endl;
tout(cct) << offset << std::endl;

std::scoped_lock lock(client_lock);
std::unique_lock cl(client_lock);
Fh *fh = get_filehandle(fd);
if (!fh)
return -EBADF;
return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true);
return -EBADF;
return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true, cl);
}

int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
const struct iovec *iov, int iovcnt)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

uint64_t fpos = 0;

if ((uint64_t)(offset+size) > mdsmap->get_max_filesize()) //too large!
Expand Down Expand Up @@ -10115,6 +10135,8 @@ int Client::fsync(int fd, bool syncdataonly)

int Client::_fsync(Inode *in, bool syncdataonly)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

int r = 0;
std::unique_ptr<C_SaferCond> object_cacher_completion = nullptr;
ceph_tid_t flush_tid = 0;
Expand Down Expand Up @@ -10807,6 +10829,8 @@ int Client::test_dentry_handling(bool can_invalidate)

int Client::_sync_fs()
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

ldout(cct, 10) << __func__ << dendl;

// flush file data
Expand Down Expand Up @@ -13822,8 +13846,8 @@ int64_t Client::ll_writev(struct Fh *fh, const struct iovec *iov, int iovcnt, in
if (!mref_reader.is_state_satisfied())
return -ENOTCONN;

std::scoped_lock lock(client_lock);
return _preadv_pwritev_locked(fh, iov, iovcnt, off, true, false);
std::unique_lock cl(client_lock);
return _preadv_pwritev_locked(fh, iov, iovcnt, off, true, false, cl);
}

int64_t Client::ll_readv(struct Fh *fh, const struct iovec *iov, int iovcnt, int64_t off)
Expand All @@ -13832,8 +13856,8 @@ int64_t Client::ll_readv(struct Fh *fh, const struct iovec *iov, int iovcnt, int
if (!mref_reader.is_state_satisfied())
return -ENOTCONN;

std::scoped_lock lock(client_lock);
return _preadv_pwritev_locked(fh, iov, iovcnt, off, false, false);
std::unique_lock cl(client_lock);
return _preadv_pwritev_locked(fh, iov, iovcnt, off, false, false, cl);
}

int Client::ll_flush(Fh *fh)
Expand Down Expand Up @@ -13887,6 +13911,8 @@ int Client::ll_sync_inode(Inode *in, bool syncdataonly)

int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

if (offset < 0 || length <= 0)
return -EINVAL;

Expand Down Expand Up @@ -14598,6 +14624,8 @@ enum {

int Client::check_pool_perm(Inode *in, int need)
{
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));

if (!cct->_conf->client_check_pool_perm)
return 0;

Expand Down
12 changes: 7 additions & 5 deletions src/client/Client.h
Expand Up @@ -852,10 +852,10 @@ class Client : public Dispatcher, public md_config_obs_t {
* Resolve file descriptor, or return NULL.
*/
Fh *get_filehandle(int fd) {
ceph::unordered_map<int, Fh*>::iterator p = fd_map.find(fd);
if (p == fd_map.end())
auto it = fd_map.find(fd);
if (it == fd_map.end())
return NULL;
return p->second;
return it->second;
batrick marked this conversation as resolved.
Show resolved Hide resolved
}

// helpers
Expand Down Expand Up @@ -1234,8 +1234,10 @@ class Client : public Dispatcher, public md_config_obs_t {
int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl);
int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
const struct iovec *iov, int iovcnt);
int64_t _preadv_pwritev_locked(Fh *f, const struct iovec *iov,
unsigned iovcnt, int64_t offset, bool write, bool clamp_to_int);
int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
unsigned iovcnt, int64_t offset,
bool write, bool clamp_to_int,
std::unique_lock<ceph::mutex> &cl);
int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write);
int _flush(Fh *fh);
int _fsync(Fh *fh, bool syncdataonly);
Expand Down