Skip to content


Subversion checkout URL

You can clone with
Download ZIP
Fetching contributors…
Cannot retrieve contributors at this time
51 lines (38 sloc) 1.59 KB
import tornado.ioloop
import logging
import time
import test_shunt
import asyncmongo
import pymongo
class QueryTest(test_shunt.MongoTest):
mongod_options = [('--port', '27017')]
def pymongo_conn(self):
if not hasattr(self, '_pymongo_conn'):
self._pymongo_conn = pymongo.Connection(port=int(self.mongod_options[0][1]))
return self._pymongo_conn
def get_open_cursors(self):
output = self.pymongo_conn.admin.command('serverStatus')
return output.get('cursors', {}).get('totalOpen')
def setUp(self):
super(QueryTest, self).setUp()[{'i': i} for i in xrange(200)])
def test_query(self):
db = asyncmongo.Client(pool_id='test_query', host='', port=int(self.mongod_options[0][1]), dbname='test', mincached=3)
def noop_callback(response, error):
loop = tornado.ioloop.IOLoop.instance()
# delay the stop so kill cursor has time on the ioloop to get pushed through to mongo
loop.add_timeout(time.time() + .1, loop.stop)
before = self.get_open_cursors()
# run 2 queries{}, callback=noop_callback)
tornado.ioloop.IOLoop.instance().start(){}, callback=noop_callback)
# check cursors
after = self.get_open_cursors()
assert before == after, "%d cursors left open (should be 0)" % (after - before)
if __name__ == '__main__':
import unittest
Jump to Line
Something went wrong with that request. Please try again.