Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix long stalls when calling ceph_fsync() #11710

Merged
merged 9 commits into from Nov 11, 2016
103 changes: 68 additions & 35 deletions src/client/Client.cc
Expand Up @@ -3054,7 +3054,7 @@ void Client::put_cap_ref(Inode *in, int cap)
++put_nref;
}
if (drop)
check_caps(in, false);
check_caps(in, 0);
if (put_nref)
put_inode(in, put_nref);
}
Expand Down Expand Up @@ -3088,7 +3088,7 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
endoff > (loff_t)in->wanted_max_size) {
ldout(cct, 10) << "wanted_max_size " << in->wanted_max_size << " -> " << endoff << dendl;
in->wanted_max_size = endoff;
check_caps(in, false);
check_caps(in, 0);
}

if (endoff >= 0 && endoff > (loff_t)in->max_size) {
Expand Down Expand Up @@ -3175,8 +3175,8 @@ void Client::cap_delay_requeue(Inode *in)
}

void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
int used, int want, int retain, int flush,
ceph_tid_t flush_tid)
bool sync, int used, int want, int retain,
int flush, ceph_tid_t flush_tid)
{
int held = cap->issued | cap->implemented;
int revoking = cap->implemented & ~cap->issued;
Expand All @@ -3186,6 +3186,7 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,

ldout(cct, 10) << "send_cap " << *in
<< " mds." << session->mds_num << " seq " << cap->seq
<< (sync ? " sync " : " async ")
<< " used " << ccap_string(used)
<< " want " << ccap_string(want)
<< " flush " << ccap_string(flush)
Expand Down Expand Up @@ -3261,6 +3262,8 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
m->btime = in->btime;
m->time_warp_seq = in->time_warp_seq;
m->change_attr = in->change_attr;
if (sync)
m->flags |= CLIENT_CAPS_SYNC;

if (flush & CEPH_CAP_FILE_WR) {
m->inline_version = in->inline_version;
Expand All @@ -3283,7 +3286,16 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
}


void Client::check_caps(Inode *in, bool is_delayed)
/**
* check_caps
*
* Examine currently used and wanted versus held caps. Release, flush or ack
* revoked caps to the MDS as appropriate.
*
* @param in the inode to check
* @param flags bitmask of CHECK_CAPS_* flags to apply to the cap check
*/
void Client::check_caps(Inode *in, unsigned flags)
{
unsigned wanted = in->caps_wanted();
unsigned used = get_caps_used(in);
Expand Down Expand Up @@ -3313,7 +3325,7 @@ void Client::check_caps(Inode *in, bool is_delayed)
<< " used " << ccap_string(used)
<< " issued " << ccap_string(issued)
<< " revoking " << ccap_string(revoking)
<< " is_delayed=" << is_delayed
<< " flags=" << flags
<< dendl;

if (in->snapid != CEPH_NOSNAP)
Expand All @@ -3329,10 +3341,10 @@ void Client::check_caps(Inode *in, bool is_delayed)
if (!in->cap_snaps.empty())
flush_snaps(in);

if (!is_delayed)
cap_delay_requeue(in);
else
if (flags & CHECK_CAPS_NODELAY)
in->hold_caps_until = utime_t();
else
cap_delay_requeue(in);

utime_t now = ceph_clock_now(cct);

Expand Down Expand Up @@ -3405,7 +3417,7 @@ void Client::check_caps(Inode *in, bool is_delayed)
if (in->cap_snaps.size())
flush_snaps(in, true);
if (in->flushing_caps)
flush_caps(in, session);
flush_caps(in, session, flags & CHECK_CAPS_SYNCHRONOUS);
}

int flushing;
Expand All @@ -3417,7 +3429,8 @@ void Client::check_caps(Inode *in, bool is_delayed)
flush_tid = 0;
}

send_cap(in, session, cap, cap_used, wanted, retain, flushing, flush_tid);
send_cap(in, session, cap, flags & CHECK_CAPS_SYNCHRONOUS, cap_used, wanted,
retain, flushing, flush_tid);
}
}

Expand Down Expand Up @@ -3847,7 +3860,7 @@ void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id
if (it->second == cap)
continue;
if (it->second->implemented & ~it->second->issued & issued) {
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
break;
}
}
Expand Down Expand Up @@ -4100,27 +4113,40 @@ void Client::adjust_session_flushing_caps(Inode *in, MetaSession *old_s, MetaSe
new_s->flushing_caps.push_back(&in->flushing_cap_item);
}

void Client::flush_caps()
/*
* Flush all caps back to the MDS. Because the callers generally wait on the
* result of this function (syncfs and umount cases), we set
* CHECK_CAPS_SYNCHRONOUS on the last check_caps call.
*/
void Client::flush_caps_sync()
{
ldout(cct, 10) << "flush_caps" << dendl;
ldout(cct, 10) << __func__ << dendl;
xlist<Inode*>::iterator p = delayed_caps.begin();
while (!p.end()) {
unsigned flags = CHECK_CAPS_NODELAY;
Inode *in = *p;

++p;
delayed_caps.pop_front();
check_caps(in, true);
if (p.end() && cap_list.empty())
flags |= CHECK_CAPS_SYNCHRONOUS;
check_caps(in, flags);
}

// other caps, too
p = cap_list.begin();
while (!p.end()) {
unsigned flags = CHECK_CAPS_NODELAY;
Inode *in = *p;

++p;
check_caps(in, true);
if (p.end())
flags |= CHECK_CAPS_SYNCHRONOUS;
check_caps(in, flags);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're forcing the caps to disk immediately on every invocation of flush_caps() here. Can we rename the function to indicate that behavior?
We obviously call the per-file/per-session flush_caps() in lots of places where we don't want to force it to disk instantly.

Copy link
Contributor Author

@jtlayton jtlayton Nov 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, this is not the per-file/per-session flush_caps() -- this is the flush_caps() function with no arguments that flushes all caps back to the MDS for unmount or syncfs. Can we rename this one to distinguish it from the other? Absolutely. I'm build-testing a patch for that now.

Build worked, so I went ahead and pushed the patch onto the pile.

}
}

void Client::flush_caps(Inode *in, MetaSession *session)
void Client::flush_caps(Inode *in, MetaSession *session, bool sync)
{
ldout(cct, 10) << "flush_caps " << in << " mds." << session->mds_num << dendl;
Cap *cap = in->auth_cap;
Expand All @@ -4129,7 +4155,14 @@ void Client::flush_caps(Inode *in, MetaSession *session)
for (map<ceph_tid_t,int>::iterator p = in->flushing_cap_tids.begin();
p != in->flushing_cap_tids.end();
++p) {
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
bool req_sync = false;

/* If this is a synchronous request, then flush the journal on last one */
if (sync && (p->first == in->flushing_cap_tids.rbegin()->first))
req_sync = true;

send_cap(in, session, cap, req_sync,
(get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
p->second, p->first);
}
Expand Down Expand Up @@ -4985,7 +5018,7 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient
}

if (check)
check_caps(in, false);
check_caps(in, 0);

// wake up waiters
if (new_caps)
Expand Down Expand Up @@ -5745,7 +5778,7 @@ void Client::unmount()
}
}

flush_caps();
flush_caps_sync();
wait_sync_caps(last_flush_tid);

// empty lru cache
Expand Down Expand Up @@ -5860,7 +5893,7 @@ void Client::tick()
break;
delayed_caps.pop_front();
cap_list.push_back(&in->cap_item);
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
}

trim_cache(true);
Expand Down Expand Up @@ -8035,7 +8068,7 @@ int Client::_release_fh(Fh *f)
if (in->snapid == CEPH_NOSNAP) {
if (in->put_open_ref(f->mode)) {
_flush(in, new C_Client_FlushComplete(this, in));
check_caps(in, false);
check_caps(in, 0);
}
} else {
assert(in->snap_cap_refs > 0);
Expand Down Expand Up @@ -8084,7 +8117,7 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
if ((flags & O_TRUNC) == 0 &&
in->caps_issued_mask(want)) {
// update wanted?
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
} else {
MetaRequest *req = new MetaRequest(CEPH_MDS_OP_OPEN);
filepath path;
Expand Down Expand Up @@ -8120,7 +8153,7 @@ int Client::_renew_caps(Inode *in)
int wanted = in->caps_file_wanted();
if (in->is_any_caps() &&
((wanted & CEPH_CAP_ANY_WR) == 0 || in->auth_cap)) {
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
return 0;
}

Expand Down Expand Up @@ -8453,7 +8486,7 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
check_caps(in, false);
check_caps(in, 0);
} else
r = uninline_ret;
}
Expand Down Expand Up @@ -8914,11 +8947,11 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
mark_caps_dirty(in, CEPH_CAP_FILE_WR);

if (is_quota_bytes_approaching(in, f->actor_perms)) {
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
} else {
if ((in->size << 1) >= in->max_size &&
(in->reported_size << 1) < in->max_size)
check_caps(in, false);
check_caps(in, 0);
}

ldout(cct, 7) << "wrote to " << totalwritten+offset << ", extending file size" << dendl;
Expand All @@ -8945,7 +8978,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
check_caps(in, false);
check_caps(in, 0);
} else
r = uninline_ret;
}
Expand Down Expand Up @@ -9033,8 +9066,8 @@ int Client::_fsync(Inode *in, bool syncdataonly)
ldout(cct, 15) << "using return-valued form of _fsync" << dendl;
}

if (!syncdataonly && (in->dirty_caps & ~CEPH_CAP_ANY_FILE_WR)) {
check_caps(in, true);
if (!syncdataonly && in->dirty_caps) {
check_caps(in, CHECK_CAPS_NODELAY|CHECK_CAPS_SYNCHRONOUS);
if (in->flushing_caps)
flush_tid = last_flush_tid;
} else ldout(cct, 10) << "no metadata needs to commit" << dendl;
Expand Down Expand Up @@ -9642,7 +9675,7 @@ int Client::_sync_fs()
flush_done = true;

// flush caps
flush_caps();
flush_caps_sync();
ceph_tid_t flush_tid = last_flush_tid;

// wait for unsafe mds requests
Expand Down Expand Up @@ -9705,7 +9738,7 @@ int Client::lazyio_synchronize(int fd, loff_t offset, size_t count)

_fsync(f, true);
if (_release(in))
check_caps(in, false);
check_caps(in, 0);
return 0;
}

Expand Down Expand Up @@ -12262,11 +12295,11 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
mark_caps_dirty(in, CEPH_CAP_FILE_WR);

if (is_quota_bytes_approaching(in, fh->actor_perms)) {
check_caps(in, true);
check_caps(in, CHECK_CAPS_NODELAY);
} else {
if ((in->size << 1) >= in->max_size &&
(in->reported_size << 1) < in->max_size)
check_caps(in, false);
check_caps(in, 0);
}
}
}
Expand All @@ -12283,7 +12316,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
in->inline_data.clear();
in->inline_version = CEPH_INLINE_NONE;
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
check_caps(in, false);
check_caps(in, 0);
} else
r = uninline_ret;
}
Expand Down
13 changes: 9 additions & 4 deletions src/client/Client.h
Expand Up @@ -627,8 +627,8 @@ class Client : public Dispatcher, public md_config_obs_t {
void mark_caps_dirty(Inode *in, int caps);
int mark_caps_flushing(Inode *in, ceph_tid_t *ptid);
void adjust_session_flushing_caps(Inode *in, MetaSession *old_s, MetaSession *new_s);
void flush_caps();
void flush_caps(Inode *in, MetaSession *session);
void flush_caps_sync();
void flush_caps(Inode *in, MetaSession *session, bool sync=false);
void kick_flushing_caps(MetaSession *session);
void early_kick_flushing_caps(MetaSession *session);
void kick_maxsize_requests(MetaSession *session);
Expand All @@ -648,10 +648,15 @@ class Client : public Dispatcher, public md_config_obs_t {
void handle_cap_flushsnap_ack(MetaSession *session, Inode *in, class MClientCaps *m);
void handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, class MClientCaps *m);
void cap_delay_requeue(Inode *in);
void send_cap(Inode *in, MetaSession *session, Cap *cap,
void send_cap(Inode *in, MetaSession *session, Cap *cap, bool sync,
int used, int want, int retain, int flush,
ceph_tid_t flush_tid);
void check_caps(Inode *in, bool is_delayed);

/* Flags for check_caps() */
#define CHECK_CAPS_NODELAY (0x1)
#define CHECK_CAPS_SYNCHRONOUS (0x2)

void check_caps(Inode *in, unsigned flags);
void get_cap_ref(Inode *in, int cap);
void put_cap_ref(Inode *in, int cap);
void flush_snaps(Inode *in, bool all_again=false);
Expand Down