Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #58 from manover/add_replicaset_support

Add replicaset support
  • Loading branch information...
commit 810e11b8bca4393cd14815a3619467de02ca231d 2 parents 745c33c + 93282ad
@jehiah jehiah authored
View
8 .travis.yml
@@ -5,8 +5,12 @@ python:
- "2.7"
services: mongodb
install:
- - "pip install tornado --use-mirrors"
+ - "pip install tornado==2.4.1 --use-mirrors"
- "pip install pymongo --use-mirrors"
-script: py.test
+env:
+ - TEST_REPLICA_SETS="true"
+ - TEST_REPLICA_SETS="false"
+script:
+ - if [[ $TEST_REPLICA_SETS == 'true' ]]; then py.test test/test_replica_set.py; else py.test --ignore=test/test_replica_set.py; fi
notifications:
email: false
View
2  MANIFEST
@@ -9,6 +9,7 @@ asyncmongo/errors.py
asyncmongo/helpers.py
asyncmongo/message.py
asyncmongo/pool.py
+asyncmongo/asyncjobs.py
asyncmongo/backends/__init__.py
asyncmongo/backends/glib2_backend.py
asyncmongo/backends/glib3_backend.py
@@ -28,5 +29,6 @@ test/test_query.py
test/test_safe_updates.py
test/test_shunt.py
test/test_slave_only.py
+test/test_replica_set.py
test/testgtk2/test.py
test/testgtk3/test.py
View
191 asyncmongo/asyncjobs.py
@@ -0,0 +1,191 @@
+#!/bin/env python
+#
+# Copyright 2013 bit.ly
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tools for creating `messages
+<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
+MongoDB.
+
+.. note:: This module is for internal use and is generally not needed by
+ application developers.
+"""
+
+import logging
+from bson import SON
+
+import message
+import helpers
+from errors import AuthenticationError, RSConnectionError, InterfaceError
+
+class AsyncMessage(object):
+ def __init__(self, connection, message, callback):
+ super(AsyncMessage, self).__init__()
+ self.connection = connection
+ self.message = message
+ self.callback = callback
+
+ def process(self, *args, **kwargs):
+ try:
+ self.connection._send_message(self.message, self.callback)
+ except Exception, e:
+ if self.callback is None:
+ logging.error("Error occurred in safe update mode: %s", e)
+ else:
+ self.callback(None, e)
+
+class AuthorizeJob(object):
+ def __init__(self, connection, dbuser, dbpass, pool):
+ super(AuthorizeJob, self).__init__()
+ self.connection = connection
+ self._state = "start"
+ self.dbuser = dbuser
+ self.dbpass = dbpass
+ self.pool = pool
+
+ def __repr__(self):
+ return "AuthorizeJob at 0x%X, state = %r" % (id(self), self._state)
+
+ def process(self, response=None, error=None):
+ if error:
+ logging.debug(error)
+ logging.debug(response)
+ raise AuthenticationError(error)
+
+ if self._state == "start":
+ self._state = "nonce"
+ logging.debug("Sending nonce")
+ msg = message.query(
+ 0,
+ "%s.$cmd" % self.pool._dbname,
+ 0,
+ 1,
+ SON({'getnonce': 1}),
+ SON({})
+ )
+ self.connection._send_message(msg, self.process)
+ elif self._state == "nonce":
+ # this is the nonce response
+ self._state = "finish"
+ nonce = response['data'][0]['nonce']
+ logging.debug("Nonce received: %r", nonce)
+ key = helpers._auth_key(nonce, self.dbuser, self.dbpass)
+
+ msg = message.query(
+ 0,
+ "%s.$cmd" % self.pool._dbname,
+ 0,
+ 1,
+ SON([('authenticate', 1),
+ ('user', self.dbuser),
+ ('nonce', nonce),
+ ('key', key)]),
+ SON({})
+ )
+ self.connection._send_message(msg, self.process)
+ elif self._state == "finish":
+ self._state = "done"
+ assert response['number_returned'] == 1
+ response = response['data'][0]
+ if response['ok'] != 1:
+ logging.debug('Failed authentication %s' % response['errmsg'])
+ raise AuthenticationError(response['errmsg'])
+ self.connection._next_job()
+ else:
+ raise ValueError("Unexpected state: %s" % self._state)
+
+class ConnectRSJob(object):
+ def __init__(self, connection, seed, rs):
+ self.connection = connection
+ self.known_hosts = set(seed)
+ self.rs = rs
+ self._tried_hosts = set()
+ self._state = "seed"
+ self._primary = None
+
+ def __repr__(self):
+ return "ConnectRSJob at 0x%X, state = %s" % (id(self), self._state)
+
+ def process(self, response=None, error=None):
+ if error:
+ logging.debug("Problem connecting: %s", error)
+
+ if self._state == "ismaster":
+ self._state = "seed"
+
+ if self._state == "seed":
+ fresh = self.known_hosts ^ self._tried_hosts
+ logging.debug("Working through the rest of the host list: %r", fresh)
+
+ while fresh:
+ if self._primary and self._primary not in self._tried_hosts:
+ # Try primary first
+ h = self._primary
+ else:
+ h = fresh.pop()
+
+ self._tried_hosts.add(h)
+
+ logging.debug("Connecting to %s:%s", *h)
+ self.connection._host, self.connection._port = h
+ try:
+ self.connection._socket_connect()
+ logging.debug("Connected to %s", h)
+ except InterfaceError, e:
+ logging.error("Failed to connect to the host: %s", e)
+ else:
+ break
+
+ else:
+ raise RSConnectionError("No more hosts to try, tried: %s" % self.known_hosts)
+
+ self._state = "ismaster"
+ msg = message.query(
+ options=0,
+ collection_name="admin.$cmd",
+ num_to_skip=0,
+ num_to_return=-1,
+ query=SON([("ismaster", 1)])
+ )
+ self.connection._send_message(msg, self.process)
+
+ elif self._state == "ismaster":
+ logging.debug("ismaster response: %r", response)
+
+ if len(response["data"]) == 1:
+ res = response["data"][0]
+ else:
+ raise RSConnectionError("Invalid response data: %r" % response["data"])
+
+ rs_name = res.get("setName")
+ if rs_name:
+ if rs_name != self.rs:
+ raise RSConnectionError("Wrong replica set: %s, expected: %s" %
+ (rs_name, self.rs))
+ hosts = res.get("hosts")
+ if hosts:
+ self.known_hosts.update(helpers._parse_host(h) for h in hosts)
+
+ ismaster = res.get("ismaster")
+ if ismaster:
+ logging.info("Connected to master")
+ self._state = "done"
+ self.connection._next_job()
+ else:
+ primary = res.get("primary")
+ if primary:
+ self._primary = helpers._parse_host(primary)
+
+ self._state = "seed"
+ self.process()
View
169 asyncmongo/connection.py
@@ -14,58 +14,70 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Tools for creating `messages
-<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
-MongoDB.
-
-.. note:: This module is for internal use and is generally not needed by
- application developers.
-"""
-
import sys
import socket
import struct
import logging
+from types import NoneType
-from bson import SON
-from errors import ProgrammingError, IntegrityError, InterfaceError, AuthenticationError
-import message
+from errors import ProgrammingError, IntegrityError, InterfaceError
import helpers
+import asyncjobs
+
class Connection(object):
"""
:Parameters:
- - `host`: hostname or ip of mongo host
- - `port`: port to connect to
+ - `host`: hostname or ip of mongo host (not allowed when replica sets are used)
+ - `port`: port to connect to (not allowed when replica sets are used)
- `dbuser`: db user to connect with
- `dbpass`: db password
- `autoreconnect` (optional): auto reconnect on interface errors
+ - `rs`: replica set name (required when replica sets are used)
+ - `seed`: seed list to connect to a replica set (required when replica sets are used)
- `**kwargs`: passed to `backends.AsyncBackend.register_stream`
-
+
"""
- def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, pool=None,
- backend="tornado", **kwargs):
- assert isinstance(host, (str, unicode))
- assert isinstance(port, int)
+ def __init__(self,
+ host=None,
+ port=None,
+ dbuser=None,
+ dbpass=None,
+ autoreconnect=True,
+ pool=None,
+ backend="tornado",
+ rs=None,
+ seed=None,
+ **kwargs):
assert isinstance(autoreconnect, bool)
- assert isinstance(dbuser, (str, unicode, None.__class__))
- assert isinstance(dbpass, (str, unicode, None.__class__))
- assert isinstance(kwargs, (dict, None.__class__))
+ assert isinstance(dbuser, (str, unicode, NoneType))
+ assert isinstance(dbpass, (str, unicode, NoneType))
+ assert isinstance(rs, (str, NoneType))
assert pool
- self.__host = host
- self.__port = port
+
+ if rs:
+ assert host is None
+ assert port is None
+ assert isinstance(seed, (set, list))
+ else:
+ assert isinstance(host, (str, unicode))
+ assert isinstance(port, int)
+ assert seed is None
+
+ self._host = host
+ self._port = port
+ self.__rs = rs
+ self.__seed = seed
self.__dbuser = dbuser
self.__dbpass = dbpass
self.__stream = None
self.__callback = None
self.__alive = False
- self.__authenticate = False
self.__autoreconnect = autoreconnect
self.__pool = pool
- self.__deferred_message = None
- self.__deferred_callback = None
self.__kwargs = kwargs
self.__backend = self.__load_backend(backend)
+ self.__job_queue = []
self.usage_count = 0
self.__connect()
@@ -75,18 +87,27 @@ def __load_backend(self, name):
return mod.AsyncBackend()
def __connect(self):
+ if self.__dbuser and self.__dbpass:
+ self._put_job(asyncjobs.AuthorizeJob(self, self.__dbuser, self.__dbpass, self.__pool))
+
+ if self.__rs:
+ self._put_job(asyncjobs.ConnectRSJob(self, self.__seed, self.__rs))
+ # Mark the connection as alive, even though it's not alive yet to prevent double-connecting
+ self.__alive = True
+ else:
+ self._socket_connect()
+
+ def _socket_connect(self):
+ """create a socket, connect, register a stream with the async backend"""
self.usage_count = 0
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- s.connect((self.__host, self.__port))
+ s.connect((self._host, self._port))
self.__stream = self.__backend.register_stream(s, **self.__kwargs)
self.__stream.set_close_callback(self._socket_close)
self.__alive = True
except socket.error, error:
raise InterfaceError(error)
-
- if self.__dbuser and self.__dbpass:
- self.__authenticate = True
def _socket_close(self):
"""cleanup after the socket is closed by the other end"""
@@ -108,7 +129,7 @@ def close(self):
"""close this connection; re-cache this connection object"""
self._close()
self.__pool.cache(self)
-
+
def send_message(self, message, callback):
""" send a message over the wire; callback=None indicates a safe=False call where we write and forget about it"""
@@ -121,15 +142,28 @@ def send_message(self, message, callback):
else:
raise InterfaceError('connection invalid. autoreconnect=False')
- if self.__authenticate:
- self.__deferred_message = message
- self.__deferred_callback = callback
- self._get_nonce(self._start_authentication)
- else:
- self.__callback = callback
- self._send_message(message)
+ # Put the current message on the bottom of the queue
+ self._put_job(asyncjobs.AsyncMessage(self, message, callback), 0)
+ self._next_job()
+
+ def _put_job(self, job, pos=None):
+ if pos is None:
+ pos = len(self.__job_queue)
+ self.__job_queue.insert(pos, job)
+
+ def _next_job(self):
+ """execute the next job from the top of the queue"""
+ if self.__job_queue:
+ # Produce message from the top of the queue
+ job = self.__job_queue.pop()
+ # logging.debug("queue = %s, popped %r", self.__job_queue, job)
+ job.process()
- def _send_message(self, message):
+ def _send_message(self, message, callback):
+ # logging.debug("_send_message, msg = %r: queue = %r, self.__callback = %r, callback = %r",
+ # message, self.__job_queue, self.__callback, callback)
+
+ self.__callback = callback
self.usage_count +=1
# __request_id used by get_more()
(self.__request_id, data) = message
@@ -140,7 +174,7 @@ def _send_message(self, message):
else:
self.__request_id = None
self.__pool.cache(self)
-
+
except IOError:
self.__alive = False
raise
@@ -166,12 +200,12 @@ def _parse_response(self, response):
request_id = self.__request_id
self.__request_id = None
self.__callback = None
- if not self.__deferred_message:
+ if not self.__job_queue:
# skip adding to the cache because there is something else
# that needs to be called on this connection for this request
# (ie: we authenticted, but still have to send the real req)
self.__pool.cache(self)
-
+
try:
response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar
except Exception, e:
@@ -183,54 +217,3 @@ def _parse_response(self, response):
callback(response, IntegrityError(response['data'][0]['err'], code=response['data'][0]['code']))
return
callback(response)
-
- def _start_authentication(self, response, error=None):
- # this is the nonce response
- if error:
- logging.debug(error)
- logging.debug(response)
- raise AuthenticationError(error)
- nonce = response['data'][0]['nonce']
- key = helpers._auth_key(nonce, self.__dbuser, self.__dbpass)
-
- self.__callback = self._finish_authentication
- self._send_message(
- message.query(0,
- "%s.$cmd" % self.__pool._dbname,
- 0,
- 1,
- SON([('authenticate', 1), ('user' , self.__dbuser), ('nonce' , nonce), ('key' , key)]),
- SON({})))
-
- def _finish_authentication(self, response, error=None):
- if error:
- self.__deferred_message = None
- self.__deferred_callback = None
- raise AuthenticationError(error)
- assert response['number_returned'] == 1
- response = response['data'][0]
- if response['ok'] != 1:
- logging.debug('Failed authentication %s' % response['errmsg'])
- self.__deferred_message = None
- self.__deferred_callback = None
- raise AuthenticationError(response['errmsg'])
-
- message = self.__deferred_message
- callback = self.__deferred_callback
- self.__deferred_message = None
- self.__deferred_callback = None
- self.__callback = callback
- # continue the original request
- self._send_message(message)
-
- def _get_nonce(self, callback):
- assert self.__callback is None
- self.__callback = callback
- self._send_message(
- message.query(0,
- "%s.$cmd" % self.__pool._dbname,
- 0,
- 1,
- SON({'getnonce' : 1}),
- SON({})
- ))
View
3  asyncmongo/errors.py
@@ -29,6 +29,9 @@ class Error(StandardError):
class InterfaceError(Error):
pass
+class RSConnectionError(Error):
+ pass
+
class DatabaseError(Error):
pass
View
9 asyncmongo/helpers.py
@@ -7,6 +7,15 @@
from asyncmongo.errors import (DatabaseError, InterfaceError)
+def _parse_host(h):
+ try:
+ host, port = h.split(":", 1)
+ port = int(port)
+ except ValueError:
+ raise ValueError("Wrong host:port value: %s" % h)
+
+ return host, port
+
def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False):
"""Unpack a response from the database.
View
1  test/test_authentication.py
@@ -21,6 +21,7 @@ def test_authentication(self):
db = asyncmongo.Client(pool_id='testauth', host='127.0.0.1', port=27018, dbname='test', dbuser='testuser', dbpass='testpass', maxconnections=2)
def update_callback(response, error):
+ logging.info("UPDATE:")
tornado.ioloop.IOLoop.instance().stop()
logging.info(response)
assert len(response) == 1
View
161 test/test_replica_set.py
@@ -0,0 +1,161 @@
+import tornado.ioloop
+import time
+import logging
+import subprocess
+import socket
+
+import test_shunt
+import asyncmongo
+import asyncmongo.connection
+import asyncmongo.errors
+
+TEST_TIMESTAMP = int(time.time())
+
+class ReplicaSetTest(test_shunt.MongoTest):
+ mongod_options = [
+ ('--port', '27018', '--replSet', 'rs0'),
+ ('--port', '27019', '--replSet', 'rs0'),
+ ('--port', '27020', '--replSet', 'rs0'),
+ ]
+
+ def mongo_cmd(self, cmd, port=27018, res='"ok" : 1'):
+ pipe = subprocess.Popen("mongo --port %d" % port, shell=True,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ reply = pipe.communicate(cmd)[0]
+ assert reply.find(res) > 0
+ return reply
+
+ def wait_master(self, port):
+ while True:
+ if self.mongo_cmd("db.isMaster();", port).find('"ismaster" : true') > 0:
+ logging.info("%d is a master", port)
+ break
+ else:
+ logging.info("Waiting for %d to become master", port)
+ time.sleep(5)
+
+ def setUp(self):
+ super(ReplicaSetTest, self).setUp()
+ hostname = socket.gethostname()
+ logging.info("configuring a replica set at %s" % hostname)
+ cfg = """
+ {
+ "_id" : "rs0",
+ "members" : [
+ {
+ "_id" : 0,
+ "host" : "%(hostname)s:27018"
+ },
+ {
+ "_id" : 1,
+ "host" : "%(hostname)s:27019",
+ "priority" : 2
+ },
+ {
+ "_id" : 2,
+ "host" : "%(hostname)s:27020",
+ "priority" : 0
+ }
+ ]
+ }
+ """ % dict(hostname=hostname)
+ self.mongo_cmd("rs.initiate(%s);" % cfg)
+ logging.info("waiting for replica set to finish configuring")
+ self.wait_master(27019)
+
+ def test_connection(self):
+ class Pool(object):
+ def __init__(self):
+ super(Pool, self).__init__()
+ self._cache = []
+
+ def cache(self, c):
+ self._cache.append(c)
+
+ class AsyncClose(object):
+ def process(self, *args, **kwargs):
+ tornado.ioloop.IOLoop.instance().stop()
+
+ hostname = socket.gethostname()
+ try:
+ conn = asyncmongo.connection.Connection(pool=Pool(),
+ seed=[(hostname, 27018), (hostname, 27020)],
+ rs="rs0")
+
+ conn._put_job(AsyncClose(), 0)
+ conn._next_job()
+ tornado.ioloop.IOLoop.instance().start()
+
+ assert conn._host == hostname
+ assert conn._port == 27019
+
+ except:
+ tornado.ioloop.IOLoop.instance().stop()
+ raise
+
+ def test_update(self):
+ try:
+ test_shunt.setup()
+ db = asyncmongo.Client(pool_id='testrs', rs="rs0", seed=[("127.0.0.1", 27020)], dbname='test', maxconnections=2)
+
+ # Update
+ def update_callback(response, error):
+ logging.info("UPDATE:")
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ assert len(response) == 1
+ test_shunt.register_called('update')
+
+ db.test_stats.update({"_id" : TEST_TIMESTAMP}, {'$inc' : {'test_count' : 1}}, upsert=True, callback=update_callback)
+
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('update')
+
+ # Retrieve the updated value
+ def query_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert error is None
+ assert isinstance(response, dict)
+ assert response['_id'] == TEST_TIMESTAMP
+ assert response['test_count'] == 1
+ test_shunt.register_called('retrieved')
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_callback)
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('retrieved')
+
+ # Switch the master
+ self.mongo_cmd(
+ "cfg = rs.conf(); cfg.members[1].priority = 1; cfg.members[0].priority = 2; rs.reconfig(cfg);",
+ 27019, "reconnected to server")
+ self.wait_master(27018)
+
+ # Expect the connection to be closed
+ def query_err_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert isinstance(error, Exception)
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_err_callback)
+ tornado.ioloop.IOLoop.instance().start()
+
+ # Retrieve the updated value again, from the new master
+ def query_again_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert error is None
+ assert isinstance(response, dict)
+ assert response['_id'] == TEST_TIMESTAMP
+ assert response['test_count'] == 1
+ test_shunt.register_called('retrieved_again')
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_again_callback)
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('retrieved_again')
+ except:
+ tornado.ioloop.IOLoop.instance().stop()
+ raise
View
6 test/test_shunt.py
@@ -46,7 +46,7 @@ def setUp(self):
# PuritanicalIOLoop instead of a default loop.
if not tornado.ioloop.IOLoop.initialized():
self.loop = PuritanicalIOLoop()
- self.loop.install()
+ tornado.ioloop.IOLoop._instance = self.loop
else:
self.loop = tornado.ioloop.IOLoop.instance()
self.assert_(
@@ -61,8 +61,8 @@ def setUp(self):
os.makedirs(dirname)
self.temp_dirs.append(dirname)
- options = ['mongod', '--bind_ip', '127.0.0.1', '--oplogSize', '10',
- '--dbpath', dirname, '--smallfiles', '-v', '--nojournal'] + list(options)
+ options = ['mongod', '--oplogSize', '2', '--dbpath', dirname,
+ '--smallfiles', '-v', '--nojournal'] + list(options)
logging.debug(options)
pipe = subprocess.Popen(options)
self.mongods.append(pipe)
Please sign in to comment.
Something went wrong with that request. Please try again.