Skip to content

Commit

Permalink
Merge pull request #11709 from iain-buclaw-sociomantic/librados_aioexec
Browse files Browse the repository at this point in the history
librados: Add rados_aio_exec to the C API

Reviewed-by: Kefu Chai <kchai@redhat.com>
Reviewed-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Nov 18, 2016
2 parents 060307c + e74b90e commit cc37efa
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/include/rados/librados.h
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,31 @@ CEPH_RADOS_API int rados_aio_stat(rados_ioctx_t io, const char *o,
CEPH_RADOS_API int rados_aio_cancel(rados_ioctx_t io,
rados_completion_t completion);

/**
* Asynchronously execute an OSD class method on an object
*
* The OSD has a plugin mechanism for performing complicated
* operations on an object atomically. These plugins are called
* classes. This function allows librados users to call the custom
* methods. The input and output formats are defined by the class.
* Classes in ceph.git can be found in src/cls subdirectories
*
* @param io the context in which to call the method
* @param oid the object to call the method on
* @param cls the name of the class
* @param method the name of the method
* @param in_buf where to find input
* @param in_len length of in_buf in bytes
* @param buf where to store output
* @param out_len length of buf in bytes
* @returns 0 on success, negative error code on failure
*/
CEPH_RADOS_API int rados_aio_exec(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cls, const char *method,
const char *in_buf, size_t in_len,
char *buf, size_t out_len);

/** @} Asynchronous I/O */

/**
Expand Down
22 changes: 22 additions & 0 deletions src/librados/IoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,28 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
return 0;
}

int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
const char *cls, const char *method,
bufferlist& inbl, char *buf, size_t out_len)
{
Context *onack = new C_aio_Ack(c);

c->is_read = true;
c->io = this;
c->bl.clear();
c->bl.push_back(buffer::create_static(out_len, buf));
c->blp = &c->bl;
c->out_buf = buf;

::ObjectOperation rd;
prepare_assert_ops(&rd);
rd.call(cls, method, inbl);
Objecter::Op *o = objecter->prepare_read_op(
oid, oloc, rd, snap_seq, &c->bl, 0, onack, &c->objver);
objecter->op_submit(o, &c->tid);
return 0;
}

int librados::IoCtxImpl::read(const object_t& oid,
bufferlist& bl, size_t len, uint64_t off)
{
Expand Down
2 changes: 2 additions & 0 deletions src/librados/IoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ struct librados::IoCtxImpl {
int aio_remove(const object_t &oid, AioCompletionImpl *c, int flags=0);
int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
const char *method, bufferlist& inbl, bufferlist *outbl);
int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
const char *method, bufferlist& inbl, char *buf, size_t out_len);
int aio_stat(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, time_t *pmtime);
int aio_stat2(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, struct timespec *pts);
int aio_getxattr(const object_t& oid, AioCompletionImpl *c,
Expand Down
17 changes: 17 additions & 0 deletions src/librados/librados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4732,6 +4732,23 @@ extern "C" int rados_aio_cancel(rados_ioctx_t io, rados_completion_t completion)
return ctx->aio_cancel((librados::AioCompletionImpl*)completion);
}

extern "C" int rados_aio_exec(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cls, const char *method,
const char *inbuf, size_t in_len,
char *buf, size_t out_len)
{
tracepoint(librados, rados_aio_exec_enter, io, o, completion);
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
bufferlist inbl;
inbl.append(inbuf, in_len);
int retval = ctx->aio_exec(oid, (librados::AioCompletionImpl*)completion,
cls, method, inbl, buf, out_len);
tracepoint(librados, rados_aio_exec_exit, retval);
return retval;
}

struct C_WatchCB : public librados::WatchCtx {
rados_watchcb_t wcb;
void *arg;
Expand Down
134 changes: 134 additions & 0 deletions src/test/librados/aio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,73 @@ TEST(LibRadosAio, StatRemovePP) {
delete my_completion4;
}

TEST(LibRadosAio, ExecuteClass) {
AioTestData test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion));
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
my_completion, buf, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion2));
char out[128];
ASSERT_EQ(0, rados_aio_exec(test_data.m_ioctx, "foo", my_completion2,
"hello", "say_hello", NULL, 0, out, sizeof(out)));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(13, rados_aio_get_return_value(my_completion2));
ASSERT_EQ(0, strncmp("Hello, world!", out, 13));
rados_aio_release(my_completion);
rados_aio_release(my_completion2);
}

TEST(LibRadosAio, ExecuteClassPP) {
AioTestDataPP test_data;
ASSERT_EQ("", test_data.init());
AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
AioCompletion *my_completion_null = NULL;
ASSERT_NE(my_completion, my_completion_null);
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
bl1, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, my_completion->get_return_value());
AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
ASSERT_NE(my_completion2, my_completion_null);
bufferlist in, out;
ASSERT_EQ(0, test_data.m_ioctx.aio_exec("foo", my_completion2,
"hello", "say_hello", in, &out));
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion2->wait_for_complete());
}
ASSERT_EQ(0, my_completion2->get_return_value());
ASSERT_EQ(std::string("Hello, world!"), std::string(out.c_str(), out.length()));
delete my_completion;
delete my_completion2;
}

using std::string;
using std::map;
using std::set;
Expand Down Expand Up @@ -3699,6 +3766,73 @@ TEST(LibRadosAioEC, StatRemovePP) {
delete my_completion4;
}

TEST(LibRadosAioEC, ExecuteClass) {
AioTestDataEC test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_completeEC, set_completion_safeEC, &my_completion));
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
my_completion, buf, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_completeEC, set_completion_safeEC, &my_completion2));
char out[128];
ASSERT_EQ(0, rados_aio_exec(test_data.m_ioctx, "foo", my_completion2,
"hello", "say_hello", NULL, 0, out, sizeof(out)));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(13, rados_aio_get_return_value(my_completion2));
ASSERT_EQ(0, strncmp("Hello, world!", out, 13));
rados_aio_release(my_completion);
rados_aio_release(my_completion2);
}

TEST(LibRadosAioEC, ExecuteClassPP) {
AioTestDataECPP test_data;
ASSERT_EQ("", test_data.init());
AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_completeEC, set_completion_safeEC);
AioCompletion *my_completion_null = NULL;
ASSERT_NE(my_completion, my_completion_null);
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
bl1, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, my_completion->get_return_value());
AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_completeEC, set_completion_safeEC);
ASSERT_NE(my_completion2, my_completion_null);
bufferlist in, out;
ASSERT_EQ(0, test_data.m_ioctx.aio_exec("foo", my_completion2,
"hello", "say_hello", in, &out));
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion2->wait_for_complete());
}
ASSERT_EQ(0, my_completion2->get_return_value());
ASSERT_EQ(std::string("Hello, world!"), std::string(out.c_str(), out.length()));
delete my_completion;
delete my_completion2;
}

TEST(LibRadosAioEC, OmapPP) {
Rados cluster;
std::string pool_name = get_temp_pool_name();
Expand Down
20 changes: 20 additions & 0 deletions src/tracing/librados.tp
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,26 @@ TRACEPOINT_EVENT(librados, rados_aio_stat_exit,
)
)

TRACEPOINT_EVENT(librados, rados_aio_exec_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
rados_completion_t, completion),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ctf_integer_hex(rados_completion_t, completion, completion)
)
)

TRACEPOINT_EVENT(librados, rados_aio_exec_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)

TRACEPOINT_EVENT(librados, rados_watch_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
Expand Down

0 comments on commit cc37efa

Please sign in to comment.