Skip to content

Commit

Permalink
Add optional timeout to YZ create index command
Browse files Browse the repository at this point in the history
  • Loading branch information
Brett Hazen committed Oct 13, 2015
1 parent 63a205b commit 30f8ae0
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 6 deletions.
7 changes: 5 additions & 2 deletions riak/client/operations.py
Expand Up @@ -685,7 +685,8 @@ def stream_mapred(self, inputs, query, timeout):
stream.close()

@retryable
def create_search_index(self, transport, index, schema=None, n_val=None):
def create_search_index(self, transport, index, schema=None, n_val=None,
timeout=None):
"""
create_search_index(index, schema=None, n_val=None)
Expand All @@ -698,8 +699,10 @@ def create_search_index(self, transport, index, schema=None, n_val=None):
:type schema: string, None
:param n_val: this indexes N value
:type n_val: integer, None
:param timeout: optional timeout (in ms)
:type timeout: integer, None
"""
return transport.create_search_index(index, schema, n_val)
return transport.create_search_index(index, schema, n_val, timeout)

@retryable
def get_search_index(self, transport, index):
Expand Down
2 changes: 1 addition & 1 deletion riak/tests/test_all.py
Expand Up @@ -87,7 +87,7 @@ def setUpModule():
'index': 'mrbucket'}

for yz in (testrun_yz, testrun_yz_index, testrun_yz_mr):
c.create_search_index(yz['index'])
c.create_search_index(yz['index'], timeout=30000)
if yz['btype'] is not None:
t = c.bucket_type(yz['btype'])
b = t.bucket(yz['bucket'])
Expand Down
7 changes: 6 additions & 1 deletion riak/transports/http/transport.py
Expand Up @@ -444,7 +444,8 @@ def stream_index(self, bucket, index, startkey, endkey=None,
else:
raise RiakError('Error streaming secondary index.')

def create_search_index(self, index, schema=None, n_val=None):
def create_search_index(self, index, schema=None, n_val=None,
timeout=None):
"""
Create a Solr search index for Yokozuna.
Expand All @@ -454,6 +455,8 @@ def create_search_index(self, index, schema=None, n_val=None):
:type schema: string
:param n_val: N value of the write
:type n_val: int
:param timeout: optional timeout (in ms)
:type timeout: integer, None
:rtype boolean
"""
Expand All @@ -468,6 +471,8 @@ def create_search_index(self, index, schema=None, n_val=None):
content_dict['schema'] = schema
if n_val:
content_dict['n_val'] = n_val
if timeout:
content_dict['timeout'] = timeout
content = json.dumps(content_dict)

# Run the request...
Expand Down
5 changes: 4 additions & 1 deletion riak/transports/pbc/transport.py
Expand Up @@ -487,7 +487,8 @@ def stream_index(self, bucket, index, startkey, endkey=None,

return RiakPbcIndexStream(self, index, return_terms)

def create_search_index(self, index, schema=None, n_val=None):
def create_search_index(self, index, schema=None, n_val=None,
timeout=None):
if not self.pb_search_admin():
raise NotImplementedError("Search 2.0 administration is not "
"supported for this version")
Expand All @@ -498,6 +499,8 @@ def create_search_index(self, index, schema=None, n_val=None):
if n_val:
idx.n_val = n_val
req = riak_pb.RpbYokozunaIndexPutReq(index=idx)
if timeout is not None:
req.timeout = timeout

self._request(MSG_CODE_YOKOZUNA_INDEX_PUT_REQ, req,
MSG_CODE_PUT_RESP)
Expand Down
3 changes: 2 additions & 1 deletion riak/transports/transport.py
Expand Up @@ -172,7 +172,8 @@ def get_client_id(self):
"""
raise NotImplementedError

def create_search_index(self, index, schema=None, n_val=None):
def create_search_index(self, index, schema=None, n_val=None,
timeout=None):
"""
Creates a yokozuna search index.
"""
Expand Down

1 comment on commit 30f8ae0

@hazen
Copy link

@hazen hazen commented on 30f8ae0 Oct 16, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@borshop: retry

Please sign in to comment.