Skip to content

Commit

Permalink
Merge pull request #7785 from dillaman/wip-14822
Browse files: browse the repository at this point in the history
librbd: journal replay needs to support re-executing maintenance ops

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
  • Loading branch information
jdurgin committed Mar 2, 2016
2 parents (939984f + e4f73b3), commit 928c2e4
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 80 deletions.
10 changes: 7 additions & 3 deletions src/journal/JournalMetadata.cc
Expand Up @@ -475,12 +475,16 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
// nothing to flush
m_finisher->queue(on_safe, 0);
if (on_safe != nullptr) {
m_finisher->queue(on_safe, 0);
}
return;
}

m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
if (on_safe != nullptr) {
m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
}
cancel_commit_task();
handle_commit_position_task();
}
Expand Down
40 changes: 27 additions & 13 deletions src/librbd/Journal.cc
Expand Up @@ -407,10 +407,14 @@ void Journal<I>::append_op_event(uint64_t op_tid,

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);

// delay committing op event to ensure consistent replay
assert(m_op_futures.count(op_tid) == 0);
m_op_futures[op_tid] = future;
}

on_safe = create_async_context_callback(m_image_ctx, on_safe);
future.flush(new C_OpEventSafe(this, op_tid, future, on_safe));
future.flush(on_safe);

CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": "
Expand All @@ -429,16 +433,24 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
bufferlist bl;
::encode(event_entry, bl);

Future future;
Future op_start_future;
Future op_finish_future;
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);

// ready to commit op event
auto it = m_op_futures.find(op_tid);
assert(it != m_op_futures.end());
op_start_future = it->second;
m_op_futures.erase(it);

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
op_finish_future = m_journaler->append(0, bl);
}

future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
op_finish_future));
}

template <typename I>
Expand Down Expand Up @@ -675,13 +687,13 @@ void Journal<I>::handle_replay_complete(int r) {
transition_state(STATE_FLUSHING_RESTART, r);
m_lock.Unlock();

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
} else {
transition_state(STATE_FLUSHING_REPLAY, 0);
m_lock.Unlock();

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(false, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_replay>(this));
}
}
Expand Down Expand Up @@ -711,7 +723,7 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
m_journaler->stop_replay();
transition_state(STATE_FLUSHING_RESTART, r);

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
return;
} else if (m_state == STATE_FLUSHING_REPLAY) {
Expand Down Expand Up @@ -866,8 +878,9 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
}

template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe) {
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
const Future &op_start_future,
const Future &op_finish_future) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
Expand All @@ -878,10 +891,11 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
lderr(cct) << "failed to commit op event: " << cpp_strerror(r) << dendl;
}

m_journaler->committed(future);
if (on_safe != nullptr) {
on_safe->complete(r);
}
m_journaler->committed(op_start_future);
m_journaler->committed(op_finish_future);

// reduce the replay window after committing an op event
m_journaler->flush_commit_position(nullptr);
}

template <typename I>
Expand Down
23 changes: 13 additions & 10 deletions src/librbd/Journal.h
Expand Up @@ -8,7 +8,6 @@
#include "include/atomic.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
Expand All @@ -18,6 +17,7 @@
#include <iosfwd>
#include <list>
#include <string>
#include <unordered_map>

class Context;
namespace journal {
Expand Down Expand Up @@ -173,7 +173,8 @@ class Journal {
}
};

typedef ceph::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Future> TidToFutures;

struct C_IOEventSafe : public Context {
Journal *journal;
Expand All @@ -191,16 +192,17 @@ class Journal {
struct C_OpEventSafe : public Context {
Journal *journal;
uint64_t tid;
Future future;
Context *on_safe;
Future op_start_future;
Future op_finish_future;

C_OpEventSafe(Journal *journal, uint64_t tid, const Future &future,
Context *on_safe)
: journal(journal), tid(tid), future(future), on_safe(on_safe) {
C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future)
: journal(journal), tid(tid), op_start_future(op_start_future),
op_finish_future(op_finish_future) {
}

virtual void finish(int r) {
journal->handle_op_event_safe(r, tid, future, on_safe);
journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
}
};

Expand Down Expand Up @@ -251,6 +253,7 @@ class Journal {
Events m_events;

atomic_t m_op_tid;
TidToFutures m_op_futures;

bool m_blocking_writes;

Expand Down Expand Up @@ -279,8 +282,8 @@ class Journal {
void handle_journal_destroyed(int r);

void handle_io_event_safe(int r, uint64_t tid);
void handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe);
void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future);

void stop_recording();

Expand Down

0 comments on commit 928c2e4

Please sign in to comment.