Skip to content

Commit

Permalink
Merge pull request #414 from basho/feature/bch/write-once
Browse files Browse the repository at this point in the history
Add 2.1 Preflists and Write-Once Bucket Types

Reviewed-by: javajolt
  • Loading branch information
borshop committed Oct 20, 2015
2 parents 11ecadb + bdbc35f commit d73000b
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,5 @@
*.pyc
.python-version

docs/_build

Expand All @@ -9,6 +10,7 @@ build/
dist/
riak.egg-info/
*.egg
.eggs/

#*#
*~
3 changes: 3 additions & 0 deletions commands.py
Expand Up @@ -81,6 +81,7 @@ class create_bucket_types(Command):
* `pytest-sets` with ``{"datatype":"set"}``
* `pytest-counters` with ``{"datatype":"counter"}``
* `pytest-consistent` with ``{"consistent":true}``
* `pytest-write-once` with ``{"write_once": true}``
* `pytest-mr`
* `pytest` with ``{"allow_mult":false}``
"""
Expand All @@ -96,6 +97,7 @@ class create_bucket_types(Command):
'pytest-sets': {'datatype': 'set'},
'pytest-counters': {'datatype': 'counter'},
'pytest-consistent': {'consistent': True},
'pytest-write-once': {'write_once': True},
'pytest-mr': {},
'pytest': {'allow_mult': False}
}
Expand Down Expand Up @@ -236,6 +238,7 @@ class setup_security(Command, security_commands):

_grants = {
"riak_kv.get": ["any"],
"riak_kv.get_preflist": ["any"],
"riak_kv.put": ["any"],
"riak_kv.delete": ["any"],
"riak_kv.index": ["any"],
Expand Down
10 changes: 10 additions & 0 deletions riak/bucket.py
Expand Up @@ -586,6 +586,16 @@ def update_counter(self, key, value, **kwargs):

increment_counter = update_counter

def get_preflist(self, key):
"""
Retrieve the preflist associated with a given bucket/key
:param key: Name of the key.
:type key: string
:rtype: list of dict()
"""
return self._client.get_preflist(self, key)

def __str__(self):
if self.bucket_type.is_default():
return '<RiakBucket {0!r}>'.format(self.name)
Expand Down
24 changes: 22 additions & 2 deletions riak/client/operations.py
Expand Up @@ -685,7 +685,8 @@ def stream_mapred(self, inputs, query, timeout):
stream.close()

@retryable
def create_search_index(self, transport, index, schema=None, n_val=None):
def create_search_index(self, transport, index, schema=None, n_val=None,
timeout=None):
"""
create_search_index(index, schema=None, n_val=None)
Expand All @@ -698,8 +699,10 @@ def create_search_index(self, transport, index, schema=None, n_val=None):
:type schema: string, None
:param n_val: this indexes N value
:type n_val: integer, None
:param timeout: optional timeout (in ms)
:type timeout: integer, None
"""
return transport.create_search_index(index, schema, n_val)
return transport.create_search_index(index, schema, n_val, timeout)

@retryable
def get_search_index(self, transport, index):
Expand Down Expand Up @@ -1000,6 +1003,23 @@ def update_datatype(self, datatype, w=None, dw=None, pw=None,
timeout=timeout,
include_context=include_context)

@retryable
def get_preflist(self, transport, bucket, key):
"""
Fetch the preflist for a given bucket and key.
.. note:: This request is automatically retried :attr:`retries`
times if it fails due to network error.
:param bucket: the bucket whose index will be queried
:type bucket: RiakBucket
:param key: the key of the preflist
:type key: string
:return: list of dicts (partition, node, primary)
"""
return transport.get_preflist(bucket, key)

def _bucket_type_bucket_builder(self, name, bucket_type):
"""
Build a bucket from a bucket type
Expand Down
5 changes: 4 additions & 1 deletion riak/tests/__init__.py
Expand Up @@ -60,9 +60,12 @@
SECURITY_CERT_PASSWD = os.environ.get('RIAK_TEST_SECURITY_CERT_PASSWD',
'certpass')

SECURITY_CIPHERS = 'DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:AES128-SHA256:AES128-SHA:AES256-SHA256:AES256-SHA:RC4-SHA'

SECURITY_CREDS = None
if RUN_SECURITY:
SECURITY_CREDS = SecurityCreds(username=SECURITY_USER,
password=SECURITY_PASSWD,
cacert_file=SECURITY_CACERT)
cacert_file=SECURITY_CACERT,
ciphers=SECURITY_CIPHERS)
SKIP_DATATYPES = int(os.environ.get('SKIP_DATATYPES', '0'))
2 changes: 1 addition & 1 deletion riak/tests/test_all.py
Expand Up @@ -87,7 +87,7 @@ def setUpModule():
'index': 'mrbucket'}

for yz in (testrun_yz, testrun_yz_index, testrun_yz_mr):
c.create_search_index(yz['index'])
c.create_search_index(yz['index'], timeout=30000)
if yz['btype'] is not None:
t = c.bucket_type(yz['btype'])
b = t.bucket(yz['bucket'])
Expand Down
21 changes: 21 additions & 0 deletions riak/tests/test_btypes.py
Expand Up @@ -176,3 +176,24 @@ def test_multiget_bucket_types(self):
self.assertIsInstance(mobj, RiakObject)
self.assertEqual(bucket, mobj.bucket)
self.assertEqual(btype, mobj.bucket.bucket_type)

@unittest.skipIf(SKIP_BTYPES == '1', "SKIP_BTYPES is set")
def test_write_once_bucket_type(self):
btype = self.client.bucket_type('pytest-write-once')
btype.set_property('write_once', True)
bucket = btype.bucket(self.bucket_name)

for i in range(100):
obj = bucket.new(self.key_name + str(i))
obj.data = {'id': i}
obj.store()

mget = bucket.multiget([self.key_name + str(i) for i in range(100)])
for mobj in mget:
self.assertIsInstance(mobj, RiakObject)
self.assertEqual(bucket, mobj.bucket)
self.assertEqual(btype, mobj.bucket.bucket_type)

props = btype.get_properties()
self.assertIn('write_once', props)
self.assertEqual(True, props['write_once'])
35 changes: 35 additions & 0 deletions riak/tests/test_feature_detection.py
Expand Up @@ -60,6 +60,8 @@ def test_pre_10(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_10(self):
t = DummyTransport("1.0.3")
Expand All @@ -77,6 +79,8 @@ def test_10(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_11(self):
t = DummyTransport("1.1.4")
Expand All @@ -94,6 +98,8 @@ def test_11(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_12(self):
t = DummyTransport("1.2.0")
Expand All @@ -111,6 +117,8 @@ def test_12(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_12_loose(self):
t = DummyTransport("1.2.1p3")
Expand All @@ -128,6 +136,8 @@ def test_12_loose(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_14(self):
t = DummyTransport("1.4.0rc1")
Expand All @@ -145,6 +155,8 @@ def test_14(self):
self.assertFalse(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_144(self):
t = DummyTransport("1.4.6")
Expand All @@ -162,6 +174,8 @@ def test_144(self):
self.assertTrue(t.index_term_regex())
self.assertFalse(t.bucket_types())
self.assertFalse(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_20(self):
t = DummyTransport("2.0.1")
Expand All @@ -179,6 +193,27 @@ def test_20(self):
self.assertTrue(t.index_term_regex())
self.assertTrue(t.bucket_types())
self.assertTrue(t.datatypes())
self.assertFalse(t.preflists())
self.assertFalse(t.write_once())

def test_21(self):
t = DummyTransport("2.1.0")
self.assertTrue(t.phaseless_mapred())
self.assertTrue(t.pb_indexes())
self.assertTrue(t.pb_search())
self.assertTrue(t.pb_conditionals())
self.assertTrue(t.quorum_controls())
self.assertTrue(t.tombstone_vclocks())
self.assertTrue(t.pb_head())
self.assertTrue(t.pb_clear_bucket_props())
self.assertTrue(t.pb_all_bucket_props())
self.assertTrue(t.counters())
self.assertTrue(t.stream_indexes())
self.assertTrue(t.index_term_regex())
self.assertTrue(t.bucket_types())
self.assertTrue(t.datatypes())
self.assertTrue(t.preflists())
self.assertTrue(t.write_once())

if __name__ == '__main__':
unittest.main()
12 changes: 12 additions & 0 deletions riak/tests/test_kv.py
Expand Up @@ -579,6 +579,18 @@ def test_get_params(self):
basic_quorum=True)
self.assertFalse(missing.exists)

def test_preflist(self):
bucket = self.client.bucket(self.bucket_name)
bucket.new(self.key_name, data={"foo": "one",
"bar": "baz"}).store()
preflist = bucket.get_preflist(self.key_name)
preflist2 = self.client.get_preflist(bucket, self.key_name)
nodes = ['riak@127.0.0.1', 'dev1@127.0.0.1']
for pref in (preflist, preflist2):
self.assertEqual(len(pref), 3)
self.assertIn(pref[0]['node'], nodes)
[self.assertTrue(node['primary']) for node in pref]

def generate_siblings(self, original, count=5, delay=None):
vals = []
for _ in range(count):
Expand Down
37 changes: 23 additions & 14 deletions riak/tests/test_security.py
Expand Up @@ -20,7 +20,8 @@
import sys
from riak.tests import RUN_SECURITY, SECURITY_USER, SECURITY_PASSWD, \
SECURITY_CACERT, SECURITY_KEY, SECURITY_CERT, SECURITY_REVOKED, \
SECURITY_CERT_USER, SECURITY_CERT_PASSWD, SECURITY_BAD_CERT
SECURITY_CERT_USER, SECURITY_CERT_PASSWD, SECURITY_BAD_CERT, \
SECURITY_CREDS, SECURITY_CIPHERS
from riak.security import SecurityCreds
if sys.version_info < (2, 7):
unittest = __import__('unittest2')
Expand All @@ -31,10 +32,7 @@
class SecurityTests(object):
@unittest.skipIf(RUN_SECURITY, 'RUN_SECURITY is set')
def test_security_disabled(self):
creds = SecurityCreds(username=SECURITY_USER,
password=SECURITY_PASSWD,
cacert_file=SECURITY_CACERT)
client = self.create_client(credentials=creds)
client = self.create_client(credentials=SECURITY_CREDS)
myBucket = client.bucket('test')
val1 = "foobar"
key1 = myBucket.new('x', data=val1)
Expand All @@ -51,31 +49,39 @@ def test_security_basic_connection(self):

@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_bad_user(self):
creds = SecurityCreds(username='foo', password=SECURITY_PASSWD,
cacert_file=SECURITY_CACERT)
creds = SecurityCreds(username='foo',
password=SECURITY_PASSWD,
cacert_file=SECURITY_CACERT,
ciphers=SECURITY_CIPHERS)
client = self.create_client(credentials=creds)
with self.assertRaises(Exception):
client.get_buckets()

@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_bad_password(self):
creds = SecurityCreds(username=SECURITY_USER, password='foo',
cacert_file=SECURITY_CACERT)
creds = SecurityCreds(username=SECURITY_USER,
password='foo',
cacert_file=SECURITY_CACERT,
ciphers=SECURITY_CIPHERS)
client = self.create_client(credentials=creds)
with self.assertRaises(Exception):
client.get_buckets()

@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_invalid_cert(self):
creds = SecurityCreds(username=SECURITY_USER, password=SECURITY_PASSWD,
cacert_file='/tmp/foo')
creds = SecurityCreds(username=SECURITY_USER,
password=SECURITY_PASSWD,
cacert_file='/tmp/foo',
ciphers=SECURITY_CIPHERS)
client = self.create_client(credentials=creds)
with self.assertRaises(Exception):
client.get_buckets()

@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_password_without_cacert(self):
creds = SecurityCreds(username=SECURITY_USER, password=SECURITY_PASSWD)
creds = SecurityCreds(username=SECURITY_USER,
password=SECURITY_PASSWD,
ciphers=SECURITY_CIPHERS)
client = self.create_client(credentials=creds)
with self.assertRaises(Exception):
myBucket = client.bucket('test')
Expand All @@ -87,6 +93,7 @@ def test_security_password_without_cacert(self):
def test_security_cert_authentication(self):
creds = SecurityCreds(username=SECURITY_CERT_USER,
password=SECURITY_CERT_PASSWD,
ciphers=SECURITY_CIPHERS,
cert_file=SECURITY_CERT,
pkey_file=SECURITY_KEY,
cacert_file=SECURITY_CACERT)
Expand All @@ -107,6 +114,7 @@ def test_security_cert_authentication(self):
@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_revoked_cert(self):
creds = SecurityCreds(username=SECURITY_USER, password=SECURITY_PASSWD,
ciphers=SECURITY_CIPHERS,
cacert_file=SECURITY_CACERT,
crl_file=SECURITY_REVOKED)
# Currently Python >= 2.7.9 and Python 3.x native CRL doesn't seem to
Expand All @@ -120,6 +128,7 @@ def test_security_revoked_cert(self):
@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_bad_ca_cert(self):
creds = SecurityCreds(username=SECURITY_USER, password=SECURITY_PASSWD,
ciphers=SECURITY_CIPHERS,
cacert_file=SECURITY_BAD_CERT)
client = self.create_client(credentials=creds)
with self.assertRaises(Exception):
Expand All @@ -128,8 +137,8 @@ def test_security_bad_ca_cert(self):
@unittest.skipUnless(RUN_SECURITY, 'RUN_SECURITY is not set')
def test_security_ciphers(self):
creds = SecurityCreds(username=SECURITY_USER, password=SECURITY_PASSWD,
cacert_file=SECURITY_CACERT,
ciphers='DHE-RSA-AES256-SHA')
ciphers=SECURITY_CIPHERS,
cacert_file=SECURITY_CACERT)
client = self.create_client(credentials=creds)
myBucket = client.bucket('test')
val1 = "foobar"
Expand Down
20 changes: 19 additions & 1 deletion riak/transports/feature_detect.py
Expand Up @@ -26,7 +26,9 @@
1.2: LooseVersion("1.2.0"),
1.4: LooseVersion("1.4.0"),
1.44: LooseVersion("1.4.4"),
2.0: LooseVersion("2.0.0")
2.0: LooseVersion("2.0.0"),
2.1: LooseVersion("2.1.0"),
2.12: LooseVersion("2.1.2")
}


Expand Down Expand Up @@ -192,6 +194,22 @@ def datatypes(self):
"""
return self.server_version >= versions[2.0]

def preflists(self):
"""
Whether bucket/key preflists are supported.
:rtype: bool
"""
return self.server_version >= versions[2.1]

def write_once(self):
"""
Whether write-once operations are supported.
:rtype: bool
"""
return self.server_version >= versions[2.1]

@lazy_property
def server_version(self):
return LooseVersion(self._server_version())

0 comments on commit d73000b

Please sign in to comment.