Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

osdc/Journaler: avoid executing on_safe contexts prematurely #15240

Merged
merged 2 commits into from Jun 1, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/osdc/Journaler.cc
Expand Up @@ -568,8 +568,12 @@ uint64_t Journaler::append_entry(bufferlist& bl)
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
if (write_off) {
// current entry isn't being flushed, set next_safe_pos to the end of previous entry

// if _do_flush() skips flushing some data, it does do a best effort to
// update next_safe_pos.
if (write_buf.length() > 0 &&
write_buf.length() <= wrote) { // the unflushed data are within this entry
// set next_safe_pos to end of previous entry
next_safe_pos = write_pos - wrote;
}
}
Expand Down Expand Up @@ -634,6 +638,17 @@ void Journaler::_do_flush(unsigned amount)
next_safe_pos = write_pos;
} else {
write_buf.splice(0, len, &write_bl);
// Keys of waitfor_safe map are journal entry boundaries.
// Try finding a journal entry that we are actually flushing
// and set next_safe_pos to end of it. This is best effort.
// The one we found may not be the lastest flushing entry.
auto p = waitfor_safe.lower_bound(flush_pos + len);
if (p != waitfor_safe.end()) {
if (p->first > flush_pos + len && p != waitfor_safe.begin())
--p;
if (p->first <= flush_pos + len && p->first > next_safe_pos)
next_safe_pos = p->first;
}
}

filer.write(ino, &layout, snapc,
Expand Down Expand Up @@ -952,7 +967,16 @@ void Journaler::_issue_read(uint64_t len)
if (pending_safe.empty()) {
_flush(NULL);
}
waitfor_safe[flush_pos].push_back(new C_RetryRead(this));

// Make sure keys of waitfor_safe map are journal entry boundaries.
// The key we used here is either next_safe_pos or old value of
// next_safe_pos. next_safe_pos is always set to journal entry
// boundary.
auto p = pending_safe.rbegin();
if (p != pending_safe.rend())
waitfor_safe[p->second].push_back(new C_RetryRead(this));
else
waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
return;
}

Expand Down Expand Up @@ -1062,6 +1086,7 @@ bool Journaler::_is_readable()
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
assert(write_buf.length() == 0);
assert(waitfor_safe.empty());

// reset read state
requested_pos = received_pos = read_pos;
Expand Down