Skip to content
This repository

Adding support for Replica Sets #16

Closed
wants to merge 5 commits into from

7 participants

Shawn MacIntyre Jehiah Czebotar Phil Whelan Daniel Kador Matvey Adzhigirey Michael Karpitsky A. Jesse Jiryu Davis
Shawn MacIntyre
  • Supporting for multiple hosts and mongo uri's (using uri_parse)
    • Client('foo', host='mongodb://localhost:27017,localhost:27018/test')
    • Client('foo', host=['localhost:27017', 'localhost:27018'], dbname='test') and keeping the old:
    • Client('foo', host='localhost', port=27017, dbname='test')
  • Increasing required pymongo to 2.0 for uri_parse
  • Creating a new Connection() is now asynchronous
    • must be passed a callback which is called with the connected connection
  • Updating Connection's to replSets to behave the same way as pymongo
  • Adding Pool.command to run db commands
  • Making .gitignore more vim friendly
added some commits September 19, 2011
Adding support for Replica Sets.
* Supporting for multiple hosts and mongo uri's (using uri_parse)
  - Client('foo', host='mongodb://localhost:27017,localhost:27018/test')
  - Client('foo', host=['localhost:27017', 'localhost:27018'], dbname='test')
  and keeping the old:
  - Client('foo', host='localhost', port=27017, dbname='test')
* Increasing required pymongo to 2.0 for uri_parse
* Creating a new Connection() is now asynchronous
  - must be passed a callback which is called with the connected connection
* Updating Connection's to replSets to behave the same way as pymongo
* Adding Pool.command to run db commands
* Making .gitignore more vim friendly
7c2af31
Fixing error reporting when no node is found 3075dcf
Fixing overly agressive exception catching in callbacks 33c6cc4
Shawn MacIntyre

Fixed bug where we were catching all exceptions happening in callbacks passed to asyncmongo cursors.

Jehiah Czebotar
Owner

thanks for your work on Replica Set support; I've just merged in support for authentication, which similar to replica set support requires a round trip to the server before a connection is ready to use.

I think there is a lot of good code in what you've added so far, but I think an ideal implementation would be to have the replica support handled transparently at connect time so that is' simple to handle reconnects and keeping a pool of connections.

Phil Whelan

This looks like just what I need. Thanks for implementing it!

and others added some commits December 05, 2011
Phil Whelan Fix bug with connection being returned to the pool prematurely when l…
…ooking for master node. Would cause ProgrammingError('connection already in use'), although this exception is not being passed up and is appearing as 'connection closed'
7892c29
Merge pull request #1 from philwhln/master
fixed bug with "Adding support for Replica Sets"
1b0ffb1
Daniel Kador

Any idea when this might get pulled into head? Would love to take advantage of it very soon.

Shawn MacIntyre

@dkador This needs to be cleaned up before it gets pulled. I'm currently swapped, I'll look at cleaning it up after GDC.

Daniel Kador
Matvey Adzhigirey

Hi,

I would really like to test out this feature. Is there an estimate of when replica sets will be fully functional with asyncmongo?

Thanks!

Phil Whelan
Matvey Adzhigirey

For me it didn't work when I tried it a few days. User authentication was not working. When I looked at the code, I saw some notes saying that authentication is not implemented yet.....

MBA

Phil Whelan

Sorry, I'm not using user authentication, so that's probably still true.

Matvey Adzhigirey

So is the communication between your replica sets not authenticated? Or perhaps there is a way to get node-to-node authentication to work without turning on the user authentication; just using the keyfiles? thanks!

Phil Whelan
Matvey Adzhigirey

I see. Thanks. I guess I'll wait until someone will get to finally fixing authentication... I really need it to work, as my servers are in different facilities.

Phil Whelan

I've found this does not work with the latest tornado facebook/tornado/master

_NodeFound exceptions are not caught

Michael Karpitsky

Yes, this not work for me too with last version tornado

Phil Whelan

Using "git bisect" this seems to have started failing with this tornado commit...

facebook/tornado@2db0ace

facebook/tornado@2db0ace is the first bad commit
commit facebook/tornado@2db0ace
Author: Ben Darnell ben@bendarnell.com
Date: Sun Feb 19 19:50:01 2012 -0800

Further refactoring of duplicated IOStream logic
Matvey Adzhigirey

Hi everyone,

Just wondering whether there were any updates on this? Thanks!

MBA

Matvey Adzhigirey

@philwhln Thanks! Looks like Motor might end up a better solution in the long term than asyncmongo. Unfortunately my project is nearing release, so I can't change my code to use it. I'll definetely check it out for my next projects.

Any idea what is planned for Motor in the long term? Looks like it will be part of the official pymongo eventually, right?

MBA

Phil Whelan
A. Jesse Jiryu Davis

Hi, yes, the plan is for Motor to join the PyMongo official distro in January. Meanwhile, here's the info on it:

http://emptysquare.net/motor/

Jehiah Czebotar
Owner

closing based on work in #58

Jehiah Czebotar jehiah closed this April 03, 2013
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 5 unique commits by 2 authors.

Sep 19, 2011
Adding support for Replica Sets.
* Supporting for multiple hosts and mongo uri's (using uri_parse)
  - Client('foo', host='mongodb://localhost:27017,localhost:27018/test')
  - Client('foo', host=['localhost:27017', 'localhost:27018'], dbname='test')
  and keeping the old:
  - Client('foo', host='localhost', port=27017, dbname='test')
* Increasing required pymongo to 2.0 for uri_parse
* Creating a new Connection() is now asynchronous
  - must be passed a callback which is called with the connected connection
* Updating Connection's to replSets to behave the same way as pymongo
* Adding Pool.command to run db commands
* Making .gitignore more vim friendly
7c2af31
Sep 29, 2011
Fixing error reporting when no node is found 3075dcf
Sep 30, 2011
Fixing overly agressive exception catching in callbacks 33c6cc4
Dec 05, 2011
Phil Whelan Fix bug with connection being returned to the pool prematurely when l…
…ooking for master node. Would cause ProgrammingError('connection already in use'), although this exception is not being passed up and is appearing as 'connection closed'
7892c29
Dec 06, 2011
Merge pull request #1 from philwhln/master
fixed bug with "Adding support for Replica Sets"
1b0ffb1
This page is out of date. Refresh to see the latest.
4  .gitignore
... ...
@@ -1,3 +1,7 @@
1 1
 *.pyc
2 2
 build
3 3
 dist
  4
+*~
  5
+*.swp
  6
+tags
  7
+PYSMELLTAGS
2  README.md
Source Rendered
@@ -49,7 +49,7 @@ Requirements
49 49
 ------------
50 50
 The following two python libraries are required
51 51
 
52  
-* [pymongo](http://github.com/mongodb/mongo-python-driver) version 1.9+ for bson library
  52
+* [pymongo](http://github.com/mongodb/mongo-python-driver) version 2.0+ for bson library and uri_parse
53 53
 * [tornado](http://github.com/facebook/tornado)
54 54
 
55 55
 Issues
209  asyncmongo/connection.py
@@ -27,7 +27,12 @@
27 27
 import helpers
28 28
 import struct
29 29
 import logging
  30
+import functools
  31
+import message
  32
+import contextlib
30 33
 
  34
+from bson.son import SON
  35
+from tornado.stack_context import StackContext
31 36
 from errors import ProgrammingError, IntegrityError, InterfaceError
32 37
 
33 38
 class Connection(object):
@@ -35,25 +40,48 @@ class Connection(object):
35 40
     :Parameters:
36 41
       - `host`: hostname or ip of mongo host
37 42
       - `port`: port to connect to
  43
+      - `create_callback`: callback to be called with the connected self
38 44
       - `autoreconnect` (optional): auto reconnect on interface errors
39 45
       
40 46
     """
41  
-    def __init__(self, host, port, autoreconnect=True, pool=None):
42  
-        assert isinstance(host, (str, unicode))
43  
-        assert isinstance(port, int)
  47
+    def __init__(self,
  48
+                 nodes,
  49
+                 slave_okay=True,
  50
+                 autoreconnect=True,
  51
+                 create_callback=None,
  52
+                 pool=None):
  53
+        assert isinstance(nodes, set)
  54
+        assert isinstance(slave_okay, bool)
44 55
         assert isinstance(autoreconnect, bool)
  56
+        assert callable(create_callback)
45 57
         assert pool
46  
-        self.__host = host
47  
-        self.__port = port
  58
+        self.__nodes = nodes
  59
+        self.__host = None
  60
+        self.__port = None
48 61
         self.__stream = None
49 62
         self.__callback = None
50 63
         self.__alive = False
51  
-        self.__connect()
  64
+        self.__slave_okay = slave_okay
52 65
         self.__autoreconnect = autoreconnect
53 66
         self.__pool = pool
  67
+        self.__repl = None
54 68
         self.usage_count = 0
  69
+        self.__connect(callback=create_callback)
  70
+
  71
+    def __connect(self, callback):
  72
+        """Begin the connection process, sets up connection state
  73
+        and associated stack context.
  74
+
  75
+        :Parameters:
  76
+         - `callback`: called when connected
  77
+        """
  78
+        connection_state = _ConnectionState(self.__nodes)
  79
+        connection_manager = functools.partial(self.__connection_manager,
  80
+                state=connection_state, callback=callback)
  81
+        with StackContext(connection_manager):
  82
+            self.__find_node(connection_state)
55 83
     
56  
-    def __connect(self):
  84
+    def __socket_connect(self):
57 85
         try:
58 86
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
59 87
             s.connect((self.__host, self.__port))
@@ -62,13 +90,122 @@ def __connect(self):
62 90
             self.__alive = True
63 91
         except socket.error, error:
64 92
             raise InterfaceError(error)
65  
-    
  93
+
  94
+    def __try_node(self, node):
  95
+        """Try to connect to this node and see if it works
  96
+        for our connection type.
  97
+
  98
+        :Parameters:
  99
+         - `node`: The (host, port) pair to try.
  100
+
  101
+        Based on pymongo.Connection.__try_node
  102
+        """
  103
+        if self.__alive:
  104
+            self.close()
  105
+        self.__host, self.__port = node
  106
+        self.__socket_connect()
  107
+
  108
+        command = message.query(
  109
+                options=0,
  110
+                collection_name='admin.$cmd',
  111
+                num_to_skip=0,
  112
+                num_to_return=-1,
  113
+                query=SON([('ismaster', 1)]))
  114
+        self.send_message(command,
  115
+            callback=functools.partial(self.__handle_ismaster, node=node), checking_master=True)
  116
+
  117
+    def __handle_ismaster(self, result, error=None, node=None):
  118
+        if error:
  119
+            raise error
  120
+
  121
+        if len(result['data']) == 1:
  122
+            response = result['data'][0]
  123
+        else:
  124
+            raise InterfaceError('Invalid response returned: %s' %
  125
+                    result['data'])
  126
+
  127
+        # Replica Set?
  128
+        if len(self.__nodes) > 1 or self.__repl:
  129
+            # Check that this host is part of the given replica set.
  130
+            if self.__repl:
  131
+                set_name = response.get('setName')
  132
+                # The 'setName' field isn't returned by mongod before 1.6.2
  133
+                # so we can't assume that if it's missing this host isn't in
  134
+                # the specified set.
  135
+                if set_name and set_name != self.__repl:
  136
+                    raise InterfaceError("%s:%d is not a member of "
  137
+                            "replica set %s" % (node[0], node[1], self.__repl))
  138
+            if "hosts" in response:
  139
+                self.__nodes.update([_partition_node(h)
  140
+                                     for h in response["hosts"]])
  141
+            if response["ismaster"]:
  142
+                raise _NodeFound(node)
  143
+            elif "primary" in response:
  144
+                candidate = _partition_node(response["primary"])
  145
+                return self.__try_node(candidate)
  146
+
  147
+            # Explain why we aren't using this connection.
  148
+            raise InterfaceError('%s:%d is not primary' % node)
  149
+
  150
+        # Direct connection
  151
+        else:
  152
+            if response.get("arbiterOnly", False):
  153
+                raise ProgrammingError("%s:%d is an arbiter" % node)
  154
+            raise _NodeFound(node)
  155
+
  156
+    def __find_node(self, state):
  157
+        """Find a host, port pair suitable for our connection type.
  158
+
  159
+        If only one host was supplied to __init__ see if we can connect
  160
+        to it. Don't check if the host is a master/primary so we can make
  161
+        a direct connection to read from a slave.
  162
+
  163
+        If more than one host was supplied treat them as a seed list for
  164
+        connecting to a replica set. Try to find the primary and fail if
  165
+        we can't. Possibly updates any replSet information on success.
  166
+
  167
+        If the list of hosts is not a seed list for a replica set the
  168
+        behavior is still the same. We iterate through the list trying
  169
+        to find a host we can send write operations to.
  170
+
  171
+        In either case a connection to an arbiter will never succeed.
  172
+
  173
+        Based on pymongo.Connection.__find_node
  174
+        """
  175
+        try:
  176
+            node = state.remaining.pop()
  177
+        except KeyError:
  178
+            if state.tested_all_seeds:
  179
+                # We have failed to find a node...
  180
+                raise _NoNodeFound(', '.join(state.errors))
  181
+            else:
  182
+                # No luck with seeds; let's see if we discovered a new node
  183
+                state.tested_all_seeds = True
  184
+                state.remaining = self.__nodes.copy() - state.seeds
  185
+                self.__find_node(state)
  186
+        else:
  187
+            self.__try_node(node)
  188
+
  189
+    @contextlib.contextmanager
  190
+    def __connection_manager(self, state, callback):
  191
+        try:
  192
+            yield
  193
+        except _NodeFound:
  194
+            callback(self)
  195
+        except _NoNodeFound, why:
  196
+            callback(None, error=why)
  197
+        except InterfaceError, why:
  198
+            state.errors.append(str(why))
  199
+            self.__find_node(state)
  200
+
66 201
     def _socket_close(self):
67 202
         """cleanup after the socket is closed by the other end"""
68 203
         if self.__callback:
69 204
             self.__callback(None, InterfaceError('connection closed'))
70 205
         self.__callback = None
71 206
         self.__alive = False
  207
+        self.__host = None
  208
+        self.__port = None
72 209
         self.__pool.cache(self)
73 210
     
74 211
     def _close(self):
@@ -77,6 +214,8 @@ def _close(self):
77 214
             self.__callback(None, InterfaceError('connection closed'))
78 215
         self.__callback = None
79 216
         self.__alive = False
  217
+        self.__host = None
  218
+        self.__port = None
80 219
         self.__stream._close_callback = None
81 220
         self.__stream.close()
82 221
     
@@ -85,7 +224,7 @@ def close(self):
85 224
         self._close()
86 225
         self.__pool.cache(self)
87 226
     
88  
-    def send_message(self, message, callback):
  227
+    def send_message(self, message, callback, checking_master=False):
89 228
         """ send a message over the wire; callback=None indicates a safe=False call where we write and forget about it"""
90 229
         
91 230
         self.usage_count +=1
@@ -95,7 +234,10 @@ def send_message(self, message, callback):
95 234
         
96 235
         if not self.__alive:
97 236
             if self.__autoreconnect:
98  
-                self.__connect()
  237
+                logging.warn('connection lost, reconnecting')
  238
+                self.__connect(functools.partial(Connection.send_message,
  239
+                    message=message, callback=callback))
  240
+                return
99 241
             else:
100 242
                 raise InterfaceError('connection invalid. autoreconnect=False')
101 243
         
@@ -106,7 +248,7 @@ def send_message(self, message, callback):
106 248
         try:
107 249
             self.__stream.write(data)
108 250
             if callback:
109  
-                self.__stream.read_bytes(16, callback=self._parse_header)
  251
+                self.__stream.read_bytes(16, callback=functools.partial(self._parse_header, checking_master))
110 252
             else:
111 253
                 self.__request_id = None
112 254
                 self.__pool.cache(self)
@@ -116,7 +258,7 @@ def send_message(self, message, callback):
116 258
             raise
117 259
         # return self.__request_id 
118 260
     
119  
-    def _parse_header(self, header):
  261
+    def _parse_header(self, checking_master, header):
120 262
         # return self.__receive_data_on_socket(length - 16, sock)
121 263
         # logging.info('got data %r' % header)
122 264
         length = int(struct.unpack("<i", header[:4])[0])
@@ -129,18 +271,20 @@ def _parse_header(self, header):
129 271
         # logging.info('%s' % length)
130 272
         # logging.info('waiting for another %d bytes' % length - 16)
131 273
         try:
132  
-            self.__stream.read_bytes(length - 16, callback=self._parse_response)
  274
+            self.__stream.read_bytes(length - 16, callback=functools.partial(self._parse_response, checking_master))
133 275
         except IOError, e:
134 276
             self.__alive = False
135 277
             raise
136 278
     
137  
-    def _parse_response(self, response):
  279
+    def _parse_response(self, checking_master, response):
138 280
         # logging.info('got data %r' % response)
139 281
         callback = self.__callback
140 282
         request_id = self.__request_id
141 283
         self.__request_id = None
142 284
         self.__callback = None
143  
-        self.__pool.cache(self)
  285
+
  286
+        if not checking_master:
  287
+            self.__pool.cache(self)
144 288
         
145 289
         try:
146 290
             response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar
@@ -156,3 +300,38 @@ def _parse_response(self, response):
156 300
         # logging.info('response: %s' % response)
157 301
         callback(response)
158 302
 
  303
+
  304
+class _ConnectionState(object):
  305
+    def __init__(self, nodes):
  306
+        self.errors = []
  307
+        self.node_found = False
  308
+        self.tested_all_seeds = False
  309
+        self.nodes = nodes
  310
+        self.seeds = nodes.copy()
  311
+        self.remaining = nodes.copy()
  312
+
  313
+
  314
+class _NodeFound(StandardError):
  315
+    def __init__(self, node):
  316
+        super(_NodeFound, self).__init__('Node %s:%d' % node)
  317
+        self.node = node
  318
+
  319
+
  320
+class _NoNodeFound(StandardError):
  321
+    pass
  322
+
  323
+
  324
+def _partition_node(node):
  325
+    """Split a host:port string returned from mongod/s into
  326
+    a (host, int(port)) pair needed for socket.connect().
  327
+
  328
+    From pymongo.connection._partition_node
  329
+    """
  330
+    host = node
  331
+    port = 27017
  332
+    idx = node.rfind(':')
  333
+    if idx != -1:
  334
+        host, port = node[:idx], int(node[idx + 1:])
  335
+    if host.startswith('['):
  336
+        host = host[1:-1]
  337
+    return host, port
76  asyncmongo/cursor.py
@@ -19,7 +19,7 @@
19 19
 from bson.son import SON
20 20
 
21 21
 import helpers
22  
-import message
  22
+import message as message_factory
23 23
 import functools
24 24
 
25 25
 _QUERY_OPTIONS = {
@@ -114,15 +114,12 @@ def insert(self, doc_or_docs,
114 114
         if callback:
115 115
             callback = functools.partial(self._handle_response, orig_callback=callback)
116 116
 
117  
-        connection = self.__pool.connection()
118  
-        try:
119  
-            connection.send_message(
120  
-                message.insert(self.full_collection_name, docs,
121  
-                    check_keys, safe, kwargs), callback=callback)
122  
-        except:
123  
-            connection.close()
124  
-            raise
125  
-    
  117
+        message = message_factory.insert(self.full_collection_name, docs,
  118
+                    check_keys, safe, kwargs)
  119
+        connection_callback = functools.partial(self._send_message,
  120
+                message=message, callback=callback)
  121
+        self.__pool.connection(connection_callback)
  122
+
126 123
     def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
127 124
         if not isinstance(safe, bool):
128 125
             raise TypeError("safe must be an instance of bool")
@@ -144,16 +141,12 @@ def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
144 141
         if callback:
145 142
             callback = functools.partial(self._handle_response, orig_callback=callback)
146 143
 
147  
-        connection = self.__pool.connection()
148  
-        try:
149  
-            connection.send_message(
150  
-                message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
151  
-                    callback=callback)
152  
-        except:
153  
-            connection.close()
154  
-            raise
  144
+        message = message_factory.delete(self.full_collection_name,
  145
+                spec_or_id, safe, kwargs)
  146
+        connection_callback = functools.partial(self._send_message,
  147
+                message=message, callback=callback)
  148
+        self.__pool.connection(connection_callback)
155 149
 
156  
-    
157 150
     def update(self, spec, document, upsert=False, manipulate=False,
158 151
                safe=True, multi=False, callback=None, **kwargs):
159 152
         """Update a document(s) in this collection.
@@ -230,32 +223,28 @@ def update(self, spec, document, upsert=False, manipulate=False,
230 223
         # TODO: apply SON manipulators
231 224
         # if upsert and manipulate:
232 225
         #     document = self.__database._fix_incoming(document, self)
233  
-        
  226
+
234 227
         if kwargs:
235 228
             safe = True
236  
-        
  229
+
237 230
         if safe and not callable(callback):
238 231
             raise TypeError("callback must be callable")
239 232
         if not safe and callback is not None:
240 233
             raise TypeError("callback can not be used with safe=False")
241  
-        
  234
+
242 235
         if callback:
243 236
             callback = functools.partial(self._handle_response, orig_callback=callback)
244 237
 
245 238
         self.__limit = None
246  
-        connection = self.__pool.connection()
247  
-        try:
248  
-            connection.send_message(
249  
-                message.update(self.full_collection_name, upsert, multi,
250  
-                    spec, document, safe, kwargs), callback=callback)
251  
-        except:
252  
-            connection.close()
253  
-            raise
  239
+        message = message_factory.update(self.full_collection_name, upsert,
  240
+                multi, spec, document, safe, kwargs)
  241
+        connection_callback = functools.partial(self._send_message,
  242
+                message=message, callback=callback)
  243
+        self.__pool.connection(connection_callback)
254 244
 
255  
-    
256 245
     def find_one(self, spec_or_id, **kwargs):
257 246
         """Get a single document from the database.
258  
-        
  247
+
259 248
         All arguments to :meth:`find` are also valid arguments for
260 249
         :meth:`find_one`, although any `limit` argument will be
261 250
         ignored. Returns a single document, or ``None`` if no matching
@@ -358,7 +347,7 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
358 347
         self.__skip = skip
359 348
         self.__limit = limit
360 349
         self.__batch_size = 0
361  
-        
  350
+
362 351
         self.__timeout = timeout
363 352
         self.__tailable = tailable
364 353
         self.__snapshot = snapshot
@@ -371,14 +360,21 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
371 360
         self.__tz_aware = False #collection.database.connection.tz_aware
372 361
         self.__must_use_master = _must_use_master
373 362
         self.__is_command = _is_command
374  
-        
375  
-        connection = self.__pool.connection()
  363
+
  364
+        message = message_factory.query(self.__query_options(),
  365
+                self.full_collection_name, self.__skip, self.__limit,
  366
+                self.__query_spec(), self.__fields)
  367
+        message_callback = functools.partial(self._handle_response,
  368
+                orig_callback=callback)
  369
+        connection_callback = functools.partial(self._send_message,
  370
+                message=message, callback=message_callback)
  371
+        self.__pool.connection(connection_callback)
  372
+
  373
+    def _send_message(self, connection, message, callback=None, error=None):
  374
+        if error:
  375
+            raise error
376 376
         try:
377  
-            connection.send_message(
378  
-                message.query(self.__query_options(),
379  
-                              self.full_collection_name,
380  
-                              self.__skip, self.__limit,
381  
-                              self.__query_spec(), self.__fields), callback=functools.partial(self._handle_response, orig_callback=callback))
  377
+            connection.send_message(message, callback)
382 378
         except:
383 379
             connection.close()
384 380
     
70  asyncmongo/pool.py
@@ -18,6 +18,9 @@
18 18
 import logging
19 19
 from errors import TooManyConnections, ProgrammingError
20 20
 from connection import Connection
  21
+from pymongo import uri_parser
  22
+from bson.son import SON
  23
+
21 24
 
22 25
 class ConnectionPools(object):
23 26
     """ singleton to keep track of named connection pools """
@@ -61,6 +64,8 @@ class ConnectionPool(object):
61 64
     
62 65
     """
63 66
     def __init__(self, 
  67
+                host=None, 
  68
+                port=27017, 
64 69
                 mincached=0, 
65 70
                 maxcached=0, 
66 71
                 maxconnections=0, 
@@ -68,12 +73,19 @@ def __init__(self,
68 73
                 dbname=None, 
69 74
                 slave_okay=False, 
70 75
                 *args, **kwargs):
  76
+
  77
+        if isinstance(host, basestring):
  78
+            host = [host]
  79
+        else:
  80
+            assert isinstance(host, list)
  81
+        assert isinstance(port, int)
71 82
         assert isinstance(mincached, int)
72 83
         assert isinstance(maxcached, int)
73 84
         assert isinstance(maxconnections, int)
74 85
         assert isinstance(maxusage, int)
75 86
         assert isinstance(dbname, (str, unicode, None.__class__))
76 87
         assert isinstance(slave_okay, bool)
  88
+
77 89
         if mincached and maxcached:
78 90
             assert mincached <= maxcached
79 91
         if maxconnections:
@@ -86,36 +98,62 @@ def __init__(self,
86 98
         self._maxconnections = maxconnections
87 99
         self._idle_cache = [] # the actual connections that can be used
88 100
         self._condition = Condition()
89  
-        self._dbname = dbname
90  
-        self._slave_okay = slave_okay
  101
+        self._kwargs['slave_okay'] = self._slave_okay = slave_okay
91 102
         self._connections = 0
92  
-        
  103
+
  104
+        nodes = set()
  105
+        username = None  # TODO: username/password ignored for now
  106
+        password = None
  107
+        for entity in host:
  108
+            if "://" in entity:
  109
+                if entity.startswith("mongodb://"):
  110
+                    res = uri_parser.parse_uri(entity, port)
  111
+                    nodes.update(res["nodelist"])
  112
+                    username = res["username"] or username
  113
+                    password = res["password"] or password
  114
+                    dbname = res["database"] or dbname
  115
+                else:
  116
+                    idx = entity.find("://")
  117
+                    raise ProgrammingError("Invalid URI scheme: "
  118
+                                     "%s" % (entity[:idx],))
  119
+            else:
  120
+                nodes.update(uri_parser.split_hosts(entity, port))
  121
+        if not nodes:
  122
+            raise ProgrammingError("Need to specify at least one host")
  123
+        self._nodes = nodes
  124
+        self._dbname = dbname
  125
+
93 126
         # Establish an initial number of idle database connections:
94 127
         idle = [self.connection() for i in range(mincached)]
95 128
         while idle:
96 129
             self.cache(idle.pop())
97  
-    
98  
-    def new_connection(self):
  130
+
  131
+    def new_connection(self, callback):
99 132
         kwargs = self._kwargs
100 133
         kwargs['pool'] = self
101  
-        return Connection(*self._args, **kwargs)
102  
-    
103  
-    def connection(self):
  134
+        return Connection(*self._args, nodes=self._nodes,
  135
+                create_callback=callback, **kwargs)
  136
+
  137
+    def connection(self, callback):
104 138
         """ get a cached connection from the pool """
105  
-        
  139
+
  140
+        con = None
106 141
         self._condition.acquire()
107 142
         try:
108 143
             if (self._maxconnections and self._connections >= self._maxconnections):
109  
-                raise TooManyConnections("%d connections are active greater than max: %d" % (self._connections, self._maxconnections))
  144
+                raise TooManyConnections("%d connections are active greater "
  145
+                        "than max: %d" % (self._connections, self._maxconnections))
110 146
             # connection limit not reached, get a dedicated connection
111 147
             try: # first try to get it from the idle cache
112 148
                 con = self._idle_cache.pop(0)
113  
-            except IndexError: # else get a fresh connection
114  
-                con = self.new_connection()
  149
+            except IndexError: # else get a fresh connection, async
  150
+                self.new_connection(callback)
115 151
             self._connections += 1
116 152
         finally:
117 153
             self._condition.release()
118  
-        return con
  154
+        # reusing a connection, so send it to the callback
  155
+        if con:
  156
+            callback(con)
119 157
 
120 158
     def cache(self, con):
121 159
         """Put a dedicated connection back into the idle cache."""
@@ -167,3 +205,9 @@ def __set_slave_okay(self, value):
167 205
         self._slave_okay = value
168 206
 
169 207
     slave_okay = property(__get_slave_okay, __set_slave_okay)
  208
+
  209
+    def command(self, command, value=1, **kwargs):
  210
+        if isinstance(command, basestring):
  211
+            command = SON([(command, value)])
  212
+
  213
+        self["$cmd"].find_one(command, _is_command=True, **kwargs)
4  setup.py
@@ -18,7 +18,7 @@
18 18
         "License :: OSI Approved :: Apache Software License",
19 19
     ],
20 20
     packages=['asyncmongo'],
21  
-    install_requires=['pymongo>=1.9', 'tornado'],
22  
-    requires=['pymongo (>=1.9)', 'tornado'],
  21
+    install_requires=['pymongo>=2.0', 'tornado'],
  22
+    requires=['pymongo (>=2.0)', 'tornado'],
23 23
     download_url="http://github.com/downloads/bitly/asyncmongo/asyncmongo-%s.tar.gz" % version,
24 24
 )
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.