Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pacific: mds: md_log_replay thread blocks waiting to be woken up #49671

Merged
merged 1 commit into from Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 2 additions & 18 deletions src/mds/MDLog.cc
Expand Up @@ -1158,17 +1158,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
// state doesn't change in between.
uint32_t events_transcribed = 0;
while (1) {
while (!old_journal->is_readable() &&
old_journal->get_read_pos() < old_journal->get_write_pos() &&
!old_journal->get_error()) {

// Issue a journal prefetch
C_SaferCond readable_waiter;
old_journal->wait_for_readable(&readable_waiter);

// Wait for a journal prefetch to complete
readable_waiter.wait();
}
old_journal->check_isreadable();
if (old_journal->get_error()) {
r = old_journal->get_error();
dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
Expand Down Expand Up @@ -1308,13 +1298,7 @@ void MDLog::_replay_thread()
int r = 0;
while (1) {
// wait for read?
while (!journaler->is_readable() &&
journaler->get_read_pos() < journaler->get_write_pos() &&
!journaler->get_error()) {
C_SaferCond readable_waiter;
journaler->wait_for_readable(&readable_waiter);
r = readable_waiter.wait();
}
journaler->check_isreadable();
if (journaler->get_error()) {
r = journaler->get_error();
dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
Expand Down
41 changes: 32 additions & 9 deletions src/osdc/Journaler.cc
Expand Up @@ -990,7 +990,7 @@ void Journaler::_assimilate_prefetch()

// Update readability (this will also hit any decode errors resulting
// from bad data)
readable = _is_readable();
readable = _have_next_entry();
}

if ((got_any && !was_readable && readable) || read_pos == write_pos) {
Expand Down Expand Up @@ -1114,9 +1114,9 @@ void Journaler::_prefetch()


/*
* _is_readable() - return true if next entry is ready.
* _have_next_entry() - return true if next entry is ready.
*/
bool Journaler::_is_readable()
bool Journaler::_have_next_entry()
{
// anything to read?
if (read_pos == write_pos)
Expand All @@ -1128,13 +1128,13 @@ bool Journaler::_is_readable()
return true;
}

ldout (cct, 10) << "_is_readable read_buf.length() == " << read_buf.length()
ldout (cct, 10) << "_have_next_entry read_buf.length() == " << read_buf.length()
<< ", but need " << need << " for next entry; fetch_len is "
<< fetch_len << dendl;

// partial fragment at the end?
if (received_pos == write_pos) {
ldout(cct, 10) << "is_readable() detected partial entry at tail, "
ldout(cct, 10) << "_have_next_entry() detected partial entry at tail, "
"adjusting write_pos to " << read_pos << dendl;

// adjust write_pos
Expand All @@ -1153,11 +1153,11 @@ bool Journaler::_is_readable()

if (need > fetch_len) {
temp_fetch_len = need;
ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
ldout(cct, 10) << "_have_next_entry noting temp_fetch_len " << temp_fetch_len
<< dendl;
}

ldout(cct, 10) << "_is_readable: not readable, returning false" << dendl;
ldout(cct, 10) << "_have_next_entry: not readable, returning false" << dendl;
return false;
}

Expand All @@ -1167,7 +1167,11 @@ bool Journaler::_is_readable()
bool Journaler::is_readable()
{
  // Locked entry point: hold the journaler lock for the whole check and
  // let _is_readable() do the actual work.
  lock_guard guard(lock);
  const bool readable_now = _is_readable();
  return readable_now;
}

bool Journaler::_is_readable()
{
if (error != 0) {
return false;
}
Expand Down Expand Up @@ -1263,9 +1267,9 @@ bool Journaler::try_read_entry(bufferlist& bl)
read_pos += consumed;
try {
// We were readable, we might not be any more
readable = _is_readable();
readable = _have_next_entry();
} catch (const buffer::error &e) {
lderr(cct) << __func__ << ": decode error from _is_readable" << dendl;
lderr(cct) << __func__ << ": decode error from _have_next_entry" << dendl;
error = -EINVAL;
return false;
}
Expand All @@ -1285,6 +1289,11 @@ bool Journaler::try_read_entry(bufferlist& bl)
void Journaler::wait_for_readable(Context *onreadable)
{
  // Locked entry point: acquire the journaler lock, then forward the
  // context to _wait_for_readable(), the variant that callers already
  // holding the lock (e.g. check_isreadable()) invoke directly.
  lock_guard guard(lock);
  _wait_for_readable(onreadable);
}

void Journaler::_wait_for_readable(Context *onreadable)
{
if (is_stopping()) {
finisher->queue(onreadable, -EAGAIN);
return;
Expand Down Expand Up @@ -1605,3 +1614,17 @@ void Journaler::shutdown()
waitfor_safe.clear();
}

/*
 * check_isreadable() - block until _is_readable() reports true, the read
 * position is no longer behind the write position, or an error is set.
 */
void Journaler::check_isreadable()
{
  std::unique_lock locker(lock);
  for (;;) {
    // Leave the loop once there is something to read, nothing left to
    // read, or an error has been recorded (checked in that order).
    const bool done = _is_readable() ||
                      !(get_read_pos() < get_write_pos()) ||
                      get_error() != 0;
    if (done) {
      break;
    }

    // Register a waiter while the lock is still held.
    C_SaferCond readable_waiter;
    _wait_for_readable(&readable_waiter);

    // Release the lock for the duration of the blocking wait and take it
    // again before the conditions are re-evaluated.
    locker.unlock();
    readable_waiter.wait();
    locker.lock();
  }
}
5 changes: 4 additions & 1 deletion src/osdc/Journaler.h
Expand Up @@ -383,7 +383,7 @@ class Journaler {
*/
void handle_write_error(int r);

bool _is_readable();
bool _have_next_entry();

void _finish_erase(int data_result, C_OnFinisher *completion);
class C_EraseFinish;
Expand Down Expand Up @@ -459,6 +459,7 @@ class Journaler {
void wait_for_flush(Context *onsafe = 0);
void flush(Context *onsafe = 0);
void wait_for_readable(Context *onfinish);
void _wait_for_readable(Context *onfinish);
bool have_waiter() const;
void wait_for_prezero(Context *onfinish);

Expand Down Expand Up @@ -527,6 +528,7 @@ class Journaler {
int get_error() { return error; }
bool is_readonly() { return readonly; }
bool is_readable();
bool _is_readable();
bool try_read_entry(bufferlist& bl);
uint64_t get_write_pos() const { return write_pos; }
uint64_t get_write_safe_pos() const { return safe_pos; }
Expand All @@ -536,6 +538,7 @@ class Journaler {
size_t get_journal_envelope_size() const {
return journal_stream.get_envelope_size();
}
void check_isreadable();
};
WRITE_CLASS_ENCODER(Journaler::Header)

Expand Down