Permalink
Browse files

Merge pull request #89 from gstein/deprecate

Deprecate old transports
These transports are obsoleted by the new ConnectionManager.
  • Loading branch information...
1 parent 5ec95e7 commit bd74b800c2717601396322acd58b83cbadeed2ad @russelldb russelldb committed Nov 21, 2011
Showing with 23 additions and 312 deletions.
  1. +2 −77 riak/tests/test_all.py
  2. +11 −97 riak/transports/http.py
  3. +7 −137 riak/transports/pbc.py
  4. +3 −1 setup.py
View
@@ -19,8 +19,8 @@
import time
from riak import RiakClient
-from riak import RiakPbcTransport, RiakPbcCachedTransport
-from riak import RiakHttpTransport, RiakHttpPoolTransport, RiakHttpReuseTransport
+from riak import RiakPbcTransport
+from riak import RiakHttpTransport
from riak import RiakKeyFilter, key_filter
from riak.riak_index_entry import RiakIndexEntry
from riak.mapreduce import RiakLink
@@ -31,11 +31,6 @@
HAVE_PROTO = True
except ImportError:
HAVE_PROTO = False
-try:
- import urllib3
- HAVE_HTTP_POOL = True
-except ImportError:
- HAVE_HTTP_POOL = False
HOST = os.environ.get('RIAK_TEST_HOST', 'localhost')
HTTP_HOST = os.environ.get('RIAK_TEST_HTTP_HOST', HOST)
@@ -872,26 +867,6 @@ def test_uses_client_id_if_given(self):
self.assertEqual(zero_client_id, c.get_client_id()) #
-class RiakPbcCachedTransportCase(BaseTestCase, MapReduceAliasTestMixIn,
- unittest.TestCase):
- def setUp(self):
- if not HAVE_PROTO:
- self.skipTest('protobuf is unavailable')
- self.host = PB_HOST
- self.port = PB_PORT
- self.transport_class = RiakPbcCachedTransport
- super(RiakPbcCachedTransportCase, self).setUp()
-
- def test_uses_client_id_if_given(self):
- self.host = PB_HOST
- self.port = PB_PORT
- zero_client_id = "\0\0\0\0"
- c = RiakClient(PB_HOST, PB_PORT,
- transport_class = RiakPbcCachedTransport,
- client_id = zero_client_id)
- self.assertEqual(zero_client_id, c.get_client_id()) #
-
-
class RiakHttpTransportTestCase(BaseTestCase, MapReduceAliasTestMixIn, unittest.TestCase):
def setUp(self):
@@ -1045,56 +1020,6 @@ def test_delete_documents_from_search_by_query_and_id(self):
results = self.client.solr().search("searchbucket", "username:russell OR username:dizzy")
self.assertEquals(0, len(results["response"]["docs"]))
-class RiakHttpPoolTransportTestCase(BaseTestCase, MapReduceAliasTestMixIn, unittest.TestCase):
-
- def setUp(self):
- if not HAVE_HTTP_POOL:
- self.skipTest('urllib3 is unavailable')
- self.host = HTTP_HOST
- self.port = HTTP_PORT
- self.transport_class = RiakHttpPoolTransport
- super(RiakHttpPoolTransportTestCase, self).setUp()
-
- def test_no_returnbody(self):
- bucket = self.client.bucket("bucket")
- o = bucket.new("foo", "bar").store(return_body=False)
- self.assertEqual(o.vclock(), None)
-
- def test_generate_key(self):
- # Ensure that Riak generates a random key when
- # the key passed to bucket.new() is None.
- bucket = self.client.bucket('random_key_bucket')
- for key in bucket.get_keys():
- bucket.get(str(key)).delete()
- bucket.new(None, data={}).store()
- self.assertEqual(len(bucket.get_keys()), 1)
-
- def test_set_client_id(self):
- self.client.set_client_id("Client")
- self.assertEqual(self.client.get_transport().get_client_id(), "Client")
-
-class RiakHttpReuseTransportTestCase(BaseTestCase, MapReduceAliasTestMixIn, unittest.TestCase):
-
- def setUp(self):
- self.host = HTTP_HOST
- self.port = HTTP_PORT
- self.transport_class = RiakHttpReuseTransport
- super(RiakHttpReuseTransportTestCase, self).setUp()
-
- def test_no_returnbody(self):
- bucket = self.client.bucket("bucket")
- o = bucket.new("foo", "bar").store(return_body=False)
- self.assertEqual(o.vclock(), None)
-
- def test_generate_key(self):
- # Ensure that Riak generates a random key when
- # the key passed to bucket.new() is None.
- bucket = self.client.bucket('random_key_bucket')
- for key in bucket.get_keys():
- bucket.get(str(key)).delete()
- bucket.new(None, data={}).store()
- self.assertEqual(len(bucket.get_keys()), 1)
-
class RiakTestFilter(unittest.TestCase):
def test_simple(self):
View
@@ -36,6 +36,7 @@
from riak.riak_index_entry import RiakIndexEntry
from riak.multidict import MultiDict
from connection import HTTPConnectionManager
+import riak.util
MAX_LINK_HEADER_SIZE = 8192 - 8 # substract length of "Link: " header string and newline
@@ -519,111 +520,24 @@ def parse_http_headers(cls, headers) :
retVal[key] = value
return retVal
-import socket
class RiakHttpReuseTransport(RiakHttpTransport):
- """
- Reuse sockets
- """
-
+ "Deprecated transport."
def __init__(self, cm,
prefix='riak', mapred_prefix='mapred', client_id=None,
**unused_options):
- super(RiakHttpReuseTransport, self).__init__(cm,
- prefix,
- mapred_prefix,
- client_id)
- ### for backwards compat
- self._host, self._port = cm.hostports[0]
+ RiakHttpTransport.__init__(self, cm, prefix, mapred_prefix,
+ client_id, **unused_options)
+ riak.util.deprecated('please use RiakHttpTransport instead',
+ stacklevel=4)
- def __copy__(self):
- return RiakHttpReuseTransport(self._conns, self._prefix,
- self._mapred_prefix)
-
- def http_request(self, method, uri, headers=None, body=''):
- if headers is None:
- headers = {}
- # Run the request...
- client = None
- response = None
- try:
- client = httplib.HTTPConnection(self._host, self._port)
-
- #handle the connection myself, try to reuse sockets
- client.auto_open = 0
- client.connect()
- client.sock.setsockopt(
- socket.SOL_SOCKET, socket.SO_REUSEADDR,
- client.sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1)
-
- client.request(method, uri, body, headers)
- response = client.getresponse()
-
- # Get the response headers...
- response_headers = {'http_code': response.status}
- for (key, value) in response.getheaders():
- response_headers[key.lower()] = value
-
- # Get the body...
- response_body = response.read()
- response.close()
-
- #close, this does not make any difference
- client.close()
-
- return response_headers, response_body
- except:
- if client is not None: client.close()
- if response is not None: response.close()
- raise
-
-try:
- import urllib3
-except ImportError:
- urllib3 = None
class RiakHttpPoolTransport(RiakHttpTransport):
- """
- Use HTTP pool
- """
-
- http_pool = None
-
+ "Deprecated transport."
def __init__(self, cm,
prefix='riak', mapred_prefix='mapred', client_id=None,
**unused_options):
- if urllib3 is None:
- raise RiakError("this transport is not available (no urllib3)")
-
- super(RiakHttpPoolTransport, self).__init__(cm,
- prefix,
- mapred_prefix,
- client_id)
- ### for backwards compat
- self._host, self._port = cm.hostports[0]
-
- def __copy__(self):
- return RiakHttpPoolTransport(self._conns, self._prefix,
- self._mapred_prefix)
-
- def http_request(self, method, uri, headers={}, body=''):
- if headers is None:
- headers = {}
- try:
- ### it seems wrong to put the pool into a *class* variable,
- ### but this code is supporting backwards-compat where the
- ### use of a class variable was the design.
- if self.__class__.http_pool is None:
- self.__class__.http_pool = urllib3.connection_from_url('http://%s:%d' % (self._host, self._port), maxsize=10)
-
- response = self.http_pool.urlopen(method, uri, body, headers)
-
- response_headers = {'http_code': response.status}
- for key, value in response.getheaders().iteritems():
- response_headers[key.lower()] = value
-
- response_body = response.data
-
- return response_headers, response_body
- except:
- raise
+ RiakHttpTransport.__init__(self, cm, prefix, mapred_prefix,
+ client_id, **unused_options)
+ riak.util.deprecated('please use RiakHttpTransport instead',
+ stacklevel=4)
View
@@ -32,6 +32,8 @@
from riak import RiakError
from riak.riak_index_entry import RiakIndexEntry
from riak.transports import connection
+from connection import SocketConnectionManager
+import riak.util
try:
import riakclient_pb2
@@ -546,143 +548,11 @@ def pbify_content(self, metadata, data, rpb_content) :
pb_link.tag = link.get_tag()
rpb_content.value = data
-from Queue import Empty, Full, Queue
-import contextlib
-class RiakPbcCachedTransport(RiakTransport):
- """Threadsafe pool of PBC connections, based on urllib3's pool [aka Queue]"""
-
- # We're using the new RiakTransport API
- api = 2
-
- # The ConnectionManager class that this transport prefers.
- default_cm = connection.cm_using(SocketWithId)
-
+class RiakPbcCachedTransport(RiakPbcTransport):
+ "Deprecated transport."
def __init__(self, cm,
client_id=None, maxsize=0, block=False, timeout=None,
**unused_options):
- if riakclient_pb2 is None:
- raise RiakError("this transport is not available (no protobuf)")
-
- ### backwards compat. we don't use the ConnectionManager (yet).
- self._cm = cm
-
- self.client_id = client_id
- self.block = block
- self.timeout = timeout
-
- self.pool = Queue(maxsize)
- # Fill the queue up so that doing get() on it will block properly (check Queue#get)
- [self.pool.put(None) for _ in xrange(maxsize)]
-
- def _new_connection(self):
- """New PBC connection"""
- return RiakPbcTransport(self._cm, self.client_id)
-
- def _get_connection(self):
- connection = None
- try:
- connection = self.pool.get(block=self.block, timeout=self.timeout)
- except Empty:
- pass
- return connection or self._new_connection()
-
- def _put_connection(self, connection):
- try:
- self.pool.put(connection, block=False)
- except Full:
- pass
-
- @contextlib.contextmanager
- def _get_connection_from_pool(self):
- """checkout conn, try operation, put conn back in pool"""
- connection = self._get_connection()
- try:
- yield connection
- finally:
- self._put_connection(connection)
-
- def ping(self):
- """
- Ping the remote server
- @return boolean
- """
- with self._get_connection_from_pool() as connection:
- return connection.ping()
-
- def get(self, robj, r = None, vtag = None):
- """
- Serialize get request and deserialize response
- @return (vclock=None, [(metadata, value)]=None)
- """
- with self._get_connection_from_pool() as connection:
- return connection.get(robj, r, vtag)
-
- def put(self, robj, w = None, dw = None, return_body = True):
- """
- Serialize put request and deserialize response - if 'content'
- is true, retrieve the updated metadata/content
- @return (vclock=None, [(metadata, value)]=None)
- """
- with self._get_connection_from_pool() as connection:
- return connection.put(robj, w, dw, return_body)
-
- def put_new(self, robj, w=None, dw=None, return_meta=True):
- """Put a new object into the Riak store, returning its (new) key.
-
- If return_meta is False, then the vlock and metadata return values
- will be None.
-
- @return (key, vclock, metadata)
- """
- with self._get_connection_from_pool() as connection:
- return connection.put_new(robj, w, dw, return_meta)
-
- def delete(self, robj, rw = None):
- """
- Serialize delete request and deserialize response
- @return true
- """
- with self._get_connection_from_pool() as connection:
- return connection.delete(robj, rw)
-
- def get_buckets(self):
- """
- Serialize bucket listing request and deserialize response
- """
- with self._get_connection_from_pool() as connection:
- return connection.get_buckets()
-
- def get_bucket_props(self, bucket) :
- """
- Serialize get bucket property request and deserialize response
- @return dict()
- """
- with self._get_connection_from_pool() as connection:
- return connection.get_bucket_props(bucket)
-
- def set_bucket_props(self, bucket, props) :
- """
- Serialize set bucket property request and deserialize response
- bucket = bucket object
- props = dictionary of properties
- @return boolean
- """
- with self._get_connection_from_pool() as connection:
- return connection.set_bucket_props(bucket, props)
-
- def mapred(self, inputs, query, timeout = None) :
- """
- Serialize map/reduce request
- """
- with self._get_connection_from_pool() as connection:
- return connection.mapred(inputs, query, timeout)
-
- def set_client_id(self, client_id):
- """Mmm, this can turn ugly if you use different id for different objects in the pool"""
- with self._get_connection_from_pool() as connection:
- return connection.set_client_id(client_id)
-
- def get_client_id(self):
- """see set_client_id notes, you can do wrong with this"""
- with self._get_connection_from_pool() as connection:
- return connection.get_client_id()
+ RiakPbcTransport.__init__(self, cm, client_id, **unused_options)
+ riak.util.deprecated('please use RiakPbcTransport instead',
+ stacklevel=4)
Oops, something went wrong.

0 comments on commit bd74b80

Please sign in to comment.