Skip to content

Commit

Permalink
python: add some read_op methods
Browse files Browse the repository at this point in the history
This change adds python API for the following C methods:
* rados_aio_write_op_operate
* rados_write_op_omap_set
* rados_read_op_set_flags
* rados_aio_read_op_operate
  • Loading branch information
sileht committed Jun 27, 2016
1 parent 2cd0475 commit acfbe38
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 2 deletions.
110 changes: 110 additions & 0 deletions src/pybind/rados/rados.pyx
Expand Up @@ -22,6 +22,7 @@ import sys
import threading
import time

from collections import Callable
from datetime import datetime
from functools import partial, wraps
from itertools import chain
Expand Down Expand Up @@ -237,13 +238,17 @@ cdef extern from "rados/librados.h" nogil:
const char * in_buf, size_t in_len, char * buf, size_t out_len)

int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
void rados_write_op_omap_clear(rados_write_op_t write_op)
void rados_write_op_set_flags(rados_write_op_t write_op, int flags)
void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
void rados_omap_get_end(rados_omap_iter_t iter)

Expand Down Expand Up @@ -1689,6 +1694,21 @@ cdef class WriteOp(object):
with nogil:
rados_release_write_op(self.write_op)

@requires(('flags', int))
def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this write_op.
:para flags: flags to apply to the last operation
:type flags: int
"""

cdef:
int _flags = flags

with nogil:
rados_write_op_set_flags(self.write_op, _flags)



class WriteOpCtx(WriteOp, OpCtx):
"""write operation context manager"""
Expand All @@ -1706,6 +1726,20 @@ cdef class ReadOp(object):
with nogil:
rados_release_read_op(self.read_op)

@requires(('flags', int))
def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this read_op.
:para flags: flags to apply to the last operation
:type flags: int
"""

cdef:
int _flags = flags

with nogil:
rados_read_op_set_flags(self.read_op, _flags)


class ReadOpCtx(ReadOp, OpCtx):
"""read operation context manager"""
Expand Down Expand Up @@ -2803,6 +2837,48 @@ returned %d, but should return zero on success." % (self.name, ret))
if ret != 0:
raise make_ex(ret, "Failed to operate write op for oid %s" % oid)

@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
"""
excute the real write operation asynchronously
:para write_op: write operation object
:type write_op: WriteOp
:para oid: object name
:type oid: str
:param oncomplete: what to do when the remove is safe and complete in memory
on all replicas
:type oncomplete: completion
:param onsafe: what to do when the remove is safe and complete on storage
on all replicas
:type onsafe: completion
:para mtime: the time to set the mtime to, 0 for the current time
:type mtime: int
:para flags: flags to apply to the entire operation
:type flags: int
:raises: :class:`Error`
:returns: completion object
"""

oid = cstr(oid, 'oid')
cdef:
WriteOp _write_op = write_op
char *_oid = oid
Completion completion
time_t _mtime = mtime
int _flags = flags

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)

with nogil:
ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
&_mtime, _flags)
if ret != 0:
completion._cleanup()
raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
return completion

@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
"""
Expand All @@ -2825,6 +2901,40 @@ returned %d, but should return zero on success." % (self.name, ret))
if ret != 0:
raise make_ex(ret, "Failed to operate read op for oid %s" % oid)

@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
"""
excute the real read operation
:para read_op: read operation object
:type read_op: ReadOp
:para oid: object name
:type oid: str
:param oncomplete: what to do when the remove is safe and complete in memory
on all replicas
:type oncomplete: completion
:param onsafe: what to do when the remove is safe and complete on storage
on all replicas
:type onsafe: completion
:para flag: flags to apply to the entire operation
:type flag: int
"""
oid = cstr(oid, 'oid')
cdef:
ReadOp _read_op = read_op
char *_oid = oid
Completion completion
int _flag = flag

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)

with nogil:
ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
if ret != 0:
completion._cleanup()
raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
return completion

@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
"""
Expand Down
41 changes: 39 additions & 2 deletions src/test/pybind/test_rados.py
Expand Up @@ -3,7 +3,7 @@
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
LIBRADOS_SNAP_HEAD, MonitorLog)
LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
import time
import threading
import json
Expand Down Expand Up @@ -444,6 +444,7 @@ def test_set_omap(self):
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
with WriteOpCtx(self.ioctx) as write_op:
self.ioctx.set_omap(write_op, keys, values)
write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
self.ioctx.operate_write_op(write_op, "hw")
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
Expand All @@ -460,9 +461,45 @@ def test_set_omap(self):
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
eq(ret, 0)
read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
self.ioctx.operate_read_op(read_op, "hw")
eq(list(iter), [("2", b"bbb")])

def test_set_omap_aio(self):
lock = threading.Condition()
count = [0]
def cb(blah):
with lock:
count[0] += 1
lock.notify()
return 0

keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
with WriteOpCtx(self.ioctx) as write_op:
self.ioctx.set_omap(write_op, keys, values)
comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
comp.wait_for_complete()
comp.wait_for_safe()
with lock:
while count[0] < 2:
lock.wait()
eq(comp.get_return_value(), 0)

with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
eq(ret, 0)
comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
comp.wait_for_complete()
comp.wait_for_safe()
with lock:
while count[0] < 4:
lock.wait()
eq(comp.get_return_value(), 0)
next(iter)
eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])


def test_get_omap_vals_by_keys(self):
keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
Expand Down Expand Up @@ -845,4 +882,4 @@ def test_ceph_osd_pool_create_utf8(self):
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
eq(ret, 0)
assert len(out) > 0
eq(u"pool '\u9ec5' created", out)
eq(u"pool '\u9ec5' created", out)

0 comments on commit acfbe38

Please sign in to comment.