Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librados: Add rados_aio_exec to the C API #11709

Merged
merged 3 commits into from
Nov 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/include/rados/librados.h
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,31 @@ CEPH_RADOS_API int rados_aio_stat(rados_ioctx_t io, const char *o,
CEPH_RADOS_API int rados_aio_cancel(rados_ioctx_t io,
rados_completion_t completion);

/**
* Asynchronously execute an OSD class method on an object
*
* The OSD has a plugin mechanism for performing complicated
* operations on an object atomically. These plugins are called
* classes. This function allows librados users to call the custom
* methods. The input and output formats are defined by the class.
* Classes in ceph.git can be found in src/cls subdirectories
*
* @param io the context in which to call the method
* @param oid the object to call the method on
* @param cls the name of the class
* @param method the name of the method
* @param in_buf where to find input
* @param in_len length of in_buf in bytes
* @param buf where to store output
* @param out_len length of buf in bytes
* @returns 0 on success, negative error code on failure
*/
CEPH_RADOS_API int rados_aio_exec(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cls, const char *method,
const char *in_buf, size_t in_len,
char *buf, size_t out_len);

/** @} Asynchronous I/O */

/**
Expand Down
22 changes: 22 additions & 0 deletions src/librados/IoCtxImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,28 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
return 0;
}

int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
const char *cls, const char *method,
bufferlist& inbl, char *buf, size_t out_len)
{
Context *onack = new C_aio_Ack(c);

c->is_read = true;
c->io = this;
c->bl.clear();
c->bl.push_back(buffer::create_static(out_len, buf));
c->blp = &c->bl;
c->out_buf = buf;

::ObjectOperation rd;
prepare_assert_ops(&rd);
rd.call(cls, method, inbl);
Objecter::Op *o = objecter->prepare_read_op(
oid, oloc, rd, snap_seq, &c->bl, 0, onack, &c->objver);
objecter->op_submit(o, &c->tid);
return 0;
}

int librados::IoCtxImpl::read(const object_t& oid,
bufferlist& bl, size_t len, uint64_t off)
{
Expand Down
2 changes: 2 additions & 0 deletions src/librados/IoCtxImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ struct librados::IoCtxImpl {
int aio_remove(const object_t &oid, AioCompletionImpl *c, int flags=0);
int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
const char *method, bufferlist& inbl, bufferlist *outbl);
int aio_exec(const object_t& oid, AioCompletionImpl *c, const char *cls,
const char *method, bufferlist& inbl, char *buf, size_t out_len);
int aio_stat(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, time_t *pmtime);
int aio_stat2(const object_t& oid, AioCompletionImpl *c, uint64_t *psize, struct timespec *pts);
int aio_getxattr(const object_t& oid, AioCompletionImpl *c,
Expand Down
17 changes: 17 additions & 0 deletions src/librados/librados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4717,6 +4717,23 @@ extern "C" int rados_aio_cancel(rados_ioctx_t io, rados_completion_t completion)
return ctx->aio_cancel((librados::AioCompletionImpl*)completion);
}

extern "C" int rados_aio_exec(rados_ioctx_t io, const char *o,
rados_completion_t completion,
const char *cls, const char *method,
const char *inbuf, size_t in_len,
char *buf, size_t out_len)
{
tracepoint(librados, rados_aio_exec_enter, io, o, completion);
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
bufferlist inbl;
inbl.append(inbuf, in_len);
int retval = ctx->aio_exec(oid, (librados::AioCompletionImpl*)completion,
cls, method, inbl, buf, out_len);
tracepoint(librados, rados_aio_exec_exit, retval);
return retval;
}

struct C_WatchCB : public librados::WatchCtx {
rados_watchcb_t wcb;
void *arg;
Expand Down
134 changes: 134 additions & 0 deletions src/test/librados/aio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,73 @@ TEST(LibRadosAio, StatRemovePP) {
delete my_completion4;
}

TEST(LibRadosAio, ExecuteClass) {
AioTestData test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion));
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
my_completion, buf, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_complete, set_completion_safe, &my_completion2));
char out[128];
ASSERT_EQ(0, rados_aio_exec(test_data.m_ioctx, "foo", my_completion2,
"hello", "say_hello", NULL, 0, out, sizeof(out)));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(13, rados_aio_get_return_value(my_completion2));
ASSERT_EQ(0, strncmp("Hello, world!", out, 13));
rados_aio_release(my_completion);
rados_aio_release(my_completion2);
}

TEST(LibRadosAio, ExecuteClassPP) {
AioTestDataPP test_data;
ASSERT_EQ("", test_data.init());
AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
AioCompletion *my_completion_null = NULL;
ASSERT_NE(my_completion, my_completion_null);
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
bl1, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, my_completion->get_return_value());
AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_complete, set_completion_safe);
ASSERT_NE(my_completion2, my_completion_null);
bufferlist in, out;
ASSERT_EQ(0, test_data.m_ioctx.aio_exec("foo", my_completion2,
"hello", "say_hello", in, &out));
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion2->wait_for_complete());
}
ASSERT_EQ(0, my_completion2->get_return_value());
ASSERT_EQ(std::string("Hello, world!"), std::string(out.c_str(), out.length()));
delete my_completion;
delete my_completion2;
}

using std::string;
using std::map;
using std::set;
Expand Down Expand Up @@ -3699,6 +3766,73 @@ TEST(LibRadosAioEC, StatRemovePP) {
delete my_completion4;
}

TEST(LibRadosAioEC, ExecuteClass) {
AioTestDataEC test_data;
rados_completion_t my_completion;
ASSERT_EQ("", test_data.init());
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_completeEC, set_completion_safeEC, &my_completion));
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_aio_write(test_data.m_ioctx, "foo",
my_completion, buf, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, rados_aio_get_return_value(my_completion));
rados_completion_t my_completion2;
ASSERT_EQ(0, rados_aio_create_completion((void*)&test_data,
set_completion_completeEC, set_completion_safeEC, &my_completion2));
char out[128];
ASSERT_EQ(0, rados_aio_exec(test_data.m_ioctx, "foo", my_completion2,
"hello", "say_hello", NULL, 0, out, sizeof(out)));
{
TestAlarm alarm;
ASSERT_EQ(0, rados_aio_wait_for_complete(my_completion2));
}
ASSERT_EQ(13, rados_aio_get_return_value(my_completion2));
ASSERT_EQ(0, strncmp("Hello, world!", out, 13));
rados_aio_release(my_completion);
rados_aio_release(my_completion2);
}

TEST(LibRadosAioEC, ExecuteClassPP) {
AioTestDataECPP test_data;
ASSERT_EQ("", test_data.init());
AioCompletion *my_completion = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_completeEC, set_completion_safeEC);
AioCompletion *my_completion_null = NULL;
ASSERT_NE(my_completion, my_completion_null);
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bl1.append(buf, sizeof(buf));
ASSERT_EQ(0, test_data.m_ioctx.aio_write("foo", my_completion,
bl1, sizeof(buf), 0));
{
TestAlarm alarm;
sem_wait(test_data.m_sem);
sem_wait(test_data.m_sem);
}
ASSERT_EQ(0, my_completion->get_return_value());
AioCompletion *my_completion2 = test_data.m_cluster.aio_create_completion(
(void*)&test_data, set_completion_completeEC, set_completion_safeEC);
ASSERT_NE(my_completion2, my_completion_null);
bufferlist in, out;
ASSERT_EQ(0, test_data.m_ioctx.aio_exec("foo", my_completion2,
"hello", "say_hello", in, &out));
{
TestAlarm alarm;
ASSERT_EQ(0, my_completion2->wait_for_complete());
}
ASSERT_EQ(0, my_completion2->get_return_value());
ASSERT_EQ(std::string("Hello, world!"), std::string(out.c_str(), out.length()));
delete my_completion;
delete my_completion2;
}

TEST(LibRadosAioEC, OmapPP) {
Rados cluster;
std::string pool_name = get_temp_pool_name();
Expand Down
20 changes: 20 additions & 0 deletions src/tracing/librados.tp
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,26 @@ TRACEPOINT_EVENT(librados, rados_aio_stat_exit,
)
)

TRACEPOINT_EVENT(librados, rados_aio_exec_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
const char*, oid,
rados_completion_t, completion),
TP_FIELDS(
ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
ctf_string(oid, oid)
ctf_integer_hex(rados_completion_t, completion, completion)
)
)

TRACEPOINT_EVENT(librados, rados_aio_exec_exit,
TP_ARGS(
int, retval),
TP_FIELDS(
ctf_integer(int, retval, retval)
)
)

TRACEPOINT_EVENT(librados, rados_watch_enter,
TP_ARGS(
rados_ioctx_t, ioctx,
Expand Down