Skip to content

Key filters #6

Merged
merged 3 commits into from May 24, 2011
View
8 riak/client.py
@@ -213,6 +213,14 @@ def set_decoder(self, content_type, decoder):
self._decoders[content_type] = decoder
return self
+ def get_buckets(self):
+ """
+ Get the list of buckets.
+ NOTE: Do not use this in production, as it requires traversing through
+ all keys stored in a cluster.
+ """
+ return self._transport.get_buckets()
+
def bucket(self, name):
"""
Get the bucket by the specified name. Since buckets always exist,
View
33 riak/mapreduce.py
@@ -19,6 +19,7 @@
"""
import urllib
from riak_object import RiakObject
+from bucket import RiakBucket
class RiakMapReduce(object):
"""
@@ -34,6 +35,7 @@ def __init__(self, client):
self._client = client
self._phases = []
self._inputs = []
+ self._key_filters = []
self._input_mode = None
def add(self, arg1, arg2=None, arg3=None):
@@ -72,19 +74,33 @@ def add_bucket(self, bucket) :
self._inputs = bucket
return self
+ def add_key_filters(self, key_filters) :
+ if self._input_mode == 'search':
+ raise Exception('Key filters are not supported in search query.')
+
+ self._key_filters.extend(key_filters)
+ return self
+
+ def add_key_filter(self, *args) :
+ if self._input_mode == 'search':
+ raise Exception('Key filters are not supported in search query.')
+
+ self._key_filters.append(args)
+ return self
+
def search(self, bucket, query):
"""
- Begin a map/reduce operation using a Search. This command will
+ Begin a map/reduce operation using a Search. This command will
return an error unless executed against a Riak Search cluster.
@param bucket - The bucket over which to perform the search.
@param query - The search query.
"""
self._input_mode = 'search'
- self._inputs = {'module':'riak_search',
+ self._inputs = {'module':'riak_search',
'function':'mapred_search',
'arg':[bucket, query]}
return self
-
+
def link(self, bucket='_', tag='_', keep=False):
"""
@@ -180,6 +196,17 @@ def run(self, timeout=None):
if phase._keep: keep_flag = True
query.append(phase.to_array())
+ if (len(self._key_filters) > 0):
+ bucket_name = None
+ if (type(self._inputs) == str):
+ bucket_name = self._inputs
+ elif (type(self._inputs) == RiakBucket):
+ bucket_name = self._inputs.get_name()
+
+ if (bucket_name is not None):
+ self._inputs = {'bucket': bucket_name,
+ 'key_filters': self._key_filters}
+
t = self._client.get_transport()
result = t.mapred(self._inputs, query, timeout)
View
15 riak/tests/test_all.py
@@ -298,6 +298,21 @@ def test_javascript_arg_map_reduce(self):
.run()
self.assertEqual(result, [10])
+ def test_key_filters(self):
+ bucket = self.client.bucket("kftest")
+ bucket.new("basho-20101215", 1).store()
+ bucket.new("google-20110103", 2).store()
+ bucket.new("yahoo-20090613", 3).store()
+
+ result = self.client \
+ .add("kftest") \
+ .add_key_filters([["tokenize", "-", 2]]) \
+ .add_key_filter("ends_with", "0613") \
+ .map("function (v, keydata) { return [v.key]; }") \
+ .run()
+
+ self.assertEqual(result, ["yahoo-20090613"])
+
def test_erlang_map_reduce(self):
# Create the object...
bucket = self.client.bucket("bucket")
View
24 riak/transports/http.py
@@ -63,7 +63,7 @@ def __init__(self, host='127.0.0.1', port=8098, prefix='riak',
self._client_id = self.make_random_client_id()
def __copy__(self):
- return RiakHttpTransport(self._host, self._port, self._prefix,
+ return RiakHttpTransport(self._host, self._port, self._prefix,
self._mapred_prefix)
"""
@@ -134,14 +134,25 @@ def get_keys(self, bucket):
host, port, url = self.build_rest_path(bucket, None, None, params)
response = self.http_request('GET', host, port, url)
- headers = response[0]
- encoded_props = response[1]
+ headers, encoded_props = response[0:2]
if (headers['http_code'] == 200):
props = json.loads(encoded_props)
return props['keys']
else:
raise Exception('Error getting bucket properties.')
-
+
+ def get_buckets(self):
+ params = {'buckets': 'true'}
+ host, port, url = self.build_rest_path(None, None, None, params)
+ response = self.http_request('GET', host, port, url)
+
+ headers, encoded_props = response[0:2]
+ if (headers['http_code'] == 200):
+ props = json.loads(encoded_props)
+ return props['buckets']
+ else:
+ raise Exception('Error getting buckets.')
+
def get_bucket_props(self, bucket, keys=False):
# Run the request...
params = {'props' : 'True', 'keys' : 'False'}
@@ -311,7 +322,10 @@ def build_rest_path(self, bucket, key=None, spec=None, params=None) :
# Build 'http://hostname:port/prefix/bucket'
path = ''
path += '/' + self._prefix
- path += '/' + urllib.quote_plus(bucket._name)
+
+ # Add '.../bucket'
+ if (bucket is not None):
+ path += '/' + urllib.quote_plus(bucket._name)
# Add '.../key'
if (key is not None):
View
21 riak/transports/pbc.py
@@ -74,7 +74,7 @@ class RiakPbcTransport(RiakTransport):
'default' : RIAKC_RW_DEFAULT,
'all' : RIAKC_RW_ALL,
'quorum' : RIAKC_RW_QUORUM,
- 'one' : RIAKC_RW_ONE
+ 'one' : RIAKC_RW_ONE
}
def __init__(self, host='127.0.0.1', port=8087, client_id=None):
"""
@@ -128,7 +128,7 @@ def set_client_id(self, client_id):
"""
req = riakclient_pb2.RpbSetClientIdReq()
req.client_id = client_id
-
+
self.maybe_connect()
self.send_msg(MSG_CODE_SET_CLIENT_ID_REQ, req)
msg_code, resp = self.recv_msg()
@@ -170,7 +170,7 @@ def put(self, robj, w = None, dw = None, return_body = True):
Serialize get request and deserialize response
"""
bucket = robj.get_bucket()
-
+
req = riakclient_pb2.RpbPutReq()
req.w = self.translate_rw_val(w)
req.dw = self.translate_rw_val(dw)
@@ -214,7 +214,7 @@ def delete(self, robj, rw = None):
if msg_code != MSG_CODE_DEL_RESP:
raise RiakError("unexpected protocol buffer message code: ", msg_code)
return self
-
+
def get_keys(self, bucket):
"""
Lists all keys within a bucket.
@@ -238,6 +238,19 @@ def get_keys(self, bucket):
return keys
+ def get_buckets(self):
+ """
+ Serialize bucket listing request and deserialize response
+ """
+ req = riakclient_pb2.RpbListBucketsReq()
+
+ self.maybe_connect()
+ self.send_msg(MSG_CODE_LIST_KEYS_REQ, req)
+ msg_code, resp = self.recv_msg()
+ if msg_code != MSG_CODE_LIST_BUCKETS_RESP:
+ raise RiakError("unexpected protocol buffer message code: ", msg_code)
+ return resp.buckets
+
def get_bucket_props(self, bucket):
"""
Serialize bucket property request and deserialize response
View
9 riak/transports/transport.py
@@ -17,7 +17,7 @@
specific language governing permissions and limitations
under the License.
"""
-from riak import RiakError
+from riak import RiakError
import base64
import random
import threading
@@ -76,6 +76,13 @@ def delete(self, robj, rw = None):
"""
raise RiakError("not implemented")
+ def get_buckets(self) :
+ """
+ Serialize get buckets request and deserialize response
+ @return dict()
+ """
+ raise RiakError("not implemented")
+
def get_bucket_props(self, bucket) :
"""
Serialize get bucket property request and deserialize response
Something went wrong with that request. Please try again.