Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librbd: journal replay needs to support re-executing maintenance ops #7785

Merged
merged 5 commits into from Mar 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/journal/JournalMetadata.cc
Expand Up @@ -504,12 +504,16 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
Mutex::Locker locker(m_lock);
if (m_commit_position_ctx == nullptr) {
// nothing to flush
m_finisher->queue(on_safe, 0);
if (on_safe != nullptr) {
m_finisher->queue(on_safe, 0);
}
return;
}

m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
if (on_safe != nullptr) {
m_commit_position_ctx = new C_FlushCommitPosition(
m_commit_position_ctx, on_safe);
}
cancel_commit_task();
handle_commit_position_task();
}
Expand Down
40 changes: 27 additions & 13 deletions src/librbd/Journal.cc
Expand Up @@ -407,10 +407,14 @@ void Journal<I>::append_op_event(uint64_t op_tid,

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);

// delay committing op event to ensure consistent replay
assert(m_op_futures.count(op_tid) == 0);
m_op_futures[op_tid] = future;
}

on_safe = create_async_context_callback(m_image_ctx, on_safe);
future.flush(new C_OpEventSafe(this, op_tid, future, on_safe));
future.flush(on_safe);

CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": "
Expand All @@ -429,16 +433,24 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
bufferlist bl;
::encode(event_entry, bl);

Future future;
Future op_start_future;
Future op_finish_future;
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);

// ready to commit op event
auto it = m_op_futures.find(op_tid);
assert(it != m_op_futures.end());
op_start_future = it->second;
m_op_futures.erase(it);

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
op_finish_future = m_journaler->append(0, bl);
}

future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
op_finish_future));
}

template <typename I>
Expand Down Expand Up @@ -675,13 +687,13 @@ void Journal<I>::handle_replay_complete(int r) {
transition_state(STATE_FLUSHING_RESTART, r);
m_lock.Unlock();

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
} else {
transition_state(STATE_FLUSHING_REPLAY, 0);
m_lock.Unlock();

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(false, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_replay>(this));
}
}
Expand Down Expand Up @@ -711,7 +723,7 @@ void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
m_journaler->stop_replay();
transition_state(STATE_FLUSHING_RESTART, r);

m_journal_replay->shut_down(create_context_callback<
m_journal_replay->shut_down(true, create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this));
return;
} else if (m_state == STATE_FLUSHING_REPLAY) {
Expand Down Expand Up @@ -866,8 +878,9 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
}

template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe) {
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
const Future &op_start_future,
const Future &op_finish_future) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
Expand All @@ -878,10 +891,11 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
lderr(cct) << "failed to commit op event: " << cpp_strerror(r) << dendl;
}

m_journaler->committed(future);
if (on_safe != nullptr) {
on_safe->complete(r);
}
m_journaler->committed(op_start_future);
m_journaler->committed(op_finish_future);

// reduce the replay window after committing an op event
m_journaler->flush_commit_position(nullptr);
}

template <typename I>
Expand Down
23 changes: 13 additions & 10 deletions src/librbd/Journal.h
Expand Up @@ -8,7 +8,6 @@
#include "include/atomic.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
Expand All @@ -18,6 +17,7 @@
#include <iosfwd>
#include <list>
#include <string>
#include <unordered_map>

class Context;
namespace journal {
Expand Down Expand Up @@ -173,7 +173,8 @@ class Journal {
}
};

typedef ceph::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Future> TidToFutures;

struct C_IOEventSafe : public Context {
Journal *journal;
Expand All @@ -191,16 +192,17 @@ class Journal {
struct C_OpEventSafe : public Context {
Journal *journal;
uint64_t tid;
Future future;
Context *on_safe;
Future op_start_future;
Future op_finish_future;

C_OpEventSafe(Journal *journal, uint64_t tid, const Future &future,
Context *on_safe)
: journal(journal), tid(tid), future(future), on_safe(on_safe) {
C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future)
: journal(journal), tid(tid), op_start_future(op_start_future),
op_finish_future(op_finish_future) {
}

virtual void finish(int r) {
journal->handle_op_event_safe(r, tid, future, on_safe);
journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
}
};

Expand Down Expand Up @@ -251,6 +253,7 @@ class Journal {
Events m_events;

atomic_t m_op_tid;
TidToFutures m_op_futures;

bool m_blocking_writes;

Expand Down Expand Up @@ -279,8 +282,8 @@ class Journal {
void handle_journal_destroyed(int r);

void handle_io_event_safe(int r, uint64_t tid);
void handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe);
void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future);

void stop_recording();

Expand Down