Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pybind/rados: fix object lifetime issues and other bugs in aio #7778

Merged
merged 2 commits into from Mar 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 33 additions & 34 deletions src/pybind/rados/rados.pyx
Expand Up @@ -40,7 +40,7 @@ else:
cdef extern from "Python.h":
# These are in cpython/string.pxd, but use "object" types instead of
# PyObject*, which invokes assumptions in cpython that we need to
# legitimately break to implement zero-copy string buffers in Image.read().
# legitimately break to implement zero-copy string buffers in Ioctx.read().
# This is valid use of the Python API and documented as a special case.
PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
char* PyBytes_AsString(PyObject *string) except NULL
Expand Down Expand Up @@ -1524,6 +1524,7 @@ cdef class Completion(object):
rados_callback_t complete_cb
rados_callback_t safe_cb
rados_completion_t rados_comp
PyObject* buf

def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
self.oncomplete = oncomplete
Expand Down Expand Up @@ -1611,8 +1612,12 @@ cdef class Completion(object):
Call this when you no longer need the completion. It may not be
freed immediately if the operation is not acked and committed.
"""
with nogil:
rados_aio_release(self.rados_comp)
ref.Py_XDECREF(self.buf)
self.buf = NULL
if self.rados_comp != NULL:
with nogil:
rados_aio_release(self.rados_comp)
self.rados_comp = NULL

def _complete(self):
self.oncomplete(self)
Expand Down Expand Up @@ -1681,7 +1686,7 @@ cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
Callback to onsafe() for asynchronous operations
"""
cdef object cb = <object>args
cb.onsafe(cb)
cb._safe()
return 0


Expand All @@ -1690,7 +1695,7 @@ cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
Callback to oncomplete() for asynchronous operations
"""
cdef object cb = <object>args
cb.oncomplete(cb)
cb._complete()
return 0


Expand Down Expand Up @@ -1797,13 +1802,13 @@ cdef class Ioctx(object):
uint64_t _offset = offset

completion = self.__get_completion(oncomplete, onsafe)

self.__track_completion(completion)
with nogil:
ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
_to_write, size, _offset)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error writing object %s" % object_name)
self.__track_completion(completion)
return completion

def aio_write_full(self, object_name, to_write,
Expand Down Expand Up @@ -1839,13 +1844,14 @@ cdef class Ioctx(object):
size_t size = len(to_write)

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_write_full(self.io, _object_name,
completion.rados_comp,
_to_write, size)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error writing object %s" % object_name)
self.__track_completion(completion)
return completion

def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
Expand Down Expand Up @@ -1879,13 +1885,14 @@ cdef class Ioctx(object):
size_t size = len(to_append)

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_append(self.io, _object_name,
completion.rados_comp,
_to_append, size)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error appending object %s" % object_name)
self.__track_completion(completion)
return completion

def aio_flush(self):
Expand Down Expand Up @@ -1930,34 +1937,25 @@ cdef class Ioctx(object):

char *ref_buf
size_t _length = length
PyObject* ret_s = NULL

def oncomplete_(completion_v):
try:
return_value = completion_v.get_return_value()
if return_value != length:
_PyBytes_Resize(&ret_s, return_value)
return oncomplete(completion_v, <object>ret_s if return_value >= 0 else None)
finally:
# We DECREF unconditionally: the cast to object above will have
# INCREFed if necessary. This also takes care of exceptions,
# including if _PyString_Resize fails (that will free the string
# itself and set ret_s to NULL, hence XDECREF).
ref.Py_XDECREF(ret_s)
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

completion = self.__get_completion(oncomplete_, None)
ret_s = PyBytes_FromStringAndSize(NULL, length)
try:
ret_buf = PyBytes_AsString(ret_s)
with nogil:
ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
ret_buf, _length, _offset)
if ret < 0:
raise make_ex(ret, "error reading %s" % object_name)
self.__track_completion(completion)
return completion
except Exception:
ref.Py_XDECREF(ret_s)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
self.__track_completion(completion)
with nogil:
ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
ret_buf, _length, _offset)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error reading %s" % object_name)
return completion

def aio_remove(self, object_name, oncomplete=None, onsafe=None):
"""
Expand All @@ -1982,12 +1980,13 @@ cdef class Ioctx(object):
char* _object_name = object_name

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_remove(self.io, _object_name,
completion.rados_comp)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error removing %s" % object_name)
self.__track_completion(completion)
return completion

def require_ioctx_open(self):
Expand Down
20 changes: 20 additions & 0 deletions src/test/pybind/test_rados.py
Expand Up @@ -649,6 +649,7 @@ def cb(_, buf):
assert(loops <= 10)

eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)

# test2: use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control
Expand All @@ -666,6 +667,25 @@ def cb(_, buf):
comp.wait_for_complete_and_cb()
assert(retval[0] is not None)
eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)

# test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control

retval[0] = 1
self._take_down_acting_set('test_pool', 'bar')
comp = self.ioctx.aio_read("bar", len(payload), 0, cb)
eq(False, comp.is_complete())
time.sleep(3)
eq(False, comp.is_complete())
with lock:
eq(1, retval[0])
self._let_osds_back_up()

comp.wait_for_complete_and_cb()
eq(None, retval[0])
assert(comp.get_return_value() < 0)
eq(sys.getrefcount(comp), 2)

[i.remove() for i in self.ioctx.list_objects()]

Expand Down