Skip to content

Commit

Permalink
Merge pull request #10679 from dillaman/wip-16735-jewel
Browse files Browse the repository at this point in the history
jewel: rbd-nbd does not properly handle resize notifications

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Loic Dachary committed Aug 17, 2016
2 parents 3167918 + fcc00f7 commit eb706ab
Show file tree
Hide file tree
Showing 13 changed files with 557 additions and 38 deletions.
20 changes: 17 additions & 3 deletions qa/workunits/rbd/rbd-nbd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ DEV=`${SUDO} rbd-nbd --device ${dev1} map ${POOL}/${IMAGE}`
[ "${DEV}" = "${dev1}" ]
${SUDO} rbd-nbd list-mapped | grep "^${DEV}$"

#read test
# read test
[ "`dd if=${DATA} bs=1M | md5sum`" = "`${SUDO} dd if=${DEV} bs=1M | md5sum`" ]

#write test
# write test
dd if=/dev/urandom of=${DATA} bs=1M count=${SIZE}
${SUDO} dd if=${DATA} of=${DEV} bs=1M oflag=direct
[ "`dd if=${DATA} bs=1M | md5sum`" = "`rbd -p ${POOL} --no-progress export ${IMAGE} - | md5sum`" ]

#trim test
# trim test
provisioned=`rbd -p ${POOL} --format xml du ${IMAGE} |
$XMLSTARLET sel -t -m "//stats/images/image/provisioned_size" -v .`
used=`rbd -p ${POOL} --format xml du ${IMAGE} |
Expand All @@ -98,4 +98,18 @@ used=`rbd -p ${POOL} --format xml du ${IMAGE} |
$XMLSTARLET sel -t -m "//stats/images/image/used_size" -v .`
[ "${used}" -lt "${provisioned}" ]

# resize test: the block count reported for the mapped device (third column
# of /proc/partitions) has to follow the image when it is resized
devname=$(basename ${DEV})

# print the block count /proc/partitions currently lists for ${devname}
_nbd_blocks() {
    awk -v dev=${devname} '$4 == dev {print $3}' /proc/partitions
}

blocks=$(_nbd_blocks)
[ -n "${blocks}" ]

# grow the image to twice SIZE and expect twice as many blocks
rbd resize ${POOL}/${IMAGE} --size $((SIZE * 2))M
rbd info ${POOL}/${IMAGE}
blocks2=$(_nbd_blocks)
[ -n "${blocks2}" ]
[ ${blocks2} -eq $((blocks * 2)) ]

# shrink back to SIZE and expect the initial block count again
rbd resize ${POOL}/${IMAGE} --allow-shrink --size ${SIZE}M
blocks2=$(_nbd_blocks)
[ -n "${blocks2}" ]
[ ${blocks2} -eq ${blocks} ]

echo OK
23 changes: 23 additions & 0 deletions src/include/rbd/librbd.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ typedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg);

typedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void *ptr);

typedef void (*rbd_update_callback_t)(void *arg);

typedef struct {
uint64_t id;
uint64_t size;
Expand Down Expand Up @@ -641,6 +643,27 @@ CEPH_RBD_API int rbd_mirror_image_get_status(rbd_image_t image,
rbd_mirror_image_status_t *mirror_image_status,
size_t status_size);

/**
 * Register an image metadata change watcher.
 *
 * @param image the image to watch
 * @param handle where to store the internal id assigned to this watch;
 *               this id identifies the watch for rbd_update_unwatch()
 * @param watch_cb callback invoked when a notify is received on this image
 * @param arg opaque value to pass to the callback
 * @returns 0 on success, negative error code on failure
 */
CEPH_RBD_API int rbd_update_watch(rbd_image_t image, uint64_t *handle,
rbd_update_callback_t watch_cb, void *arg);

/**
 * Unregister an image watcher.
 *
 * @param image the image to unwatch
 * @param handle which watch to unregister (the id stored through the
 *               handle argument of rbd_update_watch())
 * @returns 0 on success, negative error code on failure
 */
CEPH_RBD_API int rbd_update_unwatch(rbd_image_t image, uint64_t handle);

#ifdef __cplusplus
}
#endif
Expand Down
12 changes: 12 additions & 0 deletions src/include/rbd/librbd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ class CEPH_RBD_API ImageOptions {
rbd_image_options_t opts;
};

/**
 * Abstract callback interface for image update notifications.
 *
 * Callers derive from this class, implement handle_notify() and pass a
 * pointer to the instance to Image::update_watch().
 */
class CEPH_RBD_API UpdateWatchCtx {
public:
/** Virtual so that implementations can be destroyed through this type. */
virtual ~UpdateWatchCtx() {}
/**
 * Callback activated when we receive a notify event.
 *
 * It takes no arguments and returns nothing; the notification itself
 * carries no payload.
 */
virtual void handle_notify() = 0;
};

class CEPH_RBD_API Image
{
public:
Expand Down Expand Up @@ -356,6 +365,9 @@ class CEPH_RBD_API Image
int mirror_image_get_status(mirror_image_status_t *mirror_image_status,
size_t status_size);

int update_watch(UpdateWatchCtx *ctx, uint64_t *handle);
int update_unwatch(uint64_t handle);

private:
friend class RBD;

Expand Down
255 changes: 253 additions & 2 deletions src/librbd/ImageState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// vim: ts=8 sw=2 smarttab

#include "librbd/ImageState.h"
#include "include/rbd/librbd.hpp"
#include "common/dout.h"
#include "common/errno.h"
#include "common/Cond.h"
Expand All @@ -22,16 +23,224 @@ namespace librbd {
using util::create_async_context_callback;
using util::create_context_callback;

/**
 * Registry of the update watchers attached to one image.
 *
 * Watchers are stored under numeric handles. notify() schedules one
 * asynchronous callback per registered watcher on a work queue. While a
 * callback is queued or running, its handle is kept in m_in_flight so that
 * unregister_watcher() and shut_down() can defer their completion until the
 * callback has returned.
 *
 * All bookkeeping containers are protected by m_lock. Completions and the
 * watcher callback itself are always invoked with m_lock released.
 */
class ImageUpdateWatchers {
public:

  explicit ImageUpdateWatchers(CephContext *cct) : m_cct(cct),
    m_lock(util::unique_lock_name("librbd::ImageUpdateWatchers::m_lock", this)) {
  }

  /**
   * Every watcher must already be gone, no notification may be in flight
   * and no unregister / shut down may be pending (all asserted).
   */
  ~ImageUpdateWatchers() {
    assert(m_watchers.empty());
    assert(m_in_flight.empty());
    assert(m_pending_unregister.empty());
    assert(m_on_shut_down_finish == nullptr);

    destroy_work_queue();
  }

  /**
   * Complete on_finish once the notifications that are currently in flight
   * have been processed.
   *
   * With nothing in flight, on_finish is completed immediately with 0.
   * Otherwise it is queued on the work queue behind the pending callbacks.
   * NOTE(review): this relies on the queue running items in submission
   * order on its single worker thread -- confirm against ContextWQ.
   *
   * @param on_finish context to complete
   */
  void flush(Context *on_finish) {
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
    {
      Mutex::Locker locker(m_lock);
      if (!m_in_flight.empty()) {
        Context *ctx = new FunctionContext(
          [this, on_finish](int r) {
            // __func__ would name the lambda's operator() here, so the
            // method name is spelled out to keep the log line accurate
            ldout(m_cct, 20) << "ImageUpdateWatchers::flush"
                             << ": completing flush" << dendl;
            on_finish->complete(r);
          });
        m_work_queue->queue(ctx, 0);
        return;
      }
    }
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
                     << ": completing flush" << dendl;
    on_finish->complete(0);
  }

  /**
   * Drop every registered watcher.
   *
   * If notifications are still in flight, on_finish is stored and later
   * completed by the last handle_notify(); otherwise it is completed
   * immediately with 0. Only one shut down may be pending at a time.
   *
   * @param on_finish context to complete once nothing is in flight
   */
  void shut_down(Context *on_finish) {
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
    {
      Mutex::Locker locker(m_lock);
      assert(m_on_shut_down_finish == nullptr);
      m_watchers.clear();
      if (!m_in_flight.empty()) {
        m_on_shut_down_finish = on_finish;
        return;
      }
    }
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
                     << ": completing shut down" << dendl;
    on_finish->complete(0);
  }

  /**
   * Add a watcher and hand back the handle that identifies it.
   *
   * The work queue is created lazily on the first registration. Must not
   * be called while a shut down is pending (asserted).
   *
   * @param watcher callback object; stored as a raw pointer, not owned
   * @param handle  out: handle assigned to this registration
   */
  void register_watcher(UpdateWatchCtx *watcher, uint64_t *handle) {
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": watcher="
                     << watcher << dendl;

    Mutex::Locker locker(m_lock);
    assert(m_on_shut_down_finish == nullptr);

    create_work_queue();

    *handle = m_next_handle++;
    m_watchers.insert(std::make_pair(*handle, watcher));
  }

  /**
   * Remove the watcher registered under handle.
   *
   * on_finish is completed with -ENOENT if the handle is unknown. If a
   * notification for the handle is still in flight, completion is deferred
   * to handle_notify() (which completes it with 0); otherwise on_finish is
   * completed with 0 right away.
   *
   * @param handle    handle returned by register_watcher()
   * @param on_finish context to complete when the watcher is no longer used
   */
  void unregister_watcher(uint64_t handle, Context *on_finish) {
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
                     << handle << dendl;
    int r = 0;
    {
      Mutex::Locker locker(m_lock);
      auto it = m_watchers.find(handle);
      if (it == m_watchers.end()) {
        r = -ENOENT;
      } else {
        if (m_in_flight.find(handle) != m_in_flight.end()) {
          assert(m_pending_unregister.find(handle) == m_pending_unregister.end());
          // ownership of the completion moves to handle_notify()
          m_pending_unregister[handle] = on_finish;
          on_finish = nullptr;
        }
        m_watchers.erase(it);
      }
    }

    if (on_finish) {
      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
                       << ": completing unregister" << dendl;
      on_finish->complete(r);
    }
  }

  /**
   * Schedule a notification for every currently registered watcher.
   */
  void notify() {
    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;

    Mutex::Locker locker(m_lock);
    // iterate by const reference: no need to copy each map entry
    for (const auto &entry : m_watchers) {
      send_notify(entry.first, entry.second);
    }
  }

  /**
   * Mark handle as in flight and queue the watcher callback on the work
   * queue. The caller must hold m_lock (asserted).
   *
   * @param handle  handle of the watcher being notified
   * @param watcher callback object to invoke
   */
  void send_notify(uint64_t handle, UpdateWatchCtx *watcher) {
    assert(m_lock.is_locked());

    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
                     << handle << ", watcher=" << watcher << dendl;

    m_in_flight.insert(handle);

    Context *ctx = new FunctionContext(
      [this, handle, watcher](int r) {
        handle_notify(handle, watcher);
      });

    m_work_queue->queue(ctx, 0);
  }

  /**
   * Invoke the watcher callback, then retire one in-flight entry for the
   * handle and fire whichever deferred completions have become due.
   *
   * The callback runs without m_lock held; the deferred unregister and
   * shut down contexts are likewise completed after the lock is released.
   *
   * @param handle  handle of the watcher being notified
   * @param watcher callback object to invoke
   */
  void handle_notify(uint64_t handle, UpdateWatchCtx *watcher) {

    ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
                     << handle << ", watcher=" << watcher << dendl;

    watcher->handle_notify();

    Context *on_unregister_finish = nullptr;
    Context *on_shut_down_finish = nullptr;

    {
      Mutex::Locker locker(m_lock);

      // m_in_flight is a multiset: erase exactly one occurrence, the same
      // handle may have several notifications queued
      auto in_flight_it = m_in_flight.find(handle);
      assert(in_flight_it != m_in_flight.end());
      m_in_flight.erase(in_flight_it);

      // If there is no more in flight notifications for this watcher
      // and it is pending unregister, complete it now.
      if (m_in_flight.find(handle) == m_in_flight.end()) {
        auto it = m_pending_unregister.find(handle);
        if (it != m_pending_unregister.end()) {
          on_unregister_finish = it->second;
          m_pending_unregister.erase(it);
        }
      }

      if (m_in_flight.empty()) {
        assert(m_pending_unregister.empty());
        if (m_on_shut_down_finish != nullptr) {
          // take the context and reset the member to nullptr in one step
          std::swap(m_on_shut_down_finish, on_shut_down_finish);
        }
      }
    }

    if (on_unregister_finish != nullptr) {
      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
                       << ": completing unregister" << dendl;
      on_unregister_finish->complete(0);
    }

    if (on_shut_down_finish != nullptr) {
      ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
                       << ": completing shut down" << dendl;
      on_shut_down_finish->complete(0);
    }
  }

private:
  /**
   * Thread pool that is started on construction and stopped on
   * destruction. It is obtained through the CephContext singleton lookup
   * in create_work_queue().
   * NOTE(review): the trailing constructor argument (1) is presumably the
   * number of worker threads -- confirm against ThreadPool.
   */
  class ThreadPoolSingleton : public ThreadPool {
  public:
    explicit ThreadPoolSingleton(CephContext *cct)
      : ThreadPool(cct, "librbd::ImageUpdateWatchers::thread_pool", "tp_librbd",
                   1) {
      start();
    }
    virtual ~ThreadPoolSingleton() {
      stop();
    }
  };

  CephContext *m_cct;
  Mutex m_lock;                                   // guards the members below
  ContextWQ *m_work_queue = nullptr;              // created on first register
  std::map<uint64_t, UpdateWatchCtx*> m_watchers; // handle -> watcher
  uint64_t m_next_handle = 0;
  std::multiset<uint64_t> m_in_flight;            // one entry per queued notify
  std::map<uint64_t, Context*> m_pending_unregister; // deferred unregisters
  Context *m_on_shut_down_finish = nullptr;       // deferred shut down

  /// Create the work queue on top of the singleton thread pool, once.
  void create_work_queue() {
    if (m_work_queue != nullptr) {
      return;
    }
    ThreadPoolSingleton *thread_pool_singleton;
    m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
      thread_pool_singleton, "librbd::ImageUpdateWatchers::thread_pool");
    m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::op_work_queue",
                                 m_cct->_conf->rbd_op_thread_timeout,
                                 thread_pool_singleton);
  }

  /// Drain and delete the work queue if it was ever created.
  void destroy_work_queue() {
    if (m_work_queue == nullptr) {
      return;
    }
    m_work_queue->drain();
    delete m_work_queue;
    // do not leave a dangling pointer behind
    m_work_queue = nullptr;
  }
};

template <typename I>
ImageState<I>::ImageState(I *image_ctx)
: m_image_ctx(image_ctx), m_state(STATE_UNINITIALIZED),
m_lock(util::unique_lock_name("librbd::ImageState::m_lock", this)),
m_last_refresh(0), m_refresh_seq(0) {
m_last_refresh(0), m_refresh_seq(0),
m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)) {
}

// Destroys the image state. The state must be either STATE_UNINITIALIZED
// or STATE_CLOSED at this point (asserted).
template <typename I>
ImageState<I>::~ImageState() {
assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
// releases the ImageUpdateWatchers instance allocated in the constructor
delete m_update_watchers;
}

template <typename I>
Expand Down Expand Up @@ -84,8 +293,12 @@ void ImageState<I>::handle_update_notification() {
++m_refresh_seq;

CephContext *cct = m_image_ctx->cct;
ldout(cct, 20) << "refresh_seq = " << m_refresh_seq << ", "
ldout(cct, 20) << __func__ << ": refresh_seq = " << m_refresh_seq << ", "
<< "last_refresh = " << m_last_refresh << dendl;

if (m_state == STATE_OPEN) {
m_update_watchers->notify();
}
}

template <typename I>
Expand Down Expand Up @@ -164,6 +377,44 @@ void ImageState<I>::snap_set(const std::string &snap_name, Context *on_finish) {
execute_action_unlock(action, on_finish);
}

/**
 * Register an update watcher with this image's watcher registry.
 *
 * @param watcher callback object to notify
 * @param handle  out: handle assigned to the registration
 * @returns always 0
 */
template <typename I>
int ImageState<I>::register_update_watcher(UpdateWatchCtx *watcher,
                                           uint64_t *handle) {
  CephContext *const cct = m_image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  // the registry fills in *handle before it returns
  m_update_watchers->register_watcher(watcher, handle);
  ldout(cct, 20) << __func__ << ": handle=" << *handle << dendl;

  return 0;
}

/**
 * Unregister an update watcher and block until the registry has completed
 * the request.
 *
 * @param handle handle of the watcher to remove
 * @returns the result the registry completed the request with
 */
template <typename I>
int ImageState<I>::unregister_update_watcher(uint64_t handle) {
  ldout(m_image_ctx->cct, 20) << __func__ << ": handle=" << handle << dendl;

  // synchronous wrapper: hand a waitable context to the asynchronous call
  C_SaferCond unregister_done;
  m_update_watchers->unregister_watcher(handle, &unregister_done);

  const int r = unregister_done.wait();
  return r;
}

/**
 * Ask the watcher registry to flush; on_finish is completed by the
 * registry.
 *
 * @param on_finish context passed through to the registry's flush()
 */
template <typename I>
void ImageState<I>::flush_update_watchers(Context *on_finish) {
  ldout(m_image_ctx->cct, 20) << __func__ << dendl;
  m_update_watchers->flush(on_finish);
}

/**
 * Ask the watcher registry to shut down; on_finish is completed by the
 * registry.
 *
 * @param on_finish context passed through to the registry's shut_down()
 */
template <typename I>
void ImageState<I>::shut_down_update_watchers(Context *on_finish) {
  ldout(m_image_ctx->cct, 20) << __func__ << dendl;
  m_update_watchers->shut_down(on_finish);
}

template <typename I>
bool ImageState<I>::is_transition_state() const {
switch (m_state) {
Expand Down
Loading

0 comments on commit eb706ab

Please sign in to comment.