Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions riak/client/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,85 @@ def stream_mapred(self, inputs, query, timeout):
finally:
stream.close()

@retryable
def create_search_index(self, transport, index, schema=None):
"""
create_search_index(index, schema)

Create a search index of the given name, and optionally set
a schema. If no schema is set, the default will be used.

:param index: the name of the index to create
:type index: string
:param schema: the schema that this index will follow
:type schema: string, None
"""
return transport.create_search_index(index, schema)

@retryable
def get_search_index(self, transport, index):
"""
get_search_index(index)

Gets a search index of the given name if it exists,
which will also return the schema. Raises a RiakError
if no such schema exists.

:param index: the name of the index to create
:type index: string
"""
return transport.get_search_index(index)

@retryable
def list_search_indexes(self, transport):
"""
list_search_indexes(bucket)

Gets all search indexes and their schemas. Returns
a blank list if none exist
"""
return transport.list_search_indexes()

@retryable
def delete_search_index(self, transport, index):
"""
delete_search_index(index)

Delete the search index that matches the given name.

:param index: the name of the index to delete
:type index: string
"""
return transport.delete_search_index(index)

@retryable
def create_search_schema(self, transport, schema, content):
"""
create_search_schema(schema, content)

Creates a solr schema of the given name and content.
Content must be valid solr schema xml.

:param schema: the name of the schema to create
:type schema: string
:param schema: the solr schema xml content
:type schema: string
"""
return transport.create_search_schema(schema, content)

@retryable
def get_search_schema(self, transport, schema):
"""
get_search_schema(schema)

Gets a search schema of the given name if it exists.
Raises a RiakError if no such schema exists.

:param schema: the name of the schema to get
:type schema: string
"""
return transport.get_search_schema(schema)

@retryable
def fulltext_search(self, transport, index, query, **params):
"""
Expand Down
14 changes: 11 additions & 3 deletions riak/tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from riak.test_server import TestServer

from riak.tests.test_yokozuna import YZSearchTests
from riak.tests.test_search import SearchTests, \
EnableSearchTests, SolrSearchTests
from riak.tests.test_mapreduce import MapReduceAliasTests, \
Expand Down Expand Up @@ -64,15 +65,14 @@ def setUpModule():
testrun_sibs_bucket = 'sibsbucket'
c.bucket(testrun_sibs_bucket).allow_mult = True

if not int(os.environ.get('SKIP_SEARCH', '0')):
if not int(os.environ.get('SKIP_SEARCH', '0')) and not int(os.environ.get('RUN_YZ', '1')):
testrun_search_bucket = 'searchbucket'
b = c.bucket(testrun_search_bucket)
b.enable_search()


def tearDownModule():
c = RiakClient(protocol='http', host=HTTP_HOST, http_port=HTTP_PORT)
if not int(os.environ.get('SKIP_SEARCH', '0')):
if not int(os.environ.get('SKIP_SEARCH', '0')) and not int(os.environ.get('RUN_YZ', '1')):
b = c.bucket(testrun_search_bucket)
b.clear_properties()
b = c.bucket(testrun_sibs_bucket)
Expand Down Expand Up @@ -216,6 +216,7 @@ class RiakPbcTransportTestCase(BasicKVTests,
MapReduceStreamTests,
EnableSearchTests,
SearchTests,
YZSearchTests,
ClientTests,
CounterTests,
BaseTestCase,
Expand All @@ -229,6 +230,13 @@ def setUp(self):
self.protocol = 'pbc'
self.http_client = self.create_client(HTTP_HOST,
http_port=HTTP_PORT)
# Only supporting yokozuna via PBC
if int(os.environ.get('RUN_YZ', '0')):
testrun_yz_bucket = 'yztest'
self.http_client.create_search_index(testrun_yz_bucket)
b = self.http_client.bucket(testrun_yz_bucket)
b.set_property('yz_index', testrun_yz_bucket)

super(RiakPbcTransportTestCase, self).setUp()

def test_uses_client_id_if_given(self):
Expand Down
2 changes: 1 addition & 1 deletion riak/tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
else:
import unittest

SKIP_SEARCH = int(os.environ.get('SKIP_SEARCH', '0'))
SKIP_SEARCH = int(os.environ.get('SKIP_SEARCH', '0')) or int(os.environ.get('RUN_YZ', '1'))


class EnableSearchTests(object):
Expand Down
136 changes: 136 additions & 0 deletions riak/tests/test_yokozuna.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
import os
import platform
import time
if platform.python_version() < '2.7':
unittest = __import__('unittest2')
else:
import unittest

RUN_YZ = int(os.environ.get('RUN_YZ', '0'))


class YZSearchTests(object):
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_search_from_bucket(self):
bucket = self.client.bucket('yztest')
bucket.new("user", {"user_s": "Z"}).store()
time.sleep(1)
results = bucket.search("user_s:Z")
self.assertEquals(1, len(results['docs']))
# TODO: check that docs return useful info
result = results['docs'][0]
self.assertEquals(True, result.has_key('_yz_rk'))
self.assertEquals(u'user', result['_yz_rk'])
self.assertEquals(True, result.has_key('_yz_rb'))
self.assertEquals(u'yztest', result['_yz_rb'])
self.assertEquals(True, result.has_key('score'))
self.assertEquals(True, result.has_key('user_s'))
self.assertEquals(u'Z', result['user_s'])

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_get_search_index(self):
index = self.client.get_search_index('yztest')
self.assertEquals('yztest', index['name'])
self.assertEquals('_yz_default', index['schema'])
self.assertRaises(Exception, self.client.get_search_index, 'NOTyztest')

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_delete_search_index(self):
testrun_yz_bucket = 'yztest'
# expected to fail, since there's an attached bucket
self.assertRaises(Exception,
self.client.delete_search_index, testrun_yz_bucket)
# detatch bucket from index then delete
b = self.client.bucket(testrun_yz_bucket)
b.set_property('yz_index', '')
resp = self.client.delete_search_index(testrun_yz_bucket)
self.assertEquals(True, resp)
# create it again
self.client.create_search_index(testrun_yz_bucket)
b = self.client.bucket(testrun_yz_bucket)
b.set_property('yz_index', testrun_yz_bucket)
time.sleep(1) # wait for index to apply

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_list_search_indexes(self):
indexes = self.client.list_search_indexes()
self.assertEquals(1, len(indexes))

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_create_schema(self):
content = """<?xml version="1.0" encoding="UTF-8" ?>
<schema name="test" version="1.5">
<fields>
<field name="_yz_id" type="_yz_str" indexed="true" stored="true" required="true" />
<field name="_yz_ed" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_pn" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_fpn" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_vtag" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_node" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_rk" type="_yz_str" indexed="true" stored="true"/>
<field name="_yz_rb" type="_yz_str" indexed="true" stored="true"/>
</fields>
<uniqueKey>_yz_id</uniqueKey>
<types>
<fieldType name="_yz_str" class="solr.StrField" sortMissingLast="true" />
</types>
</schema>"""
schema_name = 'yzgoodschema'
resp = self.client.create_search_schema(schema_name, content)
self.assertEquals(True, resp)
schema = self.client.get_search_schema(schema_name)
self.assertEquals(schema_name, schema['name'])
self.assertEquals(content, schema['content'])

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_create_bad_schema(self):
bad_content = """
<derp nope nope, how do i computer?
"""
self.assertRaises(Exception, self.client.create_search_schema,
'yzbadschema', bad_content)


@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_search_queries(self):
bucket = self.client.bucket('yztest')
bucket.new("Z", {"username_s": "Z", "name_s": "ryan", "age_i":30}).store()
bucket.new("R", {"username_s": "R", "name_s": "eric", "age_i":34}).store()
bucket.new("F", {"username_s": "F", "name_s": "bryan fink", "age_i":32}).store()
bucket.new("H", {"username_s": "H", "name_s": "brett", "age_i":14}).store()
time.sleep(1)
# multiterm
results = bucket.search("username_s:(F OR H)")
self.assertEquals(2, len(results['docs']))
# boolean
results = bucket.search("username_s:Z AND name_s:ryan")
self.assertEquals(1, len(results['docs']))
# range
results = bucket.search("age_i:[30 TO 33]")
self.assertEquals(2, len(results['docs']))
# phrase
results = bucket.search('name_s:"bryan fink"')
self.assertEquals(1, len(results['docs']))
# wildcard
results = bucket.search('name_s:*ryan*')
self.assertEquals(2, len(results['docs']))
# regexp
results = bucket.search('name_s:/br.*/')
self.assertEquals(2, len(results['docs']))
# Parameters:
# limit
results = bucket.search('username_s:*', rows=2)
self.assertEquals(2, len(results['docs']))
# sort
results = bucket.search('username_s:*', sort="age_i asc")
self.assertEquals(14, int(results['docs'][0]['age_i']))

@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
def test_yz_search_utf8(self):
bucket = self.client.bucket('yztest')
body = {"text_ja" : u"私はハイビスカスを食べるのが 大好き"}
bucket.new("shift_jis", body).store()
# TODO: fails due to lack of direct PB unicode support
# results = bucket.search(u"text_ja:大好き")
# self.assertEquals(1, len(results['docs']))
11 changes: 10 additions & 1 deletion riak/transports/feature_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
1: LooseVersion("1.0.0"),
1.1: LooseVersion("1.1.0"),
1.2: LooseVersion("1.2.0"),
1.4: LooseVersion("1.4.0")
1.4: LooseVersion("1.4.0"),
2.0: LooseVersion("2.0.0")
}


Expand Down Expand Up @@ -65,6 +66,14 @@ def pb_indexes(self):
"""
return self.server_version >= versions[1.2]

def pb_search_admin(self):
"""
Whether search administration is supported over Protocol Buffers

:rtype: bool
"""
return self.server_version >= versions[2.0]

def pb_search(self):
"""
Whether search queries are supported over Protocol Buffers
Expand Down
16 changes: 15 additions & 1 deletion riak/transports/pbc/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _invert(d):

NORMAL_PROPS = ['n_val', 'allow_mult', 'last_write_wins', 'old_vclock',
'young_vclock', 'big_vclock', 'small_vclock', 'basic_quorum',
'notfound_ok', 'search', 'backend']
'notfound_ok', 'search', 'backend', 'yz_index']
COMMIT_HOOK_PROPS = ['precommit', 'postcommit']
MODFUN_PROPS = ['chash_keyfun', 'linkfun']
QUORUM_PROPS = ['r', 'pr', 'w', 'pw', 'dw', 'rw']
Expand Down Expand Up @@ -412,3 +412,17 @@ def _encode_index_req(self, bucket, index, startkey, endkey=None,
if continuation:
req.continuation = continuation
return req

def _decode_yz_index(self, index):
"""
Fills an RpbYokozunaIndex message with the appropriate data.

:param index: a yz index message
:type index: riak_pb.RpbYokozunaIndex
:rtype dict
"""
result = {}
result['name'] = index.name
if index.HasField('schema'):
result['schema'] = index.schema
return result
17 changes: 15 additions & 2 deletions riak/transports/pbc/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
MSG_CODE_COUNTER_UPDATE_RESP = 51
MSG_CODE_COUNTER_GET_REQ = 52
MSG_CODE_COUNTER_GET_RESP = 53
MSG_CODE_YOKOZUNA_INDEX_GET_REQ = 54
MSG_CODE_YOKOZUNA_INDEX_GET_RESP = 55
MSG_CODE_YOKOZUNA_INDEX_PUT_REQ = 56
MSG_CODE_YOKOZUNA_INDEX_DELETE_REQ = 57
MSG_CODE_YOKOZUNA_SCHEMA_GET_REQ = 58
MSG_CODE_YOKOZUNA_SCHEMA_GET_RESP = 59
MSG_CODE_YOKOZUNA_SCHEMA_PUT_REQ = 60

# These responses don't include messages
EMPTY_RESPONSES = [
Expand Down Expand Up @@ -101,6 +108,12 @@
MSG_CODE_COUNTER_UPDATE_REQ: riak_pb.RpbCounterUpdateReq,
MSG_CODE_COUNTER_UPDATE_RESP: riak_pb.RpbCounterUpdateResp,
MSG_CODE_COUNTER_GET_REQ: riak_pb.RpbCounterGetReq,
MSG_CODE_COUNTER_GET_RESP: riak_pb.RpbCounterGetResp

MSG_CODE_COUNTER_GET_RESP: riak_pb.RpbCounterGetResp,
MSG_CODE_YOKOZUNA_INDEX_GET_REQ: riak_pb.RpbYokozunaIndexGetReq,
MSG_CODE_YOKOZUNA_INDEX_GET_RESP: riak_pb.RpbYokozunaIndexGetResp,
MSG_CODE_YOKOZUNA_INDEX_PUT_REQ: riak_pb.RpbYokozunaIndexPutReq,
MSG_CODE_YOKOZUNA_INDEX_DELETE_REQ: riak_pb.RpbYokozunaIndexDeleteReq,
MSG_CODE_YOKOZUNA_SCHEMA_GET_REQ: riak_pb.RpbYokozunaSchemaGetReq,
MSG_CODE_YOKOZUNA_SCHEMA_GET_RESP: riak_pb.RpbYokozunaSchemaGetResp,
MSG_CODE_YOKOZUNA_SCHEMA_PUT_REQ: riak_pb.RpbYokozunaSchemaPutReq
}
Loading