Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python-rados: extends ReadOp/WriteOp API #9944

Merged
merged 2 commits into from Aug 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
222 changes: 222 additions & 0 deletions src/pybind/rados/rados.pyx
Expand Up @@ -22,6 +22,7 @@ import sys
import threading
import time

from collections import Callable
from datetime import datetime
from functools import partial, wraps
from itertools import chain
Expand Down Expand Up @@ -80,6 +81,8 @@ cdef extern from "rados/librados.h" nogil:
_LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE"
_LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS"
_LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY"
_LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE"
_LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT"

cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD"

Expand Down Expand Up @@ -237,13 +240,26 @@ cdef extern from "rados/librados.h" nogil:
const char * in_buf, size_t in_len, char * buf, size_t out_len)

int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num)
void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len)
void rados_write_op_omap_clear(rados_write_op_t write_op)
void rados_write_op_set_flags(rados_write_op_t write_op, int flags)

void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category)
void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len)
void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len)
void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset)
void rados_write_op_remove(rados_write_op_t write_op)
void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset)
void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len)

void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval)
void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval)
int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags)
int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags)
void rados_read_op_set_flags(rados_read_op_t read_op, int flags)
int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len)
void rados_omap_get_end(rados_omap_iter_t iter)

Expand All @@ -268,6 +284,9 @@ LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY

LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8')

LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE
LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT

ANONYMOUS_AUID = 0xffffffffffffffff
ADMIN_AUID = 0

Expand Down Expand Up @@ -1689,6 +1708,119 @@ cdef class WriteOp(object):
with nogil:
rados_release_write_op(self.write_op)

@requires(('exclusive', opt(int)))
def new(self, exclusive=None):
"""
Create the object.
"""

cdef:
int _exclusive = exclusive

with nogil:
rados_write_op_create(self.write_op, _exclusive, NULL)


def remove(self):
"""
Remove object.
"""
with nogil:
rados_write_op_remove(self.write_op)

@requires(('flags', int))
def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this write_op.
:para flags: flags to apply to the last operation
:type flags: int
"""

cdef:
int _flags = flags

with nogil:
rados_write_op_set_flags(self.write_op, _flags)

@requires(('to_write', bytes))
def append(self, to_write):
"""
Append data to an object synchronously
:param to_write: data to write
:type to_write: bytes
"""

cdef:
char *_to_write = to_write
size_t length = len(to_write)

with nogil:
rados_write_op_append(self.write_op, _to_write, length)

@requires(('to_write', bytes))
def write_full(self, to_write):
"""
Write whole object, atomically replacing it.
:param to_write: data to write
:type to_write: bytes
"""

cdef:
char *_to_write = to_write
size_t length = len(to_write)

with nogil:
rados_write_op_write_full(self.write_op, _to_write, length)

@requires(('to_write', bytes), ('offset', int))
def write(self, to_write, offset=0):
"""
Write to offset.
:param to_write: data to write
:type to_write: bytes
:param offset: byte offset in the object to begin writing at
:type offset: int
"""

cdef:
char *_to_write = to_write
size_t length = len(to_write)
uint64_t _offset = offset

with nogil:
rados_write_op_write(self.write_op, _to_write, length, _offset)

@requires(('offset', int), ('length', int))
def zero(self, offset, length):
"""
Zero part of an object.
:param offset: byte offset in the object to begin writing at
:type offset: int
:param offset: number of zero to write
:type offset: int
"""

cdef:
size_t _length = length
uint64_t _offset = offset

with nogil:
rados_write_op_zero(self.write_op, _length, _offset)

@requires(('offset', int))
def truncate(self, offset):
"""
Truncate an object.
:param offset: byte offset in the object to begin truncating at
:type offset: int
"""

cdef:
uint64_t _offset = offset

with nogil:
rados_write_op_truncate(self.write_op, _offset)


class WriteOpCtx(WriteOp, OpCtx):
"""write operation context manager"""
Expand All @@ -1706,6 +1838,20 @@ cdef class ReadOp(object):
with nogil:
rados_release_read_op(self.read_op)

@requires(('flags', int))
def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG):
"""
Set flags for the last operation added to this read_op.
:para flags: flags to apply to the last operation
:type flags: int
"""

cdef:
int _flags = flags

with nogil:
rados_read_op_set_flags(self.read_op, _flags)


class ReadOpCtx(ReadOp, OpCtx):
"""read operation context manager"""
Expand Down Expand Up @@ -2803,6 +2949,48 @@ returned %d, but should return zero on success." % (self.name, ret))
if ret != 0:
raise make_ex(ret, "Failed to operate write op for oid %s" % oid)

@requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int)))
def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG):
"""
excute the real write operation asynchronously
:para write_op: write operation object
:type write_op: WriteOp
:para oid: object name
:type oid: str
:param oncomplete: what to do when the remove is safe and complete in memory
on all replicas
:type oncomplete: completion
:param onsafe: what to do when the remove is safe and complete on storage
on all replicas
:type onsafe: completion
:para mtime: the time to set the mtime to, 0 for the current time
:type mtime: int
:para flags: flags to apply to the entire operation
:type flags: int

:raises: :class:`Error`
:returns: completion object
"""

oid = cstr(oid, 'oid')
cdef:
WriteOp _write_op = write_op
char *_oid = oid
Completion completion
time_t _mtime = mtime
int _flags = flags

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)

with nogil:
ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid,
&_mtime, _flags)
if ret != 0:
completion._cleanup()
raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid)
return completion

@requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int)))
def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG):
"""
Expand All @@ -2825,6 +3013,40 @@ returned %d, but should return zero on success." % (self.name, ret))
if ret != 0:
raise make_ex(ret, "Failed to operate read op for oid %s" % oid)

@requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int)))
def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG):
"""
excute the real read operation
:para read_op: read operation object
:type read_op: ReadOp
:para oid: object name
:type oid: str
:param oncomplete: what to do when the remove is safe and complete in memory
on all replicas
:type oncomplete: completion
:param onsafe: what to do when the remove is safe and complete on storage
on all replicas
:type onsafe: completion
:para flag: flags to apply to the entire operation
:type flag: int
"""
oid = cstr(oid, 'oid')
cdef:
ReadOp _read_op = read_op
char *_oid = oid
Completion completion
int _flag = flag

completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)

with nogil:
ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag)
if ret != 0:
completion._cleanup()
raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid)
return completion

@requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int))
def get_omap_vals(self, read_op, start_after, filter_prefix, max_return):
"""
Expand Down
66 changes: 64 additions & 2 deletions src/test/pybind/test_rados.py
Expand Up @@ -3,7 +3,7 @@
from rados import (Rados, Error, RadosStateError, Object, ObjectExists,
ObjectNotFound, ObjectBusy, requires, opt,
ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx,
LIBRADOS_SNAP_HEAD, MonitorLog)
LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog)
import time
import threading
import json
Expand Down Expand Up @@ -444,6 +444,7 @@ def test_set_omap(self):
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
with WriteOpCtx(self.ioctx) as write_op:
self.ioctx.set_omap(write_op, keys, values)
write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS)
self.ioctx.operate_write_op(write_op, "hw")
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
Expand All @@ -460,9 +461,70 @@ def test_set_omap(self):
with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4)
eq(ret, 0)
read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS)
self.ioctx.operate_read_op(read_op, "hw")
eq(list(iter), [("2", b"bbb")])

def test_set_omap_aio(self):
lock = threading.Condition()
count = [0]
def cb(blah):
with lock:
count[0] += 1
lock.notify()
return 0

keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
with WriteOpCtx(self.ioctx) as write_op:
self.ioctx.set_omap(write_op, keys, values)
comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb)
comp.wait_for_complete()
comp.wait_for_safe()
with lock:
while count[0] < 2:
lock.wait()
eq(comp.get_return_value(), 0)

with ReadOpCtx(self.ioctx) as read_op:
iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4)
eq(ret, 0)
comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb)
comp.wait_for_complete()
comp.wait_for_safe()
with lock:
while count[0] < 4:
lock.wait()
eq(comp.get_return_value(), 0)
next(iter)
eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")])

def test_write_ops(self):
with WriteOpCtx(self.ioctx) as write_op:
write_op.new(0)
self.ioctx.operate_write_op(write_op, "write_ops")
eq(self.ioctx.read('write_ops'), b'')
write_op.write_full(b'1')
write_op.append(b'2')
self.ioctx.operate_write_op(write_op, "write_ops")
eq(self.ioctx.read('write_ops'), b'12')
write_op.write_full(b'12345')
write_op.write(b'x', 2)
self.ioctx.operate_write_op(write_op, "write_ops")
eq(self.ioctx.read('write_ops'), b'12x45')
write_op.write_full(b'12345')
write_op.zero(2, 2)
self.ioctx.operate_write_op(write_op, "write_ops")
eq(self.ioctx.read('write_ops'), b'12\x00\x005')
write_op.write_full(b'12345')
write_op.truncate(2)
self.ioctx.operate_write_op(write_op, "write_ops")
eq(self.ioctx.read('write_ops'), b'12')
write_op.remove()
self.ioctx.operate_write_op(write_op, "write_ops")
with assert_raises(ObjectNotFound):
self.ioctx.read('write_ops')

def test_get_omap_vals_by_keys(self):
keys = ("1", "2", "3", "4")
values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04")
Expand Down Expand Up @@ -845,4 +907,4 @@ def test_ceph_osd_pool_create_utf8(self):
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
eq(ret, 0)
assert len(out) > 0
eq(u"pool '\u9ec5' created", out)
eq(u"pool '\u9ec5' created", out)