Skip to content

Commit

Permalink
journal: clean up playback notification handling
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Mar 8, 2016
1 parent b710374 commit b37f135
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
51 changes: 31 additions & 20 deletions src/journal/JournalPlayer.cc
Expand Up @@ -150,6 +150,8 @@ void JournalPlayer::unwatch() {
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);

m_handler_notified = false;
if (m_state != STATE_PLAYBACK) {
return false;
}
Expand All @@ -160,9 +162,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {

if (!verify_playback_ready()) {
if (!m_watch_enabled) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
notify_complete(0);
} else if (!m_watch_scheduled) {
schedule_watch();
}
Expand All @@ -180,17 +180,15 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl;

m_state = STATE_ERROR;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), -ENOMSG);
notify_complete(-ENOMSG);
return false;
} else if (m_journal_metadata->get_last_allocated_entry_tid(
entry->get_tag_tid(), &last_entry_tid) &&
entry->get_entry_tid() != last_entry_tid + 1) {
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;

m_state = STATE_ERROR;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), -ENOMSG);
notify_complete(-ENOMSG);
return false;
}

Expand Down Expand Up @@ -233,8 +231,7 @@ void JournalPlayer::process_state(uint64_t object_number, int r) {

if (r < 0) {
m_state = STATE_ERROR;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), r);
notify_complete(r);
}
}

Expand Down Expand Up @@ -329,16 +326,13 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
if (!is_object_set_ready()) {
ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
} else if (verify_playback_ready()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
notify_entries_available();
} else if (m_watch_enabled) {
schedule_watch();
} else {
ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
<< dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
notify_complete(0);
}
return 0;
}
Expand All @@ -353,17 +347,13 @@ int JournalPlayer::process_playback(uint64_t object_number) {

ObjectPlayerPtr object_player = get_object_player();
if (verify_playback_ready()) {
ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
notify_entries_available();
} else if (!m_watch_enabled && is_object_set_ready()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t active_set = m_journal_metadata->get_active_set();
uint64_t object_set = object_player->get_object_number() / splay_width;
if (object_set == active_set) {
ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), 0);
notify_complete(0);
}
}
return 0;
Expand Down Expand Up @@ -558,4 +548,25 @@ void JournalPlayer::handle_watch(int r) {
}
}

void JournalPlayer::notify_entries_available() {
assert(m_lock.is_locked());
if (m_handler_notified) {
return;
}
m_handler_notified = true;

ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
m_replay_handler), 0);
}

void JournalPlayer::notify_complete(int r) {
assert(m_lock.is_locked());
m_handler_notified = true;

ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
m_journal_metadata->get_finisher().queue(new C_HandleComplete(
m_replay_handler), r);
}

} // namespace journal
5 changes: 5 additions & 0 deletions src/journal/JournalPlayer.h
Expand Up @@ -112,6 +112,8 @@ class JournalPlayer {
bool m_watch_scheduled;
double m_watch_interval;

bool m_handler_notified = false;

PrefetchSplayOffsets m_prefetch_splay_offsets;
SplayedObjectPlayers m_object_players;
uint64_t m_commit_object;
Expand All @@ -136,6 +138,9 @@ class JournalPlayer {

void schedule_watch();
void handle_watch(int r);

void notify_entries_available();
void notify_complete(int r);
};

} // namespace journal
Expand Down

0 comments on commit b37f135

Please sign in to comment.