Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add replicaset support #58

Merged
merged 1 commit into from

4 participants

@manover

ReplicaSet support

@manover

@jehiah @mreiferson @ploxiln
Basic RS support implemented, no working tests yet

asyncmongo/connection.py
((33 lines not shown))
- `dbuser`: db user to connect with
- `dbpass`: db password
- `autoreconnect` (optional): auto reconnect on interface errors
+ - `rs` (optional): replica set name (required when replica sets are used)
+ - `seed` (optional): seed list to connect to a replica set (required when replica sets are used)
@jehiah Owner
jehiah added a note

It would be nice to either a) make this a more structured [(host, port)...] list, or at least ['host:port', ...] or switch this to code that handles mongo uri's and have ['mongo://host:port',...]. In either case since it's a list of spots to connect to, let's make it a list instead of a comma separated string.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jehiah
Owner

@manover this looks like a very good way to structure the authentication/replica set discovery flow. nice work so far.

@SeanOC SeanOC commented on the diff
test/test_replica_set.py
((17 lines not shown))
+ ('--port', '27020', '--replSet', 'rs0'),
+ ]
+
+ def wait_master(self, port):
+ while True:
+ pipe = subprocess.Popen("echo -n 'db.isMaster();\n' |mongo --port %d --host 127.0.0.1" % port,
+ stdout=subprocess.PIPE, shell=True)
+ repl = pipe.communicate()[0]
+ if repl.find('"ismaster" : true') >= 0:
+ logging.info("%d is a master", port)
+ break
+ else:
+ logging.info("Waiting for %d to become master", port)
+ time.sleep(5)
+
+ def setUp(self):
@SeanOC Owner
SeanOC added a note

You might want to try using a setUpClass class method to do the expensive mongo setup instead of the setUp method. By using setUpClass your setup should only be run once for all of tests.

@manover
manover added a note

Yeah, I could do that or even use a module-wide setup() function, the only problems are:

  1. It would still take a lot of time, even running once, so Travis would fail
  2. I would really hate to duplicate the existing structure used by other tests (with MongoTest as a super class)

but thanks for the tip

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@manover manover was assigned
test/test_replica_set.py
((8 lines not shown))
+import asyncmongo
+import asyncmongo.connection
+import asyncmongo.errors
+
+TEST_TIMESTAMP = int(time.time())
+
+class ReplicaSetTest(test_shunt.MongoTest):
+ mongod_options = [
+ ('--port', '27018', '--replSet', 'rs0'),
+ ('--port', '27019', '--replSet', 'rs0'),
+ ('--port', '27020', '--replSet', 'rs0'),
+ ]
+
+ def mongo_cmd(self, cmd, port=27018, res='"ok" : 1'):
+ pipe = subprocess.Popen("echo '%s' | mongo --port %d" % (cmd, port), shell=True, stdout=subprocess.PIPE)
+ repl = pipe.communicate()[0]
@ploxiln
ploxiln added a note

could be:

mongo_cli = subprocess.Popen(["mongo", "--port", str(port)], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
reply, _ = mongo_cli.communicate(cmd)

which I arbitrarily consider to be more elegant :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jehiah
Owner

@manover changes look great. go ahead and squash this change.

@jehiah jehiah merged commit 810e11b into bitly:master

1 check passed

Details default The Travis build passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 3, 2013
  1. Add support for replica sets

    Denis Bychkov authored
This page is out of date. Refresh to see the latest.
View
8 .travis.yml
@@ -5,8 +5,12 @@ python:
- "2.7"
services: mongodb
install:
- - "pip install tornado --use-mirrors"
+ - "pip install tornado==2.4.1 --use-mirrors"
- "pip install pymongo --use-mirrors"
-script: py.test
+env:
+ - TEST_REPLICA_SETS="true"
+ - TEST_REPLICA_SETS="false"
+script:
+ - if [[ $TEST_REPLICA_SETS == 'true' ]]; then py.test test/test_replica_set.py; else py.test --ignore=test/test_replica_set.py; fi
notifications:
email: false
View
2  MANIFEST
@@ -9,6 +9,7 @@ asyncmongo/errors.py
asyncmongo/helpers.py
asyncmongo/message.py
asyncmongo/pool.py
+asyncmongo/asyncjobs.py
asyncmongo/backends/__init__.py
asyncmongo/backends/glib2_backend.py
asyncmongo/backends/glib3_backend.py
@@ -28,5 +29,6 @@ test/test_query.py
test/test_safe_updates.py
test/test_shunt.py
test/test_slave_only.py
+test/test_replica_set.py
test/testgtk2/test.py
test/testgtk3/test.py
View
191 asyncmongo/asyncjobs.py
@@ -0,0 +1,191 @@
+#!/bin/env python
+#
+# Copyright 2013 bit.ly
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Tools for creating `messages
+<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
+MongoDB.
+
+.. note:: This module is for internal use and is generally not needed by
+ application developers.
+"""
+
+import logging
+from bson import SON
+
+import message
+import helpers
+from errors import AuthenticationError, RSConnectionError, InterfaceError
+
+class AsyncMessage(object):
+ def __init__(self, connection, message, callback):
+ super(AsyncMessage, self).__init__()
+ self.connection = connection
+ self.message = message
+ self.callback = callback
+
+ def process(self, *args, **kwargs):
+ try:
+ self.connection._send_message(self.message, self.callback)
+ except Exception, e:
+ if self.callback is None:
+ logging.error("Error occurred in safe update mode: %s", e)
+ else:
+ self.callback(None, e)
+
+class AuthorizeJob(object):
+ def __init__(self, connection, dbuser, dbpass, pool):
+ super(AuthorizeJob, self).__init__()
+ self.connection = connection
+ self._state = "start"
+ self.dbuser = dbuser
+ self.dbpass = dbpass
+ self.pool = pool
+
+ def __repr__(self):
+ return "AuthorizeJob at 0x%X, state = %r" % (id(self), self._state)
+
+ def process(self, response=None, error=None):
+ if error:
+ logging.debug(error)
+ logging.debug(response)
+ raise AuthenticationError(error)
+
+ if self._state == "start":
+ self._state = "nonce"
+ logging.debug("Sending nonce")
+ msg = message.query(
+ 0,
+ "%s.$cmd" % self.pool._dbname,
+ 0,
+ 1,
+ SON({'getnonce': 1}),
+ SON({})
+ )
+ self.connection._send_message(msg, self.process)
+ elif self._state == "nonce":
+ # this is the nonce response
+ self._state = "finish"
+ nonce = response['data'][0]['nonce']
+ logging.debug("Nonce received: %r", nonce)
+ key = helpers._auth_key(nonce, self.dbuser, self.dbpass)
+
+ msg = message.query(
+ 0,
+ "%s.$cmd" % self.pool._dbname,
+ 0,
+ 1,
+ SON([('authenticate', 1),
+ ('user', self.dbuser),
+ ('nonce', nonce),
+ ('key', key)]),
+ SON({})
+ )
+ self.connection._send_message(msg, self.process)
+ elif self._state == "finish":
+ self._state = "done"
+ assert response['number_returned'] == 1
+ response = response['data'][0]
+ if response['ok'] != 1:
+ logging.debug('Failed authentication %s' % response['errmsg'])
+ raise AuthenticationError(response['errmsg'])
+ self.connection._next_job()
+ else:
+ raise ValueError("Unexpected state: %s" % self._state)
+
+class ConnectRSJob(object):
+ def __init__(self, connection, seed, rs):
+ self.connection = connection
+ self.known_hosts = set(seed)
+ self.rs = rs
+ self._tried_hosts = set()
+ self._state = "seed"
+ self._primary = None
+
+ def __repr__(self):
+ return "ConnectRSJob at 0x%X, state = %s" % (id(self), self._state)
+
+ def process(self, response=None, error=None):
+ if error:
+ logging.debug("Problem connecting: %s", error)
+
+ if self._state == "ismaster":
+ self._state = "seed"
+
+ if self._state == "seed":
+ fresh = self.known_hosts ^ self._tried_hosts
+ logging.debug("Working through the rest of the host list: %r", fresh)
+
+ while fresh:
+ if self._primary and self._primary not in self._tried_hosts:
+ # Try primary first
+ h = self._primary
+ else:
+ h = fresh.pop()
+
+ self._tried_hosts.add(h)
+
+ logging.debug("Connecting to %s:%s", *h)
+ self.connection._host, self.connection._port = h
+ try:
+ self.connection._socket_connect()
+ logging.debug("Connected to %s", h)
+ except InterfaceError, e:
+ logging.error("Failed to connect to the host: %s", e)
+ else:
+ break
+
+ else:
+ raise RSConnectionError("No more hosts to try, tried: %s" % self.known_hosts)
+
+ self._state = "ismaster"
+ msg = message.query(
+ options=0,
+ collection_name="admin.$cmd",
+ num_to_skip=0,
+ num_to_return=-1,
+ query=SON([("ismaster", 1)])
+ )
+ self.connection._send_message(msg, self.process)
+
+ elif self._state == "ismaster":
+ logging.debug("ismaster response: %r", response)
+
+ if len(response["data"]) == 1:
+ res = response["data"][0]
+ else:
+ raise RSConnectionError("Invalid response data: %r" % response["data"])
+
+ rs_name = res.get("setName")
+ if rs_name:
+ if rs_name != self.rs:
+ raise RSConnectionError("Wrong replica set: %s, expected: %s" %
+ (rs_name, self.rs))
+ hosts = res.get("hosts")
+ if hosts:
+ self.known_hosts.update(helpers._parse_host(h) for h in hosts)
+
+ ismaster = res.get("ismaster")
+ if ismaster:
+ logging.info("Connected to master")
+ self._state = "done"
+ self.connection._next_job()
+ else:
+ primary = res.get("primary")
+ if primary:
+ self._primary = helpers._parse_host(primary)
+
+ self._state = "seed"
+ self.process()
View
169 asyncmongo/connection.py
@@ -14,58 +14,70 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Tools for creating `messages
-<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>`_ to be sent to
-MongoDB.
-
-.. note:: This module is for internal use and is generally not needed by
- application developers.
-"""
-
import sys
import socket
import struct
import logging
+from types import NoneType
-from bson import SON
-from errors import ProgrammingError, IntegrityError, InterfaceError, AuthenticationError
-import message
+from errors import ProgrammingError, IntegrityError, InterfaceError
import helpers
+import asyncjobs
+
class Connection(object):
"""
:Parameters:
- - `host`: hostname or ip of mongo host
- - `port`: port to connect to
+ - `host`: hostname or ip of mongo host (not allowed when replica sets are used)
+ - `port`: port to connect to (not allowed when replica sets are used)
- `dbuser`: db user to connect with
- `dbpass`: db password
- `autoreconnect` (optional): auto reconnect on interface errors
+ - `rs`: replica set name (required when replica sets are used)
+ - `seed`: seed list to connect to a replica set (required when replica sets are used)
- `**kwargs`: passed to `backends.AsyncBackend.register_stream`
-
+
"""
- def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, pool=None,
- backend="tornado", **kwargs):
- assert isinstance(host, (str, unicode))
- assert isinstance(port, int)
+ def __init__(self,
+ host=None,
+ port=None,
+ dbuser=None,
+ dbpass=None,
+ autoreconnect=True,
+ pool=None,
+ backend="tornado",
+ rs=None,
+ seed=None,
+ **kwargs):
assert isinstance(autoreconnect, bool)
- assert isinstance(dbuser, (str, unicode, None.__class__))
- assert isinstance(dbpass, (str, unicode, None.__class__))
- assert isinstance(kwargs, (dict, None.__class__))
+ assert isinstance(dbuser, (str, unicode, NoneType))
+ assert isinstance(dbpass, (str, unicode, NoneType))
+ assert isinstance(rs, (str, NoneType))
assert pool
- self.__host = host
- self.__port = port
+
+ if rs:
+ assert host is None
+ assert port is None
+ assert isinstance(seed, (set, list))
+ else:
+ assert isinstance(host, (str, unicode))
+ assert isinstance(port, int)
+ assert seed is None
+
+ self._host = host
+ self._port = port
+ self.__rs = rs
+ self.__seed = seed
self.__dbuser = dbuser
self.__dbpass = dbpass
self.__stream = None
self.__callback = None
self.__alive = False
- self.__authenticate = False
self.__autoreconnect = autoreconnect
self.__pool = pool
- self.__deferred_message = None
- self.__deferred_callback = None
self.__kwargs = kwargs
self.__backend = self.__load_backend(backend)
+ self.__job_queue = []
self.usage_count = 0
self.__connect()
@@ -75,18 +87,27 @@ def __load_backend(self, name):
return mod.AsyncBackend()
def __connect(self):
+ if self.__dbuser and self.__dbpass:
+ self._put_job(asyncjobs.AuthorizeJob(self, self.__dbuser, self.__dbpass, self.__pool))
+
+ if self.__rs:
+ self._put_job(asyncjobs.ConnectRSJob(self, self.__seed, self.__rs))
+ # Mark the connection as alive, even though it's not alive yet to prevent double-connecting
+ self.__alive = True
+ else:
+ self._socket_connect()
+
+ def _socket_connect(self):
+ """create a socket, connect, register a stream with the async backend"""
self.usage_count = 0
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- s.connect((self.__host, self.__port))
+ s.connect((self._host, self._port))
self.__stream = self.__backend.register_stream(s, **self.__kwargs)
self.__stream.set_close_callback(self._socket_close)
self.__alive = True
except socket.error, error:
raise InterfaceError(error)
-
- if self.__dbuser and self.__dbpass:
- self.__authenticate = True
def _socket_close(self):
"""cleanup after the socket is closed by the other end"""
@@ -108,7 +129,7 @@ def close(self):
"""close this connection; re-cache this connection object"""
self._close()
self.__pool.cache(self)
-
+
def send_message(self, message, callback):
""" send a message over the wire; callback=None indicates a safe=False call where we write and forget about it"""
@@ -121,15 +142,28 @@ def send_message(self, message, callback):
else:
raise InterfaceError('connection invalid. autoreconnect=False')
- if self.__authenticate:
- self.__deferred_message = message
- self.__deferred_callback = callback
- self._get_nonce(self._start_authentication)
- else:
- self.__callback = callback
- self._send_message(message)
+ # Put the current message on the bottom of the queue
+ self._put_job(asyncjobs.AsyncMessage(self, message, callback), 0)
+ self._next_job()
+
+ def _put_job(self, job, pos=None):
+ if pos is None:
+ pos = len(self.__job_queue)
+ self.__job_queue.insert(pos, job)
+
+ def _next_job(self):
+ """execute the next job from the top of the queue"""
+ if self.__job_queue:
+ # Produce message from the top of the queue
+ job = self.__job_queue.pop()
+ # logging.debug("queue = %s, popped %r", self.__job_queue, job)
+ job.process()
- def _send_message(self, message):
+ def _send_message(self, message, callback):
+ # logging.debug("_send_message, msg = %r: queue = %r, self.__callback = %r, callback = %r",
+ # message, self.__job_queue, self.__callback, callback)
+
+ self.__callback = callback
self.usage_count +=1
# __request_id used by get_more()
(self.__request_id, data) = message
@@ -140,7 +174,7 @@ def _send_message(self, message):
else:
self.__request_id = None
self.__pool.cache(self)
-
+
except IOError:
self.__alive = False
raise
@@ -166,12 +200,12 @@ def _parse_response(self, response):
request_id = self.__request_id
self.__request_id = None
self.__callback = None
- if not self.__deferred_message:
+ if not self.__job_queue:
# skip adding to the cache because there is something else
# that needs to be called on this connection for this request
# (ie: we authenticted, but still have to send the real req)
self.__pool.cache(self)
-
+
try:
response = helpers._unpack_response(response, request_id) # TODO: pass tz_awar
except Exception, e:
@@ -183,54 +217,3 @@ def _parse_response(self, response):
callback(response, IntegrityError(response['data'][0]['err'], code=response['data'][0]['code']))
return
callback(response)
-
- def _start_authentication(self, response, error=None):
- # this is the nonce response
- if error:
- logging.debug(error)
- logging.debug(response)
- raise AuthenticationError(error)
- nonce = response['data'][0]['nonce']
- key = helpers._auth_key(nonce, self.__dbuser, self.__dbpass)
-
- self.__callback = self._finish_authentication
- self._send_message(
- message.query(0,
- "%s.$cmd" % self.__pool._dbname,
- 0,
- 1,
- SON([('authenticate', 1), ('user' , self.__dbuser), ('nonce' , nonce), ('key' , key)]),
- SON({})))
-
- def _finish_authentication(self, response, error=None):
- if error:
- self.__deferred_message = None
- self.__deferred_callback = None
- raise AuthenticationError(error)
- assert response['number_returned'] == 1
- response = response['data'][0]
- if response['ok'] != 1:
- logging.debug('Failed authentication %s' % response['errmsg'])
- self.__deferred_message = None
- self.__deferred_callback = None
- raise AuthenticationError(response['errmsg'])
-
- message = self.__deferred_message
- callback = self.__deferred_callback
- self.__deferred_message = None
- self.__deferred_callback = None
- self.__callback = callback
- # continue the original request
- self._send_message(message)
-
- def _get_nonce(self, callback):
- assert self.__callback is None
- self.__callback = callback
- self._send_message(
- message.query(0,
- "%s.$cmd" % self.__pool._dbname,
- 0,
- 1,
- SON({'getnonce' : 1}),
- SON({})
- ))
View
3  asyncmongo/errors.py
@@ -29,6 +29,9 @@ class Error(StandardError):
class InterfaceError(Error):
pass
+class RSConnectionError(Error):
+ pass
+
class DatabaseError(Error):
pass
View
9 asyncmongo/helpers.py
@@ -7,6 +7,15 @@
from asyncmongo.errors import (DatabaseError, InterfaceError)
+def _parse_host(h):
+ try:
+ host, port = h.split(":", 1)
+ port = int(port)
+ except ValueError:
+ raise ValueError("Wrong host:port value: %s" % h)
+
+ return host, port
+
def _unpack_response(response, cursor_id=None, as_class=dict, tz_aware=False):
"""Unpack a response from the database.
View
1  test/test_authentication.py
@@ -21,6 +21,7 @@ def test_authentication(self):
db = asyncmongo.Client(pool_id='testauth', host='127.0.0.1', port=27018, dbname='test', dbuser='testuser', dbpass='testpass', maxconnections=2)
def update_callback(response, error):
+ logging.info("UPDATE:")
tornado.ioloop.IOLoop.instance().stop()
logging.info(response)
assert len(response) == 1
View
161 test/test_replica_set.py
@@ -0,0 +1,161 @@
+import tornado.ioloop
+import time
+import logging
+import subprocess
+import socket
+
+import test_shunt
+import asyncmongo
+import asyncmongo.connection
+import asyncmongo.errors
+
+TEST_TIMESTAMP = int(time.time())
+
+class ReplicaSetTest(test_shunt.MongoTest):
+ mongod_options = [
+ ('--port', '27018', '--replSet', 'rs0'),
+ ('--port', '27019', '--replSet', 'rs0'),
+ ('--port', '27020', '--replSet', 'rs0'),
+ ]
+
+ def mongo_cmd(self, cmd, port=27018, res='"ok" : 1'):
+ pipe = subprocess.Popen("mongo --port %d" % port, shell=True,
+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+ reply = pipe.communicate(cmd)[0]
+ assert reply.find(res) > 0
+ return reply
+
+ def wait_master(self, port):
+ while True:
+ if self.mongo_cmd("db.isMaster();", port).find('"ismaster" : true') > 0:
+ logging.info("%d is a master", port)
+ break
+ else:
+ logging.info("Waiting for %d to become master", port)
+ time.sleep(5)
+
+ def setUp(self):
@SeanOC Owner
SeanOC added a note

You might want to try using a setUpClass class method to do the expensive mongo setup instead of the setUp method. By using setUpClass your setup should only be run once for all of tests.

@manover
manover added a note

Yeah, I could do that or even use a module-wide setup() function, the only problems are:

  1. It would still take a lot of time, even running once, so Travis would fail
  2. I would really hate to duplicate the existing structure used by other tests (with MongoTest as a super class)

but thanks for the tip

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ super(ReplicaSetTest, self).setUp()
+ hostname = socket.gethostname()
+ logging.info("configuring a replica set at %s" % hostname)
+ cfg = """
+ {
+ "_id" : "rs0",
+ "members" : [
+ {
+ "_id" : 0,
+ "host" : "%(hostname)s:27018"
+ },
+ {
+ "_id" : 1,
+ "host" : "%(hostname)s:27019",
+ "priority" : 2
+ },
+ {
+ "_id" : 2,
+ "host" : "%(hostname)s:27020",
+ "priority" : 0
+ }
+ ]
+ }
+ """ % dict(hostname=hostname)
+ self.mongo_cmd("rs.initiate(%s);" % cfg)
+ logging.info("waiting for replica set to finish configuring")
+ self.wait_master(27019)
+
+ def test_connection(self):
+ class Pool(object):
+ def __init__(self):
+ super(Pool, self).__init__()
+ self._cache = []
+
+ def cache(self, c):
+ self._cache.append(c)
+
+ class AsyncClose(object):
+ def process(self, *args, **kwargs):
+ tornado.ioloop.IOLoop.instance().stop()
+
+ hostname = socket.gethostname()
+ try:
+ conn = asyncmongo.connection.Connection(pool=Pool(),
+ seed=[(hostname, 27018), (hostname, 27020)],
+ rs="rs0")
+
+ conn._put_job(AsyncClose(), 0)
+ conn._next_job()
+ tornado.ioloop.IOLoop.instance().start()
+
+ assert conn._host == hostname
+ assert conn._port == 27019
+
+ except:
+ tornado.ioloop.IOLoop.instance().stop()
+ raise
+
+ def test_update(self):
+ try:
+ test_shunt.setup()
+ db = asyncmongo.Client(pool_id='testrs', rs="rs0", seed=[("127.0.0.1", 27020)], dbname='test', maxconnections=2)
+
+ # Update
+ def update_callback(response, error):
+ logging.info("UPDATE:")
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ assert len(response) == 1
+ test_shunt.register_called('update')
+
+ db.test_stats.update({"_id" : TEST_TIMESTAMP}, {'$inc' : {'test_count' : 1}}, upsert=True, callback=update_callback)
+
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('update')
+
+ # Retrieve the updated value
+ def query_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert error is None
+ assert isinstance(response, dict)
+ assert response['_id'] == TEST_TIMESTAMP
+ assert response['test_count'] == 1
+ test_shunt.register_called('retrieved')
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_callback)
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('retrieved')
+
+ # Switch the master
+ self.mongo_cmd(
+ "cfg = rs.conf(); cfg.members[1].priority = 1; cfg.members[0].priority = 2; rs.reconfig(cfg);",
+ 27019, "reconnected to server")
+ self.wait_master(27018)
+
+ # Expect the connection to be closed
+ def query_err_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert isinstance(error, Exception)
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_err_callback)
+ tornado.ioloop.IOLoop.instance().start()
+
+ # Retrieve the updated value again, from the new master
+ def query_again_callback(response, error):
+ tornado.ioloop.IOLoop.instance().stop()
+ logging.info(response)
+ logging.info(error)
+ assert error is None
+ assert isinstance(response, dict)
+ assert response['_id'] == TEST_TIMESTAMP
+ assert response['test_count'] == 1
+ test_shunt.register_called('retrieved_again')
+
+ db.test_stats.find_one({"_id" : TEST_TIMESTAMP}, callback=query_again_callback)
+ tornado.ioloop.IOLoop.instance().start()
+ test_shunt.assert_called('retrieved_again')
+ except:
+ tornado.ioloop.IOLoop.instance().stop()
+ raise
View
6 test/test_shunt.py
@@ -46,7 +46,7 @@ def setUp(self):
# PuritanicalIOLoop instead of a default loop.
if not tornado.ioloop.IOLoop.initialized():
self.loop = PuritanicalIOLoop()
- self.loop.install()
+ tornado.ioloop.IOLoop._instance = self.loop
else:
self.loop = tornado.ioloop.IOLoop.instance()
self.assert_(
@@ -61,8 +61,8 @@ def setUp(self):
os.makedirs(dirname)
self.temp_dirs.append(dirname)
- options = ['mongod', '--bind_ip', '127.0.0.1', '--oplogSize', '10',
- '--dbpath', dirname, '--smallfiles', '-v', '--nojournal'] + list(options)
+ options = ['mongod', '--oplogSize', '2', '--dbpath', dirname,
+ '--smallfiles', '-v', '--nojournal'] + list(options)
logging.debug(options)
pipe = subprocess.Popen(options)
self.mongods.append(pipe)
Something went wrong with that request. Please try again.