Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python-rados: implement new aio_execute #12140

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 72 additions & 1 deletion src/pybind/rados/rados.pyx
Expand Up @@ -243,6 +243,8 @@ cdef extern from "rados/librados.h" nogil:

int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
const char * in_buf, size_t in_len, char * buf, size_t out_len)
int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
const char * in_buf, size_t in_len, char * buf, size_t out_len)

int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
Expand Down Expand Up @@ -2224,6 +2226,75 @@ cdef class Ioctx(object):
raise make_ex(ret, "error reading %s" % object_name)
return completion

@requires(('object_name', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def aio_execute(self, object_name, cls, method, data,
iain-buclaw-sociomantic marked this conversation as resolved.
Show resolved Hide resolved
length=8192, oncomplete=None, onsafe=None):
"""
Asynchronously execute an OSD class method on an object.

oncomplete and onsafe will be called with the data returned from
the plugin as well as the completion:

oncomplete(completion, data)
onsafe(completion, data)

:param object_name: name of the object
:type object_name: str
:param cls: name of the object class
:type cls: str
:param method: name of the method
:type method: str
:param data: input data
:type data: bytes
:param length: size of output buffer in bytes (default=8192)
:type length: int
:param oncomplete: what to do when the execution is complete
:type oncomplete: completion
:param onsafe: what to do when the execution is safe and complete
:type onsafe: completion

:raises: :class:`Error`
:returns: completion object
"""

object_name = cstr(object_name, 'object_name')
cls = cstr(cls, 'cls')
method = cstr(method, 'method')
cdef:
Completion completion
char *_object_name = object_name
char *_cls = cls
iain-buclaw-sociomantic marked this conversation as resolved.
Show resolved Hide resolved
char *_method = method
char *_data = data
size_t _data_len = len(data)

char *ref_buf
size_t _length = length

def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

def onsafe_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
self.__track_completion(completion)
with nogil:
ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
_cls, _method, _data, _data_len, ret_buf, _length)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
return completion

def aio_remove(self, object_name, oncomplete=None, onsafe=None):
"""
Asychronously remove an object
Expand Down Expand Up @@ -2541,7 +2612,7 @@ returned %d, but should return zero on success." % (self.name, ret))
:type method: str
:param data: input data
:type data: bytes
:param length: size of output buffer in bytes (default=8291)
:param length: size of output buffer in bytes (default=8192)
iain-buclaw-sociomantic marked this conversation as resolved.
Show resolved Hide resolved
:type length: int

:raises: :class:`TypeError`
Expand Down
31 changes: 31 additions & 0 deletions src/test/pybind/test_rados.py
Expand Up @@ -825,6 +825,37 @@ def test_execute(self):
ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
eq(buf, b"Hello, nose!")

def test_aio_execute(self):
count = [0]
retval = [None]
lock = threading.Condition()
def cb(_, buf):
with lock:
if retval[0] is None:
retval[0] = buf
count[0] += 1
lock.notify()
self.ioctx.write("foo", b"") # ensure object exists

comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
comp.wait_for_complete()
with lock:
while count[0] < 2:
lock.wait()
eq(comp.get_return_value(), 13)
eq(retval[0], b"Hello, world!")

retval[0] = None
comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
comp.wait_for_complete()
with lock:
while count[0] < 4:
lock.wait()
eq(comp.get_return_value(), 12)
eq(retval[0], b"Hello, nose!")

[i.remove() for i in self.ioctx.list_objects()]

class TestObject(object):

def setUp(self):
Expand Down