Skip to content

Commit

Permalink
Merge pull request #5465 from yuyuyu101/wip-librbd-ap
Browse files Browse the repository at this point in the history
librbd: support eventfd for AIO completion notifications

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Dec 1, 2015
2 parents 57c7d55 + c57ceff commit 2884d8b
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 19 deletions.
11 changes: 9 additions & 2 deletions configure.ac
Expand Up @@ -1309,8 +1309,15 @@ AC_ARG_WITH(
]
)



# Optional eventfd support (Linux). Enabled by default; pass
# --without-eventfd to disable the sys/eventfd.h probe.
AC_ARG_WITH([eventfd],
[AS_HELP_STRING([--without-eventfd], [disable eventfd [default=enabled]])],
,
[with_eventfd=yes])
AS_IF([test "x$with_eventfd" != xno],
AC_CHECK_HEADERS(sys/eventfd.h,
[AC_DEFINE(HAVE_EVENTFD, 1, [Have eventfd extension.])]))
AM_CONDITIONAL(WITH_EVENTFD, [ test "$with_eventfd" = "yes" ])

# Checks for typedefs, structures, and compiler characteristics.
#AC_HEADER_STDBOOL
Expand Down
3 changes: 2 additions & 1 deletion src/common/Makefile.am
Expand Up @@ -259,7 +259,8 @@ noinst_HEADERS += \
common/bit_vector.hpp \
common/SubProcess.h \
common/valgrind.h \
common/TracepointProvider.h
common/TracepointProvider.h \
common/event_socket.h

if ENABLE_XIO
noinst_HEADERS += \
Expand Down
74 changes: 74 additions & 0 deletions src/common/event_socket.h
@@ -0,0 +1,74 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 XSky <haomai@xsky.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_COMMON_EVENT_SOCKET_H
#define CEPH_COMMON_EVENT_SOCKET_H

#include "include/event_type.h"

// Wraps a caller-supplied file descriptor (write end of a pipe, or a Linux
// eventfd) used to wake an application when AIO completions are ready.
class EventSocket {
  int socket;  // caller's fd; -1 until init() succeeds
  int type;    // one of EVENT_SOCKET_TYPE_*

 public:
  EventSocket() : socket(-1), type(EVENT_SOCKET_TYPE_NONE) {}

  // True once init() has accepted a descriptor.
  bool is_valid() const { return socket != -1; }

  // Adopt fd as the notification channel of the given type.
  // Returns 0 on success, -EINVAL for an unsupported type (eventfd is
  // only accepted when built with HAVE_EVENTFD).
  int init(int fd, int t) {
    switch (t) {
    case EVENT_SOCKET_TYPE_PIPE:
#ifdef HAVE_EVENTFD
    case EVENT_SOCKET_TYPE_EVENTFD:
#endif
      socket = fd;
      type = t;
      return 0;
    }
    return -EINVAL;
  }

  // Wake the consumer: write a single byte for a pipe, or add 1 to the
  // counter for an eventfd. Returns 0 on success, -errno if write(2)
  // failed, or -1 if init() was never (successfully) called.
  int notify() {
    int ret;
    switch (type) {
    case EVENT_SOCKET_TYPE_PIPE:
      {
        char buf[1];
        buf[0] = 'i';
        ret = write(socket, buf, 1);
        ret = (ret < 0) ? -errno : 0;
        break;  // BUG FIX: missing break caused fall-through into the
                // eventfd case (a spurious 8-byte write) and then into
                // default, so notify() always returned -1
      }
    case EVENT_SOCKET_TYPE_EVENTFD:
      {
        uint64_t value = 1;
        ret = write(socket, &value, sizeof (value));
        ret = (ret < 0) ? -errno : 0;
        break;  // BUG FIX: missing break caused fall-through into default
      }
    default:
      ret = -1;
      break;
    }
    return ret;
  }
};

#endif
3 changes: 2 additions & 1 deletion src/include/Makefile.am
Expand Up @@ -115,4 +115,5 @@ noinst_HEADERS += \
include/rados/memory.h \
include/unordered_set.h \
include/unordered_map.h \
include/timegm.h
include/timegm.h \
include/event_type.h
24 changes: 24 additions & 0 deletions src/include/event_type.h
@@ -0,0 +1,24 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 XSky <haomai@xsky.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_COMMON_EVENT_TYPE_H
#define CEPH_COMMON_EVENT_TYPE_H

/* Backend types for EventSocket (common/event_socket.h). */
#define EVENT_SOCKET_TYPE_NONE 0    /* uninitialized/invalid socket */
#define EVENT_SOCKET_TYPE_PIPE 1    /* notify via a 1-byte write to a pipe fd */
#define EVENT_SOCKET_TYPE_EVENTFD 2 /* notify via an 8-byte counter write (Linux eventfd; requires HAVE_EVENTFD) */

#endif
14 changes: 14 additions & 0 deletions src/include/rbd/librbd.h
Expand Up @@ -65,6 +65,15 @@ typedef struct {
#define RBD_MAX_IMAGE_NAME_SIZE 96
#define RBD_MAX_BLOCK_NAME_SIZE 24

/**
 * These types are used in set_image_notification to indicate the type of
 * event socket passed in.
 */
enum {
EVENT_TYPE_PIPE = 1,
EVENT_TYPE_EVENTFD = 2
};

typedef struct {
uint64_t size;
uint64_t obj_size;
Expand Down Expand Up @@ -191,6 +200,7 @@ CEPH_RBD_API int rbd_get_parent_info(rbd_image_t image,
char *parent_snapname,
size_t psnapnamelen);
CEPH_RBD_API int rbd_get_flags(rbd_image_t image, uint64_t *flags);
CEPH_RBD_API int rbd_set_image_notification(rbd_image_t image, int fd, int type);

/* exclusive lock feature */
CEPH_RBD_API int rbd_is_exclusive_lock_owner(rbd_image_t image, int *is_owner);
Expand Down Expand Up @@ -471,12 +481,14 @@ CEPH_RBD_API int rbd_aio_read2(rbd_image_t image, uint64_t off, size_t len,
char *buf, rbd_completion_t c, int op_flags);
CEPH_RBD_API int rbd_aio_discard(rbd_image_t image, uint64_t off, uint64_t len,
rbd_completion_t c);

CEPH_RBD_API int rbd_aio_create_completion(void *cb_arg,
rbd_callback_t complete_cb,
rbd_completion_t *c);
CEPH_RBD_API int rbd_aio_is_complete(rbd_completion_t c);
CEPH_RBD_API int rbd_aio_wait_for_complete(rbd_completion_t c);
CEPH_RBD_API ssize_t rbd_aio_get_return_value(rbd_completion_t c);
CEPH_RBD_API void *rbd_aio_get_arg(rbd_completion_t c);
CEPH_RBD_API void rbd_aio_release(rbd_completion_t c);
CEPH_RBD_API int rbd_flush(rbd_image_t image);
/**
Expand All @@ -497,6 +509,8 @@ CEPH_RBD_API int rbd_aio_flush(rbd_image_t image, rbd_completion_t c);
*/
CEPH_RBD_API int rbd_invalidate_cache(rbd_image_t image);

CEPH_RBD_API int rbd_poll_io_events(rbd_image_t image, rbd_completion_t *comps, int numcomp);

CEPH_RBD_API int rbd_metadata_get(rbd_image_t image, const char *key, char *value, size_t *val_len);
CEPH_RBD_API int rbd_metadata_set(rbd_image_t image, const char *key, const char *value);
CEPH_RBD_API int rbd_metadata_remove(rbd_image_t image, const char *key);
Expand Down
4 changes: 4 additions & 0 deletions src/include/rbd/librbd.hpp
Expand Up @@ -70,6 +70,7 @@ class CEPH_RBD_API RBD
bool is_complete();
int wait_for_complete();
ssize_t get_return_value();
void *get_arg();
void release();
};

Expand Down Expand Up @@ -147,6 +148,7 @@ class CEPH_RBD_API Image
int update_features(uint64_t features, bool enabled);
int overlap(uint64_t *overlap);
int get_flags(uint64_t *flags);
int set_image_notification(int fd, int type);

/* exclusive lock feature */
int is_exclusive_lock_owner(bool *is_owner);
Expand Down Expand Up @@ -283,6 +285,8 @@ class CEPH_RBD_API Image
*/
int invalidate_cache();

int poll_io_events(RBD::AioCompletion **comps, int numcomp);

int metadata_get(const std::string &key, std::string *value);
int metadata_set(const std::string &key, const std::string &value);
int metadata_remove(const std::string &key);
Expand Down
7 changes: 7 additions & 0 deletions src/librbd/AioCompletion.cc
Expand Up @@ -103,7 +103,14 @@ namespace librbd {
complete_cb(rbd_comp, complete_arg);
lock.Lock();
}

done = true;
if (ictx && event_notify && ictx->event_socket.is_valid()) {
ictx->completed_reqs_lock.Lock();
ictx->completed_reqs.push_back(&m_xlist_item);
ictx->completed_reqs_lock.Unlock();
ictx->event_socket.notify();
}
cond.Signal();
tracepoint(librbd, aio_complete_exit);
}
Expand Down
25 changes: 22 additions & 3 deletions src/librbd/AioCompletion.h
Expand Up @@ -10,6 +10,7 @@
#include "include/rbd/librbd.hpp"

#include "librbd/AsyncOperation.h"
#include "librbd/ImageCtx.h"

#include "osdc/Striper.h"

Expand Down Expand Up @@ -63,6 +64,8 @@ namespace librbd {
AsyncOperation async_op;

uint64_t journal_tid;
xlist<AioCompletion*>::item m_xlist_item;
bool event_notify;

AioCompletion() : lock("AioCompletion::lock", true, false),
done(false), rval(0), complete_cb(NULL),
Expand All @@ -71,7 +74,8 @@ namespace librbd {
ref(1), released(false), ictx(NULL),
aio_type(AIO_TYPE_NONE),
read_bl(NULL), read_buf(NULL), read_buf_len(0),
journal_tid(0) {
journal_tid(0),
m_xlist_item(this), event_notify(false) {
}
~AioCompletion() {
}
Expand Down Expand Up @@ -128,8 +132,14 @@ namespace librbd {
assert(ref > 0);
int n = --ref;
lock.Unlock();
if (!n)
delete this;
if (!n) {
if (ictx && event_notify) {
ictx->completed_reqs_lock.Lock();
m_xlist_item.remove_myself();
ictx->completed_reqs_lock.Unlock();
}
delete this;
}
}

void block() {
Expand All @@ -145,6 +155,15 @@ namespace librbd {
complete(cct);
}
}

// Enable/disable event-socket notification for this completion; taken
// under the completion lock, which complete() also holds when it reads
// event_notify.
void set_event_notify(bool s) {
  Mutex::Locker guard(lock);
  event_notify = s;
}

// Return the opaque callback argument stored on this completion (the same
// pointer that is passed to complete_cb when the AIO finishes).
void *get_arg() {
return complete_arg;
}
};

class C_AioRequest : public Context {
Expand Down
26 changes: 19 additions & 7 deletions src/librbd/AioImageRequestWQ.cc
Expand Up @@ -35,7 +35,7 @@ ssize_t AioImageRequestWQ::read(uint64_t off, uint64_t len, char *buf,

C_SaferCond cond;
AioCompletion *c = aio_create_completion_internal(&cond, rbd_ctx_cb);
aio_read(c, off, len, buf, NULL, op_flags);
aio_read(c, off, len, buf, NULL, op_flags, false);
return cond.wait();
}

Expand All @@ -54,7 +54,7 @@ ssize_t AioImageRequestWQ::write(uint64_t off, uint64_t len, const char *buf,

C_SaferCond cond;
AioCompletion *c = aio_create_completion_internal(&cond, rbd_ctx_cb);
aio_write(c, off, len, buf, op_flags);
aio_write(c, off, len, buf, op_flags, false);

r = cond.wait();
if (r < 0) {
Expand All @@ -77,7 +77,7 @@ int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {

C_SaferCond cond;
AioCompletion *c = aio_create_completion_internal(&cond, rbd_ctx_cb);
aio_discard(c, off, len);
aio_discard(c, off, len, false);

r = cond.wait();
if (r < 0) {
Expand All @@ -87,13 +87,16 @@ int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {
}

void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
char *buf, bufferlist *pbl, int op_flags) {
char *buf, bufferlist *pbl, int op_flags, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << ", off=" << off << ", "
<< "len=" << len << ", " << "flags=" << op_flags << dendl;

if (native_async && m_image_ctx.event_socket.is_valid())
c->set_event_notify(true);

RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio) {
queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
Expand All @@ -103,13 +106,16 @@ void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
}

void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
const char *buf, int op_flags) {
const char *buf, int op_flags, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << ", off=" << off << ", "
<< "len=" << len << ", flags=" << op_flags << dendl;

if (native_async && m_image_ctx.event_socket.is_valid())
c->set_event_notify(true);

RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
writes_blocked()) {
Expand All @@ -120,13 +126,16 @@ void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
}

void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
uint64_t len) {
uint64_t len, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_DISCARD);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_discard: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << ", off=" << off << ", len=" << len
<< dendl;

if (native_async && m_image_ctx.event_socket.is_valid())
c->set_event_notify(true);

RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
writes_blocked()) {
Expand All @@ -136,12 +145,15 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
}
}

void AioImageRequestWQ::aio_flush(AioCompletion *c) {
void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
c->init_time(&m_image_ctx, librbd::AIO_TYPE_FLUSH);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "aio_flush: ictx=" << &m_image_ctx << ", "
<< "completion=" << c << dendl;

if (native_async && m_image_ctx.event_socket.is_valid())
c->set_event_notify(true);

RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio || is_journal_required() ||
writes_blocked() || !writes_empty()) {
Expand Down
8 changes: 4 additions & 4 deletions src/librbd/AioImageRequestWQ.h
Expand Up @@ -25,11 +25,11 @@ class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
int discard(uint64_t off, uint64_t len);

void aio_read(AioCompletion *c, uint64_t off, uint64_t len, char *buf,
bufferlist *pbl, int op_flags);
bufferlist *pbl, int op_flags, bool native_async=true);
void aio_write(AioCompletion *c, uint64_t off, uint64_t len, const char *buf,
int op_flags);
void aio_discard(AioCompletion *c, uint64_t off, uint64_t len);
void aio_flush(AioCompletion *c);
int op_flags, bool native_async=true);
void aio_discard(AioCompletion *c, uint64_t off, uint64_t len, bool native_async=true);
void aio_flush(AioCompletion *c, bool native_async=true);

using ThreadPool::PointerWQ<AioImageRequest>::drain;
using ThreadPool::PointerWQ<AioImageRequest>::empty;
Expand Down

0 comments on commit 2884d8b

Please sign in to comment.