diff --git a/src/pybind/rados/rados.pyx b/src/pybind/rados/rados.pyx index 955d2bdf404a3..5de11509acf16 100644 --- a/src/pybind/rados/rados.pyx +++ b/src/pybind/rados/rados.pyx @@ -22,6 +22,7 @@ import sys import threading import time +from collections import Callable from datetime import datetime from functools import partial, wraps from itertools import chain @@ -80,6 +81,8 @@ cdef extern from "rados/librados.h" nogil: _LIBRADOS_OPERATION_IGNORE_CACHE "LIBRADOS_OPERATION_IGNORE_CACHE" _LIBRADOS_OPERATION_SKIPRWLOCKS "LIBRADOS_OPERATION_SKIPRWLOCKS" _LIBRADOS_OPERATION_IGNORE_OVERLAY "LIBRADOS_OPERATION_IGNORE_OVERLAY" + _LIBRADOS_CREATE_EXCLUSIVE "LIBRADOS_CREATE_EXCLUSIVE" + _LIBRADOS_CREATE_IDEMPOTENT "LIBRADOS_CREATE_IDEMPOTENT" cdef uint64_t _LIBRADOS_SNAP_HEAD "LIBRADOS_SNAP_HEAD" @@ -237,13 +240,26 @@ cdef extern from "rados/librados.h" nogil: const char * in_buf, size_t in_len, char * buf, size_t out_len) int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags) + int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags) void rados_write_op_omap_set(rados_write_op_t write_op, const char * const* keys, const char * const* vals, const size_t * lens, size_t num) void rados_write_op_omap_rm_keys(rados_write_op_t write_op, const char * const* keys, size_t keys_len) void rados_write_op_omap_clear(rados_write_op_t write_op) + void rados_write_op_set_flags(rados_write_op_t write_op, int flags) + + void rados_write_op_create(rados_write_op_t write_op, int exclusive, const char *category) + void rados_write_op_append(rados_write_op_t write_op, const char *buffer, size_t len) + void rados_write_op_write_full(rados_write_op_t write_op, const char *buffer, size_t len) + void rados_write_op_write(rados_write_op_t write_op, const char *buffer, size_t len, uint64_t offset) + void rados_write_op_remove(rados_write_op_t write_op) + void rados_write_op_truncate(rados_write_op_t write_op, uint64_t offset) + void rados_write_op_zero(rados_write_op_t write_op, uint64_t offset, uint64_t len) + void rados_read_op_omap_get_vals(rados_read_op_t read_op, const char * start_after, const char * filter_prefix, uint64_t max_return, rados_omap_iter_t * iter, int * prval) void rados_read_op_omap_get_keys(rados_read_op_t read_op, const char * start_after, uint64_t max_return, rados_omap_iter_t * iter, int * prval) void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, const char * const* keys, size_t keys_len, rados_omap_iter_t * iter, int * prval) int rados_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, const char * oid, int flags) + int rados_aio_read_op_operate(rados_read_op_t read_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, int flags) + void rados_read_op_set_flags(rados_read_op_t read_op, int flags) int rados_omap_get_next(rados_omap_iter_t iter, const char * const* key, const char * const* val, size_t * len) void rados_omap_get_end(rados_omap_iter_t iter) @@ -268,6 +284,9 @@ LIBRADOS_OPERATION_IGNORE_OVERLAY = _LIBRADOS_OPERATION_IGNORE_OVERLAY LIBRADOS_ALL_NSPACES = _LIBRADOS_ALL_NSPACES.decode('utf-8') +LIBRADOS_CREATE_EXCLUSIVE = _LIBRADOS_CREATE_EXCLUSIVE +LIBRADOS_CREATE_IDEMPOTENT = _LIBRADOS_CREATE_IDEMPOTENT + ANONYMOUS_AUID = 0xffffffffffffffff ADMIN_AUID = 0 @@ -1689,6 +1708,119 @@ cdef class WriteOp(object): with nogil: rados_release_write_op(self.write_op) + @requires(('exclusive', opt(int))) + def new(self, exclusive=None): + """ + Create the object. + """ + + cdef: + int _exclusive = exclusive + + with nogil: + rados_write_op_create(self.write_op, _exclusive, NULL) + + + def remove(self): + """ + Remove object. + """ + with nogil: + rados_write_op_remove(self.write_op) + + @requires(('flags', int)) + def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG): + """ + Set flags for the last operation added to this write_op. + :para flags: flags to apply to the last operation + :type flags: int + """ + + cdef: + int _flags = flags + + with nogil: + rados_write_op_set_flags(self.write_op, _flags) + + @requires(('to_write', bytes)) + def append(self, to_write): + """ + Append data to an object synchronously + :param to_write: data to write + :type to_write: bytes + """ + + cdef: + char *_to_write = to_write + size_t length = len(to_write) + + with nogil: + rados_write_op_append(self.write_op, _to_write, length) + + @requires(('to_write', bytes)) + def write_full(self, to_write): + """ + Write whole object, atomically replacing it. + :param to_write: data to write + :type to_write: bytes + """ + + cdef: + char *_to_write = to_write + size_t length = len(to_write) + + with nogil: + rados_write_op_write_full(self.write_op, _to_write, length) + + @requires(('to_write', bytes), ('offset', int)) + def write(self, to_write, offset=0): + """ + Write to offset. + :param to_write: data to write + :type to_write: bytes + :param offset: byte offset in the object to begin writing at + :type offset: int + """ + + cdef: + char *_to_write = to_write + size_t length = len(to_write) + uint64_t _offset = offset + + with nogil: + rados_write_op_write(self.write_op, _to_write, length, _offset) + + @requires(('offset', int), ('length', int)) + def zero(self, offset, length): + """ + Zero part of an object. + :param offset: byte offset in the object to begin writing at + :type offset: int + :param offset: number of zero to write + :type offset: int + """ + + cdef: + size_t _length = length + uint64_t _offset = offset + + with nogil: + rados_write_op_zero(self.write_op, _length, _offset) + + @requires(('offset', int)) + def truncate(self, offset): + """ + Truncate an object. + :param offset: byte offset in the object to begin truncating at + :type offset: int + """ + + cdef: + uint64_t _offset = offset + + with nogil: + rados_write_op_truncate(self.write_op, _offset) + class WriteOpCtx(WriteOp, OpCtx): """write operation context manager""" @@ -1706,6 +1838,20 @@ cdef class ReadOp(object): with nogil: rados_release_read_op(self.read_op) + @requires(('flags', int)) + def set_flags(self, flags=LIBRADOS_OPERATION_NOFLAG): + """ + Set flags for the last operation added to this read_op. + :para flags: flags to apply to the last operation + :type flags: int + """ + + cdef: + int _flags = flags + + with nogil: + rados_read_op_set_flags(self.read_op, _flags) + class ReadOpCtx(ReadOp, OpCtx): """read operation context manager""" @@ -2803,6 +2949,48 @@ returned %d, but should return zero on success." % (self.name, ret)) if ret != 0: raise make_ex(ret, "Failed to operate write op for oid %s" % oid) + @requires(('write_op', WriteOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('mtime', opt(int)), ('flags', opt(int))) + def operate_aio_write_op(self, write_op, oid, oncomplete=None, onsafe=None, mtime=0, flags=LIBRADOS_OPERATION_NOFLAG): + """ + excute the real write operation asynchronously + :para write_op: write operation object + :type write_op: WriteOp + :para oid: object name + :type oid: str + :param oncomplete: what to do when the remove is safe and complete in memory + on all replicas + :type oncomplete: completion + :param onsafe: what to do when the remove is safe and complete on storage + on all replicas + :type onsafe: completion + :para mtime: the time to set the mtime to, 0 for the current time + :type mtime: int + :para flags: flags to apply to the entire operation + :type flags: int + + :raises: :class:`Error` + :returns: completion object + """ + + oid = cstr(oid, 'oid') + cdef: + WriteOp _write_op = write_op + char *_oid = oid + Completion completion + time_t _mtime = mtime + int _flags = flags + + completion = self.__get_completion(oncomplete, onsafe) + self.__track_completion(completion) + + with nogil: + ret = rados_aio_write_op_operate(_write_op.write_op, self.io, completion.rados_comp, _oid, + &_mtime, _flags) + if ret != 0: + completion._cleanup() + raise make_ex(ret, "Failed to operate aio write op for oid %s" % oid) + return completion + @requires(('read_op', ReadOp), ('oid', str_type), ('flag', opt(int))) def operate_read_op(self, read_op, oid, flag=LIBRADOS_OPERATION_NOFLAG): """ @@ -2825,6 +3013,40 @@ returned %d, but should return zero on success." % (self.name, ret)) if ret != 0: raise make_ex(ret, "Failed to operate read op for oid %s" % oid) + @requires(('read_op', ReadOp), ('oid', str_type), ('oncomplete', opt(Callable)), ('onsafe', opt(Callable)), ('flag', opt(int))) + def operate_aio_read_op(self, read_op, oid, oncomplete=None, onsafe=None, flag=LIBRADOS_OPERATION_NOFLAG): + """ + excute the real read operation + :para read_op: read operation object + :type read_op: ReadOp + :para oid: object name + :type oid: str + :param oncomplete: what to do when the remove is safe and complete in memory + on all replicas + :type oncomplete: completion + :param onsafe: what to do when the remove is safe and complete on storage + on all replicas + :type onsafe: completion + :para flag: flags to apply to the entire operation + :type flag: int + """ + oid = cstr(oid, 'oid') + cdef: + ReadOp _read_op = read_op + char *_oid = oid + Completion completion + int _flag = flag + + completion = self.__get_completion(oncomplete, onsafe) + self.__track_completion(completion) + + with nogil: + ret = rados_aio_read_op_operate(_read_op.read_op, self.io, completion.rados_comp, _oid, _flag) + if ret != 0: + completion._cleanup() + raise make_ex(ret, "Failed to operate aio read op for oid %s" % oid) + return completion + @requires(('read_op', ReadOp), ('start_after', str_type), ('filter_prefix', str_type), ('max_return', int)) def get_omap_vals(self, read_op, start_after, filter_prefix, max_return): """ diff --git a/src/test/pybind/test_rados.py b/src/test/pybind/test_rados.py index d9b40c1f6c937..42f17fb031caf 100644 --- a/src/test/pybind/test_rados.py +++ b/src/test/pybind/test_rados.py @@ -3,7 +3,7 @@ from rados import (Rados, Error, RadosStateError, Object, ObjectExists, ObjectNotFound, ObjectBusy, requires, opt, ANONYMOUS_AUID, ADMIN_AUID, LIBRADOS_ALL_NSPACES, WriteOpCtx, ReadOpCtx, - LIBRADOS_SNAP_HEAD, MonitorLog) + LIBRADOS_SNAP_HEAD, LIBRADOS_OPERATION_BALANCE_READS, LIBRADOS_OPERATION_SKIPRWLOCKS, MonitorLog) import time import threading import json @@ -444,6 +444,7 @@ def test_set_omap(self): values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") with WriteOpCtx(self.ioctx) as write_op: self.ioctx.set_omap(write_op, keys, values) + write_op.set_flags(LIBRADOS_OPERATION_SKIPRWLOCKS) self.ioctx.operate_write_op(write_op, "hw") with ReadOpCtx(self.ioctx) as read_op: iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) @@ -460,9 +461,70 @@ def test_set_omap(self): with ReadOpCtx(self.ioctx) as read_op: iter, ret = self.ioctx.get_omap_vals(read_op, "", "2", 4) eq(ret, 0) + read_op.set_flags(LIBRADOS_OPERATION_BALANCE_READS) self.ioctx.operate_read_op(read_op, "hw") eq(list(iter), [("2", b"bbb")]) + def test_set_omap_aio(self): + lock = threading.Condition() + count = [0] + def cb(blah): + with lock: + count[0] += 1 + lock.notify() + return 0 + + keys = ("1", "2", "3", "4") + values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") + with WriteOpCtx(self.ioctx) as write_op: + self.ioctx.set_omap(write_op, keys, values) + comp = self.ioctx.operate_aio_write_op(write_op, "hw", cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 2: + lock.wait() + eq(comp.get_return_value(), 0) + + with ReadOpCtx(self.ioctx) as read_op: + iter, ret = self.ioctx.get_omap_vals(read_op, "", "", 4) + eq(ret, 0) + comp = self.ioctx.operate_aio_read_op(read_op, "hw", cb, cb) + comp.wait_for_complete() + comp.wait_for_safe() + with lock: + while count[0] < 4: + lock.wait() + eq(comp.get_return_value(), 0) + next(iter) + eq(list(iter), [("2", b"bbb"), ("3", b"ccc"), ("4", b"\x04\x04\x04\x04")]) + + def test_write_ops(self): + with WriteOpCtx(self.ioctx) as write_op: + write_op.new(0) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'') + write_op.write_full(b'1') + write_op.append(b'2') + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12') + write_op.write_full(b'12345') + write_op.write(b'x', 2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12x45') + write_op.write_full(b'12345') + write_op.zero(2, 2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12\x00\x005') + write_op.write_full(b'12345') + write_op.truncate(2) + self.ioctx.operate_write_op(write_op, "write_ops") + eq(self.ioctx.read('write_ops'), b'12') + write_op.remove() + self.ioctx.operate_write_op(write_op, "write_ops") + with assert_raises(ObjectNotFound): + self.ioctx.read('write_ops') + def test_get_omap_vals_by_keys(self): keys = ("1", "2", "3", "4") values = (b"aaa", b"bbb", b"ccc", b"\x04\x04\x04\x04") @@ -845,4 +907,4 @@ def test_ceph_osd_pool_create_utf8(self): ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'') eq(ret, 0) assert len(out) > 0 - eq(u"pool '\u9ec5' created", out) + eq(u"pool '\u9ec5' created", out) \ No newline at end of file