Merge pull request #11326 from dillaman/wip-17416
rbd-mirror: improve resiliency of stress test case

Reviewed-by: Mykola Golub <mgolub@mirantis.com>
Mykola Golub committed Oct 6, 2016
2 parents 26f7267 + a6dd6b5 commit 5a98a8c
Showing 7 changed files with 110 additions and 70 deletions.
2 changes: 1 addition & 1 deletion qa/workunits/rbd/rbd_mirror_helpers.sh
@@ -671,7 +671,7 @@ stress_write_image()
local cluster=$1
local pool=$2
local image=$3
- local duration=$(awk 'BEGIN {srand(); print int(35 * rand()) + 15}')
+ local duration=$(awk 'BEGIN {srand(); print int(10 * rand()) + 5}')

timeout ${duration}s ceph_test_rbd_mirror_random_write \
--cluster ${cluster} ${pool} ${image} \
8 changes: 2 additions & 6 deletions src/journal/JournalRecorder.cc
@@ -105,10 +105,7 @@ Future JournalRecorder::append(uint64_t tag_tid,
entry_bl);
assert(entry_bl.length() <= m_journal_metadata->get_object_size());

- AppendBuffers append_buffers;
- append_buffers.push_back(std::make_pair(future, entry_bl));
- bool object_full = object_ptr->append_unlock(append_buffers);
-
+ bool object_full = object_ptr->append_unlock({{future, entry_bl}});
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
@@ -284,8 +281,7 @@ void JournalRecorder::create_next_object_recorder_unlock(
new_object_recorder->get_object_number());
}

- new_object_recorder->append_unlock(append_buffers);
-
+ new_object_recorder->append_unlock(std::move(append_buffers));
m_object_ptrs[splay_offset] = new_object_recorder;
}

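Aside (not part of the commit): both JournalRecorder call sites above lean on the same C++11 idiom, a sink parameter taken by rvalue reference. Below is a minimal standalone sketch of that idiom; FuturePtr, Buffer, and this append_unlock are invented stand-ins, not the real src/journal types.

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-ins for journal::FutureImplPtr and ceph::bufferlist.
using FuturePtr = std::shared_ptr<std::string>;
using Buffer = std::string;
using AppendBuffers = std::vector<std::pair<FuturePtr, Buffer>>;

// Sink-style parameter: the callee consumes the container, so callers
// pass a braced temporary or explicitly std::move an lvalue; no copy.
bool append_unlock(AppendBuffers &&append_buffers) {
  std::cout << "consumed " << append_buffers.size() << " buffer(s)\n";
  return !append_buffers.empty();
}

int main() {
  auto future = std::make_shared<std::string>("entry");
  Buffer entry_bl = "payload";

  // Braced temporary, as in JournalRecorder::append() above.
  append_unlock({{future, entry_bl}});

  // Handing off an existing container, as in
  // create_next_object_recorder_unlock() above.
  AppendBuffers pending = {{future, entry_bl}};
  append_unlock(std::move(pending));
  // 'pending' is left in a valid but unspecified state here.
  return 0;
}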
106 changes: 68 additions & 38 deletions src/journal/ObjectRecorder.cc
@@ -44,12 +44,12 @@ ObjectRecorder::~ObjectRecorder() {
assert(!m_aio_scheduled);
}

- bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) {
+ bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
assert(m_lock->is_locked());

FutureImplPtr last_flushed_future;
bool schedule_append = false;

if (m_overflowed) {
m_append_buffers.insert(m_append_buffers.end(),
append_buffers.begin(), append_buffers.end());
@@ -132,12 +132,16 @@ void ObjectRecorder::flush(const FutureImplPtr &future) {
return;
}

- AppendBuffers::iterator it;
- for (it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) {
-   if (it->first == future) {
+ AppendBuffers::reverse_iterator r_it;
+ for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
+      ++r_it) {
+   if (r_it->first == future) {
break;
}
}
+ assert(r_it != m_append_buffers.rend());
+
+ auto it = (++r_it).base();
assert(it != m_append_buffers.end());
++it;

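Aside (not part of the commit): the (++r_it).base() conversion above is a standard but easy-to-misread idiom. A reverse_iterator denotes the element just before the forward position its base() returns, so the increment is what makes base() land on the match. A self-contained sketch with plain ints:

#include <cassert>
#include <iostream>
#include <iterator>
#include <list>

int main() {
  std::list<int> buffers = {1, 2, 3, 2, 5};

  // Scan from the back, as ObjectRecorder::flush() now does, so the most
  // recently appended match wins when there are duplicates.
  std::list<int>::reverse_iterator r_it;
  for (r_it = buffers.rbegin(); r_it != buffers.rend(); ++r_it) {
    if (*r_it == 2) {
      break;
    }
  }
  assert(r_it != buffers.rend());

  // base() returns the forward iterator one position past the element the
  // reverse iterator denotes; incrementing first lands on the match itself.
  std::list<int>::iterator it = (++r_it).base();
  assert(*it == 2);
  std::cout << "found at index " << std::distance(buffers.begin(), it)
            << "\n";  // prints 3 (the second 2), not 1
  return 0;
}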
@@ -241,7 +245,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {

AppendBuffers append_buffers;
{
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
auto tid_iter = m_in_flight_tids.find(tid);
assert(tid_iter != m_in_flight_tids.end());
m_in_flight_tids.erase(tid_iter);
@@ -250,15 +254,17 @@
if (r == -EOVERFLOW || m_overflowed) {
if (iter != m_in_flight_appends.end()) {
m_overflowed = true;
- append_overflowed(tid);
} else {
// must have seen an overflow on a previous append op
assert(r == -EOVERFLOW && m_overflowed);
}

// notify of overflow once all in-flight ops are complete
if (m_in_flight_tids.empty() && !m_aio_scheduled) {
- notify_handler();
+ append_overflowed();
+ notify_handler_unlock();
+ } else {
+   m_lock->Unlock();
}
return;
}
@@ -268,11 +274,8 @@
assert(!append_buffers.empty());

m_in_flight_appends.erase(iter);
- if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
-   // all remaining unsent appends should be redirected to new object
-   notify_handler();
- }
m_in_flight_flushes = true;
+ m_lock->Unlock();
}

// Flag the associated futures as complete.
@@ -284,18 +287,24 @@
}

// wake up any flush requests that raced with a RADOS callback
- Mutex::Locker locker(*m_lock);
+ m_lock->Lock();
m_in_flight_flushes = false;
m_in_flight_flushes_cond.Signal();

+ if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) {
+   // all remaining unsent appends should be redirected to new object
+   notify_handler_unlock();
+ } else {
+   m_lock->Unlock();
+ }
}

- void ObjectRecorder::append_overflowed(uint64_t tid) {
+ void ObjectRecorder::append_overflowed() {
ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
<< dendl;

+ assert(m_lock->is_locked());
assert(!m_in_flight_appends.empty());
- assert(m_in_flight_appends.begin()->first == tid);

cancel_append_task();

@@ -314,6 +323,13 @@ void ObjectRecorder::append_overflowed(uint64_t tid) {
m_append_buffers.begin(),
m_append_buffers.end());
restart_append_buffers.swap(m_append_buffers);

+ for (AppendBuffers::const_iterator it = m_append_buffers.begin();
+      it != m_append_buffers.end(); ++it) {
+   ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
+                    << dendl;
+   it->first->detach();
+ }
}

void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
@@ -339,58 +355,72 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
}

void ObjectRecorder::send_appends_aio() {
- Mutex::Locker locker(*m_lock);
-
- m_aio_scheduled = false;
+ AppendBuffers *append_buffers;
+ uint64_t append_tid;
+ {
+   Mutex::Locker locker(*m_lock);
+   append_tid = m_append_tid++;
+   m_in_flight_tids.insert(append_tid);

- AppendBuffers append_buffers;
- m_pending_buffers.swap(append_buffers);
+   // safe to hold pointer outside lock until op is submitted
+   append_buffers = &m_in_flight_appends[append_tid];
+   append_buffers->swap(m_pending_buffers);
+ }

- uint64_t append_tid = m_append_tid++;
ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
<< append_tid << dendl;
C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+ C_Gather *gather_ctx = new C_Gather(m_cct, append_flush);

librados::ObjectWriteOperation op;
client::guard_append(&op, m_soft_max_size);

- for (AppendBuffers::iterator it = append_buffers.begin();
-      it != append_buffers.end(); ++it) {
+ for (AppendBuffers::iterator it = append_buffers->begin();
+      it != append_buffers->end(); ++it) {
ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
<< dendl;
op.append(it->second);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
}
- m_in_flight_tids.insert(append_tid);
- m_in_flight_appends[append_tid].swap(append_buffers);

librados::AioCompletion *rados_completion =
-   librados::Rados::aio_create_completion(append_flush, NULL,
+   librados::Rados::aio_create_completion(gather_ctx->new_sub(), nullptr,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
assert(r == 0);
rados_completion->release();
}

void ObjectRecorder::notify_handler() {
assert(m_lock->is_locked());

for (AppendBuffers::const_iterator it = m_append_buffers.begin();
it != m_append_buffers.end(); ++it) {
ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
<< dendl;
it->first->detach();
+ {
+   m_lock->Lock();
+   if (m_pending_buffers.empty()) {
+     m_aio_scheduled = false;
+     if (m_in_flight_appends.empty() && m_object_closed) {
+       // all remaining unsent appends should be redirected to new object
+       notify_handler_unlock();
+     } else {
+       m_lock->Unlock();
+     }
+   } else {
+     // additional pending items -- reschedule
+     m_op_work_queue->queue(new FunctionContext([this] (int r) {
+       send_appends_aio();
+     }));
+     m_lock->Unlock();
+   }
+ }

+ // allow append op to complete
+ gather_ctx->activate();
}

+ void ObjectRecorder::notify_handler_unlock() {
assert(m_lock->is_locked());
if (m_object_closed) {
m_lock->Unlock();
m_handler->closed(this);
- m_lock->Lock();
} else {
// TODO need to delay completion until after aio_notify completes
m_lock->Unlock();
m_handler->overflow(this);
- m_lock->Lock();
}
}
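Aside (not part of the commit): the _unlock suffix adopted above marks a convention worth calling out. The method is entered with m_lock held and takes responsibility for releasing it before invoking the handler, so the callback never runs under the recorder's mutex; the old notify_handler() instead unlocked and re-locked around each call. A minimal sketch of that convention using std::mutex and invented names:

#include <functional>
#include <iostream>
#include <mutex>

class Recorder {
 public:
  explicit Recorder(std::function<void()> on_closed)
      : m_handler(std::move(on_closed)) {}

  void close() {
    m_lock.lock();
    m_object_closed = true;
    notify_handler_unlock();  // hands off the held lock
  }

 private:
  // Convention: entered with m_lock held, returns with it released, so the
  // handler may re-enter this object and take m_lock without deadlocking.
  void notify_handler_unlock() {
    bool closed = m_object_closed;  // snapshot state while still locked
    m_lock.unlock();
    if (closed) {
      m_handler();
    }
  }

  std::mutex m_lock;
  bool m_object_closed = false;
  std::function<void()> m_handler;
};

int main() {
  Recorder recorder([] { std::cout << "handler ran outside the lock\n"; });
  recorder.close();
  return 0;
}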

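Aside (not part of the commit): gather_ctx->new_sub() and gather_ctx->activate() above rely on Ceph's C_Gather, which fires the wrapped completion only after activate() has been called and every sub-completion has finished; that is what keeps the RADOS append callback from racing ahead of the rescheduling block in send_appends_aio(). A toy single-threaded gather with invented names (the real C_Gather is thread-safe and more involved):

#include <functional>
#include <iostream>

// Toy gather: the wrapped completion fires only once activate() has been
// called AND every sub-completion registered via new_sub() has finished.
class Gather {
 public:
  explicit Gather(std::function<void(int)> on_finish)
      : m_on_finish(std::move(on_finish)) {}

  std::function<void(int)> new_sub() {
    ++m_pending;
    return [this](int r) {
      if (r < 0 && m_ret == 0) {
        m_ret = r;  // remember the first error
      }
      --m_pending;
      maybe_finish();
    };
  }

  void activate() {
    m_activated = true;
    maybe_finish();
  }

 private:
  void maybe_finish() {
    if (m_activated && m_pending == 0) {
      m_on_finish(m_ret);
    }
  }

  int m_pending = 0;
  int m_ret = 0;
  bool m_activated = false;
  std::function<void(int)> m_on_finish;
};

int main() {
  Gather gather([](int r) { std::cout << "append flushed, r=" << r << "\n"; });
  auto sub = gather.new_sub();  // stands in for the RADOS aio completion
  sub(0);                       // the aio callback may fire first...
  gather.activate();            // ...but on_finish only runs now
  return 0;
}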
6 changes: 3 additions & 3 deletions src/journal/ObjectRecorder.h
@@ -51,7 +51,7 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
return m_oid;
}

- bool append_unlock(const AppendBuffers &append_buffers);
+ bool append_unlock(AppendBuffers &&append_buffers);
void flush(Context *on_safe);
void flush(const FutureImplPtr &future);

@@ -160,11 +160,11 @@ class ObjectRecorder : public RefCountedObject, boost::noncopyable {
bool append(const AppendBuffer &append_buffer, bool *schedule_append);
bool flush_appends(bool force);
void handle_append_flushed(uint64_t tid, int r);
- void append_overflowed(uint64_t tid);
+ void append_overflowed();
void send_appends(AppendBuffers *append_buffers);
void send_appends_aio();

- void notify_handler();
+ void notify_handler_unlock();
};

} // namespace journal
7 changes: 4 additions & 3 deletions src/librbd/ImageWatcher.cc
@@ -475,11 +475,12 @@ void ImageWatcher<I>::notify_request_lock() {
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);

- // ExclusiveLock state machine can be dynamically disabled
- if (m_image_ctx.exclusive_lock == nullptr) {
+ // ExclusiveLock state machine can be dynamically disabled or
+ // race with task cancel
+ if (m_image_ctx.exclusive_lock == nullptr ||
+     m_image_ctx.exclusive_lock->is_lock_owner()) {
return;
}
- assert(!m_image_ctx.exclusive_lock->is_lock_owner());

ldout(m_image_ctx.cct, 10) << this << " notify request lock" << dendl;

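Aside (not part of the commit): the fix above swaps an assert for a guard because a queued task can observe stale state by the time it runs; the lock may have been acquired while the notification sat in the queue. A minimal sketch of the re-check-under-lock pattern, with hypothetical names standing in for the librbd types:

#include <iostream>
#include <mutex>

// Hypothetical miniature of the ImageWatcher fix: a task scheduled earlier
// may race with cancellation or lock acquisition, so the state it was
// scheduled under is re-validated once the task finally runs.
struct ExclusiveLock {
  bool owner = false;
  bool is_lock_owner() const { return owner; }
};

struct ImageCtx {
  std::mutex owner_lock;
  ExclusiveLock *exclusive_lock = nullptr;
};

void notify_request_lock(ImageCtx &ctx) {
  std::lock_guard<std::mutex> locker(ctx.owner_lock);
  // Bail out quietly, rather than assert, if the exclusive-lock machinery
  // was disabled or we became the owner while this task sat in the queue.
  if (ctx.exclusive_lock == nullptr || ctx.exclusive_lock->is_lock_owner()) {
    return;
  }
  std::cout << "notify request lock\n";
}

int main() {
  ImageCtx ctx;
  ExclusiveLock lock;
  ctx.exclusive_lock = &lock;

  notify_request_lock(ctx);  // sends the request
  lock.owner = true;         // lock acquired before a duplicate task ran
  notify_request_lock(ctx);  // the raced task is now a harmless no-op
  return 0;
}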
