github70: multiple replication targets

commit 53d2c0c2e234b2761c2e30cb94830fc6d29f4483 1 parent 2ecf3b6
@llvtt authored
184 mongo_connector/connector.py
@@ -29,6 +29,7 @@
from mongo_connector import constants, errors, util
from mongo_connector.locking_dict import LockingDict
from mongo_connector.oplog_manager import OplogThread
+from mongo_connector.doc_managers import doc_manager_simulator as simulator
try:
from pymongo import MongoClient as Connection
@@ -45,11 +46,39 @@ def __init__(self, address, oplog_checkpoint, target_url, ns_set,
fields=None, dest_mapping={},
auto_commit_interval=constants.DEFAULT_COMMIT_INTERVAL):
+ if target_url and not doc_manager:
+ raise errors.ConnectorError("Cannot create a Connector with a "
+ "target URL but no doc manager!")
+
+ def is_string(s):
+ try:
+ return isinstance(s, basestring)
+ except NameError:
+ return isinstance(s, str)
+
+ def load_doc_manager(path):
+ name, _ = os.path.splitext(os.path.basename(path))
+ try:
+ import importlib.machinery
+ loader = importlib.machinery.SourceFileLoader(name, path)
+ module = loader.load_module(name)
+ except ImportError:
+ import imp
+ module = imp.load_source(name, path)
+ return module
+
+ doc_manager_modules = None
+
if doc_manager is not None:
- doc_manager = imp.load_source('DocManager', doc_manager)
- else:
- from mongo_connector.doc_manager import DocManager
- time.sleep(1)
+ # backwards compatibility: doc_manager may be a string
+ if is_string(doc_manager):
+ doc_manager_modules = [load_doc_manager(doc_manager)]
+ # doc_manager is a list
+ else:
+ doc_manager_modules = []
+ for dm in doc_manager:
+ doc_manager_modules.append(load_doc_manager(dm))
+
super(Connector, self).__init__()
#can_run is set to false when we join the thread
@@ -61,8 +90,13 @@ def __init__(self, address, oplog_checkpoint, target_url, ns_set,
#main address - either mongos for sharded setups or a primary otherwise
self.address = address
- #The URL of the target system
- self.target_url = target_url
+ #The URLs of each target system, respectively
+ if is_string(target_url):
+ self.target_urls = [target_url]
+ elif target_url:
+ self.target_urls = list(target_url)
+ else:
+ self.target_urls = None
#The set of relevant namespaces to consider
self.ns_set = ns_set
@@ -97,30 +131,36 @@ def __init__(self, address, oplog_checkpoint, target_url, ns_set,
self.fields = fields
try:
- if target_url is None:
- if doc_manager is None: # imported using from... import
- self.doc_manager = DocManager(unique_key=u_key)
- else: # imported using load source
- self.doc_manager = doc_manager.DocManager(
- unique_key=u_key,
- namespace_set=ns_set,
- auto_commit_interval=auto_commit_interval
- )
+ docman_kwargs = {"unique_key": u_key,
+ "namespace_set": ns_set,
+ "auto_commit_interval": auto_commit_interval}
+
+ # No doc managers specified, using simulator
+ if doc_manager is None:
+ self.doc_managers = [simulator.DocManager(**docman_kwargs)]
else:
- if doc_manager is None:
- self.doc_manager = DocManager(
- self.target_url,
- unique_key=u_key,
- namespace_set=ns_set,
- auto_commit_interval=auto_commit_interval
- )
- else:
- self.doc_manager = doc_manager.DocManager(
- self.target_url,
- unique_key=u_key,
- namespace_set=ns_set,
- auto_commit_interval=auto_commit_interval
- )
+ self.doc_managers = []
+ for i, d in enumerate(doc_manager_modules):
+ # self.target_urls may be shorter than
+ # self.doc_managers, or left as None
+ if self.target_urls and i < len(self.target_urls):
+ target_url = self.target_urls[i]
+ else:
+ target_url = None
+
+ if target_url:
+ self.doc_managers.append(
+ d.DocManager(self.target_urls[i],
+ **docman_kwargs))
+ else:
+ self.doc_managers.append(
+ d.DocManager(**docman_kwargs))
+ # If more target URLs were given than doc managers, may need
+ # to create additional doc managers
+ if self.target_urls:
+     for url in self.target_urls[i+1:]:
+         self.doc_managers.append(
+             doc_manager_modules[-1].DocManager(url, **docman_kwargs))
except errors.ConnectionFailed:
err_msg = "MongoConnector: Could not connect to target system"
logging.critical(err_msg)
@@ -152,7 +192,8 @@ def join(self):
""" Joins thread, stops it from running
"""
self.can_run = False
- self.doc_manager.stop()
+ for dm in self.doc_managers:
+ dm.stop()
threading.Thread.join(self)
def write_oplog_progress(self):
@@ -257,7 +298,7 @@ def run(self):
main_address=(main_conn.host + ":" + str(main_conn.port)),
oplog_coll=oplog_coll,
is_sharded=False,
- doc_manager=self.doc_manager,
+ doc_manager=self.doc_managers,
oplog_progress_dict=self.oplog_progress,
namespace_set=self.ns_set,
auth_key=self.auth_key,
@@ -279,7 +320,8 @@ def run(self):
" %s unexpectedly stopped! Shutting down" %
(str(self.shard_set[0])))
self.oplog_thread_join()
- self.doc_manager.stop()
+ for dm in self.doc_managers:
+ dm.stop()
return
self.write_oplog_progress()
@@ -297,7 +339,8 @@ def run(self):
"down" %
(str(self.shard_set[shard_id])))
self.oplog_thread_join()
- self.doc_manager.stop()
+ for dm in self.doc_managers:
+ dm.stop()
return
self.write_oplog_progress()
@@ -309,10 +352,11 @@ def run(self):
cause = "The system only uses replica sets!"
logging.error("MongoConnector: %s", cause)
self.oplog_thread_join()
- self.doc_manager.stop()
+ for dm in self.doc_managers:
+ dm.stop()
return
- shard_conn = Connection(hosts, replicaset=repl_set)
+ shard_conn = Connection(hosts, replicaSet=repl_set)
oplog_coll = shard_conn['local']['oplog.rs']
oplog = OplogThread(
@@ -320,7 +364,7 @@ def run(self):
main_address=self.address,
oplog_coll=oplog_coll,
is_sharded=True,
- doc_manager=self.doc_manager,
+ doc_manager=self.doc_managers,
oplog_progress_dict=self.oplog_progress,
namespace_set=self.ns_set,
auth_key=self.auth_key,
@@ -400,15 +444,22 @@ def main():
"of falling behind the earliest timestamp in the oplog")
#-t is to specify the URL to the target system being used.
- parser.add_option("-t", "--target-url", action="store", type="string",
- dest="url", default=None,
- help="""Specify the URL to the target system being """
+ parser.add_option("-t", "--target-url", "--target-urls", action="store",
+ type="string", dest="urls", default=None, help=
+ """Specify the URL to each target system being """
"""used. For example, if you were using Solr out of """
- """the box, you could use '-t"""
- """ http://localhost:8080/solr' with the """
- """ SolrDocManager to establish a proper connection."""
- """ Don't use quotes around address."""
- """If target system doesn't need URL, don't specify""")
+ """the box, you could use '-t """
+ """http://localhost:8080/solr' with the """
+ """SolrDocManager to establish a proper connection. """
+ """URLs should be specified in the same order as """
+ """their respective doc managers in the """
+ """--doc-managers option. URLs are assigned to doc """
+ """managers respectively. Additional doc managers """
+ """are implied to have no target URL. Additional """
+ """URLs are implied to have the same doc manager """
+ """type as the last doc manager for which a URL was """
+ """specified. """
+ """Don't use quotes around addresses. """)
#-n is to specify the namespaces we want to consider. The default
#considers all the namespaces
@@ -461,18 +512,24 @@ def main():
"""The default username is '__system'""")
#-d is to specify the doc manager file.
- parser.add_option("-d", "--docManager", action="store", type="string",
- dest="doc_manager", default=None, help=
- """Used to specify the doc manager file that"""
- """ is going to be used. You should send the"""
- """ path of the file you want to be used."""
- """ By default, it will use the """
- """ doc_manager_simulator.py file. It is"""
- """ recommended that all doc manager files be"""
- """ kept in the doc_managers folder in"""
- """ mongo-connector. For more information"""
- """ about making your own doc manager,"""
- """ see Doc Manager section.""")
+ parser.add_option("-d", "--docManager", "--doc-managers", action="store",
+ type="string", dest="doc_managers", default=None, help=
+ """Used to specify the path to each doc manager """
+ """file that will be used. DocManagers should be """
+ """specified in the same order as their respective """
+ """target addresses in the --target-urls option. """
+ """URLs are assigned to doc managers """
+ """respectively. Additional doc managers are """
+ """implied to have no target URL. Additional URLs """
+ """are implied to have the same doc manager type as """
+ """the last doc manager for which a URL was """
+ """specified. By default, Mongo Connector will use """
+ """'doc_manager_simulator.py'. It is recommended """
+ """that all doc manager files be kept in the """
+ """doc_managers folder in mongo-connector. For """
+ """more information about making your own doc """
+ """manager, see 'Writing Your Own DocManager' """
+ """section of the wiki.""")
#-g is the destination namespace
parser.add_option("-g", "--dest-namespace-set", action="store",
@@ -548,8 +605,15 @@ def main():
logger.info('Beginning Mongo Connector')
- if options.doc_manager is None:
- logger.info('No doc manager specified, using simulator.')
+ # Get DocManagers and target URLs
+ # Each DocManager is assigned the respective (same-index) target URL
+ # Additional DocManagers may be specified that take no target URL
+ doc_managers = options.doc_managers
+ doc_managers = doc_managers.split(",") if doc_managers else doc_managers
+ target_urls = options.urls.split(",") if options.urls else None
+
+ if options.doc_managers is None:
+ logger.info('No doc managers specified, using simulator.')
if options.ns_set is None:
ns_set = []
@@ -600,11 +664,11 @@ def main():
connector = Connector(
address=options.main_addr,
oplog_checkpoint=options.oplog_config,
- target_url=options.url,
+ target_url=target_urls,
ns_set=ns_set,
u_key=options.u_key,
auth_key=key,
- doc_manager=options.doc_manager,
+ doc_manager=doc_managers,
auth_username=options.admin_name,
collection_dump=(not options.no_dump),
batch_size=options.batch_size,
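
For reference, here is a minimal sketch of how the new multi-target options fit together, based on the option parsing and Connector construction in this diff. The doc manager paths, URLs, and addresses are placeholders, not values taken from the commit.

    # Hypothetical command line: comma-separated doc manager paths paired, in
    # order, with comma-separated target URLs (see the --doc-managers and
    # --target-urls help text above).
    #
    #   python mongo_connector/connector.py \
    #       -d doc_managers/elastic_doc_manager.py,doc_managers/solr_doc_manager.py \
    #       -t http://localhost:9200,http://localhost:8080/solr
    #
    # The equivalent programmatic construction, mirroring main():
    from mongo_connector.connector import Connector

    connector = Connector(
        address="localhost:27217",                  # mongos or replica-set primary (placeholder)
        oplog_checkpoint="config.txt",
        target_url=["http://localhost:9200",        # paired with the first doc manager
                    "http://localhost:8080/solr"],  # paired with the second
        ns_set=["test.test"],
        u_key="_id",
        auth_key=None,
        doc_manager=["doc_managers/elastic_doc_manager.py",
                     "doc_managers/solr_doc_manager.py"],
    )
    connector.start()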
1  mongo_connector/doc_managers/doc_manager_simulator.py
@@ -39,6 +39,7 @@ def __init__(self, url=None, unique_key='_id', **kwargs):
"""
self.unique_key = unique_key
self.doc_dict = {}
+ self.url = url
def stop(self):
"""Stops any running threads in the DocManager.
11 mongo_connector/errors.py
@@ -1,4 +1,4 @@
-# Copyright 2013 MongoDB, Inc.
+# Copyright 2013-2014 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,14 +19,23 @@ class MongoConnectorError(Exception):
"""Base class for all exceptions in the mongo_connector package
"""
+
class ConnectionFailed(MongoConnectorError):
"""Raised when mongo-connector can't connect to target system
"""
+
class OperationFailed(MongoConnectorError):
"""Raised for failed commands on the destination database
"""
+
class EmptyDocsError(MongoConnectorError):
"""Raised on attempts to upsert empty sequences of documents
"""
+
+
+class ConnectorError(MongoConnectorError):
+ """Raised when creating a mongo_connector.Connector object with
+ nonsensical parameters
+ """
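
A quick illustration of the new ConnectorError (a sketch only; the address and URL are placeholders): the Connector constructor in this commit rejects a target URL that has no doc manager to receive it.

    from mongo_connector import errors
    from mongo_connector.connector import Connector

    try:
        # A target URL without a doc manager is rejected up front.
        Connector(address="localhost:27217", oplog_checkpoint=None,
                  target_url="http://localhost:8080/solr",
                  ns_set=None, u_key="_id", auth_key=None)
    except errors.ConnectorError as exc:
        print("refused: %s" % exc)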
284 mongo_connector/oplog_manager.py
@@ -17,12 +17,18 @@
import bson
import logging
+try:
+ import Queue as queue
+except ImportError:
+ import queue
import pymongo
import sys
import time
import threading
+import traceback
from mongo_connector import errors, util
from mongo_connector.constants import DEFAULT_BATCH_SIZE
+from mongo_connector.util import retry_until_ok
try:
from pymongo import MongoClient as Connection
@@ -62,14 +68,12 @@ def __init__(self, primary_conn, main_address, oplog_coll, is_sharded,
#Boolean describing whether the cluster is sharded or not
self.is_sharded = is_sharded
- #The document manager for the target system.
- #This is the same for all threads.
- self.doc_manager = doc_manager
-
- # Determine if the doc manager supports bulk upserts
- # If so, we can be more efficient with the way we pass along
- # updates to the doc manager.
- self.can_bulk = hasattr(self.doc_manager, "bulk_upsert")
+ #A document manager for each target system.
+ #These are the same for all threads.
+ if type(doc_manager) == list:
+ self.doc_managers = doc_manager
+ else:
+ self.doc_managers = [doc_manager]
#Boolean describing whether or not the thread is running.
self.running = True
@@ -157,24 +161,33 @@ def run(self):
ns = self.dest_mapping.get(entry['ns'], ns)
#delete
- if operation == 'd':
- entry['_id'] = entry['o']['_id']
- self.doc_manager.remove(entry)
- #insert/update. They are equal because of lack
- #of support for partial update
- elif operation == 'i' or operation == 'u':
- doc = self.retrieve_doc(entry)
- if doc is not None:
- doc['_ts'] = util.bson_ts_to_long(entry['ts'])
- doc['ns'] = ns
- try:
- self.doc_manager.upsert(
- self.filter_fields(doc)
- )
- except errors.OperationFailed:
- logging.error(
- "Unable to insert %s" % (doc)
- )
+ try:
+ if operation == 'd':
+ entry['_id'] = entry['o']['_id']
+ for dm in self.doc_managers:
+ dm.remove(entry)
+ #insert/update. They are equal because of lack
+ #of support for partial update
+ elif operation == 'i' or operation == 'u':
+ doc = self.retrieve_doc(entry)
+ if doc is not None:
+ doc['_ts'] = util.bson_ts_to_long(
+ entry['ts'])
+ doc['ns'] = ns
+ for dm in self.doc_managers:
+ dm.upsert(self.filter_fields(doc))
+ except errors.OperationFailed:
+ logging.error(
+ "Unable to %s doc with id %s" % (
+ "delete" if operation == "d" else "upsert",
+ str(entry['_id'])
+ ))
+ except errors.ConnectionFailed:
+ logging.error(
+ "Network error while trying to %s %s" % (
+ "delete" if operation == "d" else "upsert",
+ str(entry['_id'])
+ ))
last_ts = entry['ts']
@@ -320,27 +333,88 @@ def docs_to_dump():
for namespace in dump_set:
logging.info("dumping collection %s" % namespace)
database, coll = namespace.split('.', 1)
- target_coll = self.main_connection[database][coll]
- cursor = util.retry_until_ok(target_coll.find)
- for doc in cursor:
- if not self.running:
- raise StopIteration
- doc["ns"] = self.dest_mapping.get(namespace, namespace)
- doc["_ts"] = long_ts
- yield doc
-
- try:
- # Bulk upsert if possible
- if self.can_bulk:
- self.doc_manager.bulk_upsert(docs_to_dump())
- else:
- for doc in docs_to_dump():
+ last_id = None
+ attempts = 0
+
+ # Loop to handle possible AutoReconnect
+ while attempts < 60:
+ target_coll = self.main_connection[database][coll]
+ if not last_id:
+ cursor = util.retry_until_ok(
+ target_coll.find,
+ sort=[("_id", pymongo.ASCENDING)]
+ )
+ else:
+ cursor = util.retry_until_ok(
+ target_coll.find,
+ {"_id": {"$gt": last_id}},
+ sort=[("_id", pymongo.ASCENDING)]
+ )
try:
- self.doc_manager.upsert(self.filter_fields(doc))
- except errors.OperationFailed:
- logging.error("Unable to insert %s" % doc)
- except (pymongo.errors.AutoReconnect,
- pymongo.errors.OperationFailure):
+ for doc in cursor:
+ if not self.running:
+ raise StopIteration
+ doc["ns"] = self.dest_mapping.get(
+ namespace, namespace)
+ doc["_ts"] = long_ts
+ last_id = doc["_id"]
+ yield doc
+ break
+ except pymongo.errors.AutoReconnect:
+ attempts += 1
+ time.sleep(1)
+
+ # Extra threads (if any) that assist with collection dumps
+ dumping_threads = []
+ # Did the dump succeed for all target systems?
+ dump_success = True
+ # Holds any exceptions we can't recover from
+ errors = queue.Queue()
+ try:
+ for dm in self.doc_managers:
+ # Bulk upsert if possible
+ if hasattr(dm, "bulk_upsert"):
+ # Slight performance gain breaking dump into separate
+ # threads, only if > 1 replication target
+ if len(self.doc_managers) == 1:
+ dm.bulk_upsert(docs_to_dump())
+ else:
+ def do_dump(error_queue):
+ all_docs = docs_to_dump()
+ try:
+ dm.bulk_upsert(all_docs)
+ except Exception:
+ # Likely exceptions:
+ # pymongo.errors.OperationFailure,
+ # mongo_connector.errors.ConnectionFailed
+ # mongo_connector.errors.OperationFailed
+ error_queue.put(sys.exc_info())
+
+ t = threading.Thread(target=do_dump, args=(errors,))
+ dumping_threads.append(t)
+ t.start()
+ else:
+ for doc in docs_to_dump():
+ dm.upsert(self.filter_fields(doc))
+
+ # cleanup
+ for t in dumping_threads:
+ t.join()
+
+ except Exception:
+ # See "likely exceptions" comment above
+ errors.put(sys.exc_info())
+
+ # Print caught exceptions
+ try:
+ while True:
+ klass, value, trace = errors.get_nowait()
+ dump_success = False
+ traceback.print_exception(klass, value, trace)
+ except queue.Empty:
+ pass
+
+ if not dump_success:
err_msg = "OplogManager: Failed during dump collection"
effect = "cannot recover!"
logging.error('%s %s %s' % (err_msg, effect, self.oplog))
@@ -411,77 +485,91 @@ def rollback(self):
timestamp. This defines the rollback window and we just roll these
back until the oplog and target system are in consistent states.
"""
- self.doc_manager.commit()
- last_inserted_doc = self.doc_manager.get_last_doc()
+ # Find the most recently inserted document in each target system
+ last_docs = []
+ for dm in self.doc_managers:
+ dm.commit()
+ last_docs.append(dm.get_last_doc())
+
+ # Of these documents, which is the most recent?
+ last_inserted_doc = max(last_docs,
+ key=lambda x: x["_ts"] if x else float("-inf"))
+ # Nothing has been replicated. No need to rollback target systems
if last_inserted_doc is None:
return None
+ # Find the oplog entry that touched the most recent document.
+ # We'll use this to figure where to pick up the oplog later.
target_ts = util.long_to_bson_ts(last_inserted_doc['_ts'])
last_oplog_entry = self.oplog.find_one(
{'ts': {'$lte': target_ts}},
sort=[('$natural', pymongo.DESCENDING)]
)
+
+ # The oplog entry for the most recent document doesn't exist anymore.
+ # If we've fallen behind in the oplog, this will be caught later
if last_oplog_entry is None:
return None
+ # rollback_cutoff_ts happened *before* the rollback
rollback_cutoff_ts = last_oplog_entry['ts']
start_ts = util.bson_ts_to_long(rollback_cutoff_ts)
+ # timestamp of the most recent document on any target system
end_ts = last_inserted_doc['_ts']
- rollback_set = {} # this is a dictionary of ns:list of docs
- for doc in self.doc_manager.search(start_ts, end_ts):
- if doc['ns'] in rollback_set:
- rollback_set[doc['ns']].append(doc)
- else:
- rollback_set[doc['ns']] = [doc]
-
- for namespace, doc_list in rollback_set.items():
- # Get the original namespace
- original_namespace = namespace
- for source_name, dest_name in self.dest_mapping.items():
- if dest_name == namespace:
- original_namespace = source_name
-
- database, coll = original_namespace.split('.', 1)
- obj_id = bson.objectid.ObjectId
- bson_obj_id_list = [obj_id(doc['_id']) for doc in doc_list]
-
- to_update = util.retry_until_ok(
- self.main_connection[database][coll].find,
- {'_id': {'$in': bson_obj_id_list}})
- #doc list are docs in target system, to_update are docs in mongo
- doc_hash = {} # hash by _id
- for doc in doc_list:
- doc_hash[bson.objectid.ObjectId(doc['_id'])] = doc
-
- to_index = []
- count = 0
- while True:
- try:
+ for dm in self.doc_managers:
+ rollback_set = {} # this is a dictionary of ns:list of docs
+
+ # group potentially conflicted documents by namespace
+ for doc in dm.search(start_ts, end_ts):
+ if doc['ns'] in rollback_set:
+ rollback_set[doc['ns']].append(doc)
+ else:
+ rollback_set[doc['ns']] = [doc]
+
+ # retrieve these documents from MongoDB, either updating
+ # or removing them in each target system
+ for namespace, doc_list in rollback_set.items():
+ # Get the original namespace
+ original_namespace = namespace
+ for source_name, dest_name in self.dest_mapping.items():
+ if dest_name == namespace:
+ original_namespace = source_name
+
+ database, coll = original_namespace.split('.', 1)
+ obj_id = bson.objectid.ObjectId
+ bson_obj_id_list = [obj_id(doc['_id']) for doc in doc_list]
+
+ to_update = util.retry_until_ok(
+ self.main_connection[database][coll].find,
+ {'_id': {'$in': bson_obj_id_list}})
+ #doc list are docs in target system, to_update are
+ #docs in mongo
+ doc_hash = {} # hash by _id
+ for doc in doc_list:
+ doc_hash[bson.objectid.ObjectId(doc['_id'])] = doc
+
+ to_index = []
+
+ def collect_existing_docs():
for doc in to_update:
if doc['_id'] in doc_hash:
del doc_hash[doc['_id']]
to_index.append(doc)
- break
- except (pymongo.errors.OperationFailure,
- pymongo.errors.AutoReconnect):
- count += 1
- if count > 60:
- sys.exit(1)
- time.sleep(1)
-
- #delete the inconsistent documents
- for doc in doc_hash.values():
- self.doc_manager.remove(doc)
-
- #insert the ones from mongo
- for doc in to_index:
- doc['_ts'] = util.bson_ts_to_long(rollback_cutoff_ts)
- doc['ns'] = self.dest_mapping.get(namespace, namespace)
- try:
- self.doc_manager.upsert(self.filter_fields(doc))
- except errors.OperationFailed:
- logging.error("Unable to insert %s" % (doc))
+ retry_until_ok(collect_existing_docs)
+
+ #delete the inconsistent documents
+ for doc in doc_hash.values():
+ dm.remove(doc)
+
+ #insert the ones from mongo
+ for doc in to_index:
+ doc['_ts'] = util.bson_ts_to_long(rollback_cutoff_ts)
+ doc['ns'] = self.dest_mapping.get(namespace, namespace)
+ try:
+ dm.upsert(self.filter_fields(doc))
+ except errors.OperationFailed:
+ logging.error("Unable to insert %s" % (doc))
return rollback_cutoff_ts
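
A minimal sketch of driving several targets from a single OplogThread, following the pattern used by the new tests later in this commit. The address and replica set name are placeholders and assume a running replica set.

    from mongo_connector.locking_dict import LockingDict
    from mongo_connector.oplog_manager import OplogThread
    from mongo_connector.doc_managers.doc_manager_simulator import DocManager
    try:
        from pymongo import MongoClient as Connection
    except ImportError:
        from pymongo import Connection

    conn = Connection("localhost:27017", replicaSet="demo-repl")
    opman = OplogThread(
        primary_conn=conn,
        main_address="localhost:27017",
        oplog_coll=conn["local"]["oplog.rs"],
        is_sharded=False,
        doc_manager=[DocManager(), DocManager()],  # a list fans out to every target
        oplog_progress_dict=LockingDict(),
        namespace_set=["test.test"],
        auth_key=None,
        auth_username=None,
        repl_set="demo-repl",
    )
    opman.start()  # each upsert and remove is applied to both DocManagers
    # ...later: opman.join() to stop the thread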
2  tests/test_elastic.py
@@ -87,8 +87,6 @@ def tearDownClass(cls):
def tearDown(self):
""" Ends the connector
"""
- self.connector.doc_manager.auto_commit_interval = None
- time.sleep(2)
self.connector.join()
def setUp(self):
117 tests/test_mongo_connector.py
@@ -17,7 +17,6 @@
import os
import sys
-import inspect
import socket
sys.path[0:0] = [""]
@@ -32,6 +31,11 @@
from mongo_connector.connector import Connector
from tests.setup_cluster import start_cluster, kill_all
from bson.timestamp import Timestamp
+from mongo_connector import errors
+from mongo_connector.doc_managers import (
+ doc_manager_simulator,
+ elastic_doc_manager
+)
from mongo_connector.util import long_to_bson_ts
HOSTNAME = os.environ.get('HOSTNAME', socket.gethostname())
@@ -41,7 +45,7 @@
TEMP_CONFIG = os.environ.get('TEMP_CONFIG', "temp_config.txt")
-class MongoInternalTester(unittest.TestCase):
+class TestMongoConnector(unittest.TestCase):
""" Test Class for the Mongo Connector
"""
@@ -52,7 +56,7 @@ def runTest(self):
unittest.TestCase.__init__(self)
@classmethod
- def setUpClass(cls):
+ def setUpClass(cls):
""" Initializes the cluster
"""
@@ -177,5 +181,112 @@ def test_read_oplog_progress(self):
os.system('rm ' + TEMP_CONFIG)
+ def test_many_targets(self):
+ """Test that DocManagers are created and assigned to target URLs
+ correctly when instantiating a Connector object with multiple target
+ URLs
+ """
+
+ # no doc manager or target URLs
+ connector_kwargs = {
+ "address": MAIN_ADDR,
+ "oplog_checkpoint": None,
+ "ns_set": None,
+ "u_key": None,
+ "auth_key": None
+ }
+ c = Connector(target_url=None, **connector_kwargs)
+ self.assertEqual(len(c.doc_managers), 1)
+ self.assertIsInstance(c.doc_managers[0],
+ doc_manager_simulator.DocManager)
+
+ # N.B. This assumes we're in mongo-connector/tests
+ def get_docman(name):
+ return os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ os.pardir,
+ "mongo_connector",
+ "doc_managers",
+ "%s.py" % name
+ )
+
+ # only target URL provided
+ with self.assertRaises(errors.ConnectorError):
+ Connector(target_url="localhost:9200", **connector_kwargs)
+
+ # one doc manager taking a target URL, no URL provided
+ with self.assertRaises(TypeError):
+ c = Connector(doc_manager=get_docman("mongo_doc_manager"),
+ **connector_kwargs)
+
+ # 1:1 target URLs and doc managers
+ c = Connector(
+ doc_manager=[
+ get_docman("elastic_doc_manager"),
+ get_docman("doc_manager_simulator"),
+ get_docman("elastic_doc_manager")
+ ],
+ target_url=[
+ MAIN_ADDR,
+ "foobar",
+ "bazbaz"
+ ],
+ **connector_kwargs
+ )
+ self.assertEqual(len(c.doc_managers), 3)
+ # Connector uses doc manager filename as module name
+ self.assertEqual(c.doc_managers[0].__module__,
+ "elastic_doc_manager")
+ self.assertEqual(c.doc_managers[1].__module__,
+ "doc_manager_simulator")
+ self.assertEqual(c.doc_managers[2].__module__,
+ "elastic_doc_manager")
+
+ # more target URLs than doc managers
+ c = Connector(
+ doc_manager=[
+ get_docman("doc_manager_simulator")
+ ],
+ target_url=[
+ MAIN_ADDR,
+ "foobar",
+ "bazbaz"
+ ],
+ **connector_kwargs
+ )
+ self.assertEqual(len(c.doc_managers), 3)
+ self.assertEqual(c.doc_managers[0].__module__,
+ "doc_manager_simulator")
+ self.assertEqual(c.doc_managers[1].__module__,
+ "doc_manager_simulator")
+ self.assertEqual(c.doc_managers[2].__module__,
+ "doc_manager_simulator")
+ self.assertEqual(c.doc_managers[0].url, MAIN_ADDR)
+ self.assertEqual(c.doc_managers[1].url, "foobar")
+ self.assertEqual(c.doc_managers[2].url, "bazbaz")
+
+ # more doc managers than target URLs
+ c = Connector(
+ doc_manager=[
+ get_docman("elastic_doc_manager"),
+ get_docman("doc_manager_simulator"),
+ get_docman("doc_manager_simulator")
+ ],
+ target_url=[
+ MAIN_ADDR
+ ],
+ **connector_kwargs
+ )
+ self.assertEqual(len(c.doc_managers), 3)
+ self.assertEqual(c.doc_managers[0].__module__,
+ "elastic_doc_manager")
+ self.assertEqual(c.doc_managers[1].__module__,
+ "doc_manager_simulator")
+ self.assertEqual(c.doc_managers[2].__module__,
+ "doc_manager_simulator")
+ # extra doc managers should have None as target URL
+ self.assertEqual(c.doc_managers[1].url, None)
+ self.assertEqual(c.doc_managers[2].url, None)
+
if __name__ == '__main__':
unittest.main()
187 tests/test_oplog_manager.py
@@ -15,36 +15,26 @@
"""Test oplog manager methods
"""
+import time
import os
import sys
-import inspect
-import time
if sys.version_info[:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
import re
-import socket
try:
from pymongo import MongoClient as Connection
except ImportError:
- from pymongo import Connection
-
-sys.path[0:0] = [""]
+ from pymongo import Connection
from mongo_connector.doc_managers.doc_manager_simulator import DocManager
from mongo_connector.locking_dict import LockingDict
-from tests.setup_cluster import (kill_mongo_proc,
- start_mongo_proc,
- start_cluster,
- kill_all)
+from mongo_connector.util import long_to_bson_ts
+from tests.setup_cluster import (start_cluster,
+ kill_all)
from tests.util import wait_for
-from pymongo.errors import OperationFailure
from mongo_connector.oplog_manager import OplogThread
-from mongo_connector.util import(long_to_bson_ts,
- bson_ts_to_long,
- retry_until_ok)
-from bson.objectid import ObjectId
PORTS_ONE = {"PRIMARY": "27117", "SECONDARY": "27118", "ARBITER": "27119",
"CONFIG": "27220", "MAIN": "27217"}
@@ -61,10 +51,6 @@ class TestOplogManager(unittest.TestCase):
"""Defines all the testing methods, as well as a method that sets up the
cluster
"""
- def runTest(self):
- """Runs all Tests
- """
- unittest.TestCase.__init__(self)
@classmethod
def setUpClass(cls):
@@ -268,22 +254,31 @@ def test_dump_collection(self):
"""
test_oplog, primary_conn, search_ts = self.get_oplog_thread()
- solr = DocManager()
- test_oplog.doc_manager = solr
#with documents
primary_conn['test']['test'].insert({'name': 'paulie'})
search_ts = test_oplog.get_last_oplog_timestamp()
test_oplog.dump_collection()
- test_oplog.doc_manager.commit()
- solr_results = solr._search()
+ doc_manager = test_oplog.doc_managers[0]
+ doc_manager.commit()
+ solr_results = doc_manager._search()
self.assertEqual(len(solr_results), 1)
solr_doc = solr_results[0]
self.assertEqual(long_to_bson_ts(solr_doc['_ts']), search_ts)
self.assertEqual(solr_doc['name'], 'paulie')
self.assertEqual(solr_doc['ns'], 'test.test')
+ # test multiple targets
+ doc_managers = [DocManager(), DocManager(), DocManager()]
+ test_oplog.doc_managers = doc_managers
+ primary_conn["test"]["test"].remove()
+ for i in range(1000):
+ primary_conn["test"]["test"].insert({"i": i})
+ test_oplog.dump_collection()
+ for dm in doc_managers:
+ self.assertEqual(len(dm._search()), 1000)
+
def test_init_cursor(self):
"""Test init_cursor in oplog_manager. Assertion failure if it
doesn't pass
@@ -321,9 +316,8 @@ def test_init_cursor(self):
primary_conn['test']['test'].insert(({"_id": i} for i in range(100)))
# test no-dump option
- docman = DocManager()
+ docman = test_oplog.doc_managers[0]
docman._delete()
- test_oplog.doc_manager = docman
test_oplog.collection_dump = False
test_oplog.oplog_progress = LockingDict()
# init_cursor has the side-effect of causing a collection dump
@@ -337,109 +331,9 @@ def test_init_cursor(self):
test_oplog.init_cursor()
self.assertEqual(len(docman._search()), 100)
- def test_rollback(self):
- """Test rollback in oplog_manager. Assertion failure if it doesn't pass
- We force a rollback by inserting a doc, killing the primary,
- inserting another doc, killing the new primary, and then restarting
- both.
- """
- os.system('rm config.txt; touch config.txt')
- test_oplog, primary_conn, mongos, solr = self.get_new_oplog()
-
- # add a namespace mapping to the oplog manager
- test_oplog.dest_mapping = {
- "test.test": "foo.bar"
- }
-
- if not start_cluster():
- self.fail('Cluster could not be started successfully!')
-
- solr = DocManager()
- test_oplog.doc_manager = solr
- solr._delete() # equivalent to solr.delete(q='*: *')
-
- mongos['test']['test'].remove({})
- mongos['test']['test'].insert(
- {'_id': ObjectId('4ff74db3f646462b38000001'),
- 'name': 'paulie'},
- safe=True
- )
- while (mongos['test']['test'].find().count() != 1):
- time.sleep(1)
- cutoff_ts = test_oplog.get_last_oplog_timestamp()
-
- first_doc = {'name': 'paulie', '_ts': bson_ts_to_long(cutoff_ts),
- 'ns': 'test.test',
- '_id': ObjectId('4ff74db3f646462b38000001')}
-
- #try kill one, try restarting
- kill_mongo_proc(primary_conn.host, PORTS_ONE['PRIMARY'])
-
- new_primary_conn = Connection(HOSTNAME, int(PORTS_ONE['SECONDARY']))
- admin = new_primary_conn['admin']
- while admin.command("isMaster")['ismaster'] is False:
- time.sleep(1)
- time.sleep(5)
- count = 0
- while True:
- try:
- mongos['test']['test'].insert({
- '_id': ObjectId('4ff74db3f646462b38000002'),
- 'name': 'paul'},
- safe=True)
- break
- except OperationFailure:
- count += 1
- if count > 60:
- self.fail('Call to insert doc failed too many times')
- time.sleep(1)
- continue
- while (mongos['test']['test'].find().count() != 2):
- time.sleep(1)
- kill_mongo_proc(primary_conn.host, PORTS_ONE['SECONDARY'])
- start_mongo_proc(PORTS_ONE['PRIMARY'], "demo-repl", "/replset1a",
- "/replset1a.log", None)
-
- #wait for master to be established
- while primary_conn['admin'].command("isMaster")['ismaster'] is False:
- time.sleep(1)
-
- start_mongo_proc(PORTS_ONE['SECONDARY'], "demo-repl", "/replset1b",
- "/replset1b.log", None)
-
- #wait for secondary to be established
- admin = new_primary_conn['admin']
- while admin.command("replSetGetStatus")['myState'] != 2:
- time.sleep(1)
- while retry_until_ok(mongos['test']['test'].find().count) != 1:
- time.sleep(1)
-
- self.assertEqual(str(new_primary_conn.port), PORTS_ONE['SECONDARY'])
- self.assertEqual(str(primary_conn.port), PORTS_ONE['PRIMARY'])
-
- last_ts = test_oplog.get_last_oplog_timestamp()
- second_doc = {'name': 'paul', '_ts': bson_ts_to_long(last_ts),
- 'ns': 'test.test',
- '_id': ObjectId('4ff74db3f646462b38000002')}
-
- test_oplog.doc_manager.upsert(first_doc)
- test_oplog.doc_manager.upsert(second_doc)
-
- test_oplog.rollback()
- test_oplog.doc_manager.commit()
- results = solr._search()
-
- self.assertEqual(len(results), 1)
- self.assertEqual(results[0]["ns"], "foo.bar")
-
- self.assertEqual(results[0]['name'], 'paulie')
- self.assertTrue(results[0]['_ts'] <= bson_ts_to_long(cutoff_ts))
-
- #test_oplog.join()
-
def test_filter_fields(self):
opman, _, _ = self.get_oplog_thread()
- docman = opman.doc_manager
+ docman = opman.doc_managers[0]
conn = opman.main_connection
include_fields = ["a", "b", "c"]
@@ -478,7 +372,7 @@ def test_namespace_mapping(self):
dest_mapping = {"test.test1": "test.test1_dest",
"test.test2": "test.test2_dest"}
test_oplog, primary_conn, oplog_coll = self.get_oplog_thread()
- docman = test_oplog.doc_manager
+ docman = test_oplog.doc_managers[0]
test_oplog.dest_mapping = dest_mapping
test_oplog.namespace_set = source_ns
# start replicating
@@ -523,7 +417,7 @@ def update_complete():
# cleanup
primary_conn[db][coll].remove()
- test_oplog.doc_manager._delete()
+ test_oplog.doc_managers[0]._delete()
# doc not in namespace set
for ns in phony_ns:
@@ -545,5 +439,42 @@ def update_complete():
# cleanup
test_oplog.join()
+ def test_many_targets(self):
+ """Test that one OplogThread is capable of replicating to more than
+ one target.
+ """
+
+ opman, primary_conn, oplog_coll = self.get_oplog_thread()
+ doc_managers = [DocManager(), DocManager(), DocManager()]
+ opman.doc_managers = doc_managers
+
+ # start replicating
+ opman.start()
+ primary_conn["test"]["test"].insert({
+ "name": "kermit",
+ "color": "green"
+ })
+ primary_conn["test"]["test"].insert({
+ "name": "elmo",
+ "color": "firetruck red"
+ })
+
+ self.assertTrue(
+ wait_for(lambda: sum(len(d._search()) for d in doc_managers) == 6),
+ "OplogThread should be able to replicate to multiple targets"
+ )
+
+ primary_conn["test"]["test"].remove({"name": "elmo"})
+
+ self.assertTrue(
+ wait_for(lambda: sum(len(d._search()) for d in doc_managers) == 3),
+ "OplogThread should be able to replicate to multiple targets"
+ )
+ for d in doc_managers:
+ self.assertEqual(d._search()[0]["name"], "kermit")
+
+ # cleanup
+ opman.join()
+
if __name__ == '__main__':
unittest.main()
18 tests/test_oplog_manager_sharded.py
@@ -264,7 +264,6 @@ def test_get_last_oplog_timestamp(self):
# test_oplog.stop()
-
def test_dump_collection(self):
"""Test dump_collection in oplog_manager.
@@ -272,23 +271,21 @@ def test_dump_collection(self):
"""
test_oplog, search_ts, solr, mongos = self.get_oplog_thread()
- solr = DocManager()
- test_oplog.doc_manager = solr
# with documents
safe_mongo_op(mongos['alpha']['foo'].insert, {'name': 'paulie'})
search_ts = test_oplog.get_last_oplog_timestamp()
test_oplog.dump_collection()
- test_oplog.doc_manager.commit()
- solr_results = solr._search()
+ docman = test_oplog.doc_managers[0]
+ docman.commit()
+ solr_results = docman._search()
assert (len(solr_results) == 1)
solr_doc = solr_results[0]
assert (long_to_bson_ts(solr_doc['_ts']) == search_ts)
assert (solr_doc['name'] == 'paulie')
assert (solr_doc['ns'] == 'alpha.foo')
-
def test_init_cursor(self):
"""Test init_cursor in oplog_manager.
@@ -335,8 +332,7 @@ def test_rollback(self):
test_oplog, primary_conn, solr, mongos = self.get_new_oplog()
- solr = DocManager()
- test_oplog.doc_manager = solr
+ solr = test_oplog.doc_managers[0]
solr._delete() # equivalent to solr.delete(q='*:*')
safe_mongo_op(mongos['alpha']['foo'].remove, {})
@@ -397,10 +393,10 @@ def test_rollback(self):
second_doc = {'name': 'paul', '_ts': bson_ts_to_long(last_ts),
'ns': 'alpha.foo', '_id': obj2}
- test_oplog.doc_manager.upsert(first_doc)
- test_oplog.doc_manager.upsert(second_doc)
+ solr.upsert(first_doc)
+ solr.upsert(second_doc)
test_oplog.rollback()
- test_oplog.doc_manager.commit()
+ solr.commit()
results = solr._search()
self.assertEqual(len(results), 1)
234 tests/test_rollbacks.py
@@ -0,0 +1,234 @@
+"""Test Mongo Connector's behavior when its source MongoDB system is
+experiencing a rollback.
+
+"""
+
+import os
+import sys
+if sys.version_info[:2] == (2, 6):
+ import unittest2 as unittest
+else:
+ import unittest
+import time
+
+from pymongo.read_preferences import ReadPreference
+try:
+ from pymongo import MongoClient as Connection
+except ImportError:
+ from pymongo import Connection
+
+from mongo_connector.util import retry_until_ok
+from mongo_connector.locking_dict import LockingDict
+from mongo_connector.doc_managers.doc_manager_simulator import DocManager
+from mongo_connector.oplog_manager import OplogThread
+from tests.util import wait_for
+from tests.setup_cluster import (
+ start_cluster,
+ kill_all,
+ kill_mongo_proc,
+ start_mongo_proc,
+ PORTS_ONE
+)
+
+
+class TestRollbacks(unittest.TestCase):
+
+ def tearDown(self):
+ kill_all()
+
+ def setUp(self):
+ # Create a new oplog progress file
+ try:
+ os.unlink("config.txt")
+ except OSError:
+ pass
+ open("config.txt", "w").close()
+
+ # Start a replica set
+ start_cluster(sharded=False, use_mongos=False)
+ # Connection to the replica set as a whole
+ self.main_conn = Connection("localhost:%s" % PORTS_ONE["PRIMARY"],
+ replicaSet="demo-repl")
+ # Connection to the primary specifically
+ self.primary_conn = Connection("localhost:%s" % PORTS_ONE["PRIMARY"])
+ # Connection to the secondary specifically
+ self.secondary_conn = Connection(
+ "localhost:%s" % PORTS_ONE["SECONDARY"],
+ read_preference=ReadPreference.SECONDARY_PREFERRED
+ )
+
+ # Wipe any test data
+ self.main_conn["test"]["mc"].drop()
+
+ # Oplog thread
+ doc_manager = DocManager()
+ oplog_progress = LockingDict()
+ self.opman = OplogThread(
+ primary_conn=self.main_conn,
+ main_address="localhost:%s" % PORTS_ONE["PRIMARY"],
+ oplog_coll=self.main_conn["local"]["oplog.rs"],
+ is_sharded=False,
+ doc_manager=doc_manager,
+ oplog_progress_dict=oplog_progress,
+ namespace_set=["test.mc"],
+ auth_key=None,
+ auth_username=None,
+ repl_set="demo-repl"
+ )
+
+ def test_single_target(self):
+ """Test with a single replication target"""
+
+ self.opman.start()
+
+ # Insert first document with primary up
+ self.main_conn["test"]["mc"].insert({"i": 0})
+ self.assertEqual(self.primary_conn["test"]["mc"].find().count(), 1)
+
+ # Make sure the insert is replicated
+ secondary = self.secondary_conn
+ self.assertTrue(wait_for(lambda: secondary["test"]["mc"].count() == 1),
+ "first write didn't replicate to secondary")
+
+ # Kill the primary
+ kill_mongo_proc("localhost", PORTS_ONE["PRIMARY"])
+
+ # Wait for the secondary to be promoted
+ while not secondary["admin"].command("isMaster")["ismaster"]:
+ time.sleep(1)
+
+ # Insert another document. This will be rolled back later
+ retry_until_ok(self.main_conn["test"]["mc"].insert, {"i": 1})
+ self.assertEqual(secondary["test"]["mc"].count(), 2)
+
+ # Wait for replication to doc manager
+ c = lambda: len(self.opman.doc_managers[0]._search()) == 2
+ self.assertTrue(wait_for(c),
+ "not all writes were replicated to doc manager")
+
+ # Kill the new primary
+ kill_mongo_proc("localhost", PORTS_ONE["SECONDARY"])
+
+ # Start both servers back up
+ start_mongo_proc(
+ port=PORTS_ONE['PRIMARY'],
+ repl_set_name="demo-repl",
+ data="/replset1a",
+ log="/replset1a.log",
+ key_file=None
+ )
+ primary_admin = self.primary_conn["admin"]
+ while not primary_admin.command("isMaster")["ismaster"]:
+ time.sleep(1)
+ start_mongo_proc(
+ port=PORTS_ONE['SECONDARY'],
+ repl_set_name="demo-repl",
+ data="/replset1b",
+ log="/replset1b.log",
+ key_file=None
+ )
+ while secondary["admin"].command("replSetGetStatus")["myState"] != 2:
+ time.sleep(1)
+ while retry_until_ok(self.main_conn["test"]["mc"].find().count) == 0:
+ time.sleep(1)
+
+ # Only first document should exist in MongoDB
+ self.assertEqual(self.main_conn["test"]["mc"].count(), 1)
+ self.assertEqual(self.main_conn["test"]["mc"].find_one()["i"], 0)
+
+ # Same case should hold for the doc manager
+ doc_manager = self.opman.doc_managers[0]
+ self.assertEqual(len(doc_manager._search()), 1)
+ self.assertEqual(doc_manager._search()[0]["i"], 0)
+
+ # cleanup
+ self.opman.join()
+
+ def test_many_targets(self):
+ """Test with several replication targets"""
+
+ # OplogThread has multiple doc managers
+ doc_managers = [DocManager(), DocManager(), DocManager()]
+ self.opman.doc_managers = doc_managers
+
+ self.opman.start()
+
+ # Insert a document into each namespace
+ self.main_conn["test"]["mc"].insert({"i": 0})
+ self.assertEqual(self.primary_conn["test"]["mc"].count(), 1)
+
+ # Make sure the insert is replicated
+ secondary = self.secondary_conn
+ self.assertTrue(wait_for(lambda: secondary["test"]["mc"].count() == 1),
+ "first write didn't replicate to secondary")
+
+ # Kill the primary
+ kill_mongo_proc("localhost", PORTS_ONE["PRIMARY"])
+
+ # Wait for the secondary to be promoted
+ while not secondary["admin"].command("isMaster")["ismaster"]:
+ time.sleep(1)
+
+ # Insert more documents. This will be rolled back later
+ # Some of these documents will be manually removed from
+ # certain doc managers, to emulate the effect of certain
+ # target systems being ahead/behind others
+ secondary_ids = []
+ for i in range(1, 10):
+ secondary_ids.append(
+ retry_until_ok(self.main_conn["test"]["mc"].insert,
+ {"i": i}))
+ self.assertEqual(self.secondary_conn["test"]["mc"].count(), 10)
+
+ # Wait for replication to the doc managers
+ def docmans_done():
+ for dm in self.opman.doc_managers:
+ if len(dm._search()) != 10:
+ return False
+ return True
+ self.assertTrue(wait_for(docmans_done),
+ "not all writes were replicated to doc managers")
+
+ # Remove some documents from the doc managers to simulate
+ # uneven replication
+ for id in secondary_ids[8:]:
+ self.opman.doc_managers[1].remove({"_id": id})
+ for id in secondary_ids[2:]:
+ self.opman.doc_managers[2].remove({"_id": id})
+
+ # Kill the new primary
+ kill_mongo_proc("localhost", PORTS_ONE["SECONDARY"])
+
+ # Start both servers back up
+ start_mongo_proc(
+ port=PORTS_ONE['PRIMARY'],
+ repl_set_name="demo-repl",
+ data="/replset1a",
+ log="/replset1a.log",
+ key_file=None
+ )
+ primary_admin = self.primary_conn["admin"]
+ while not primary_admin.command("isMaster")["ismaster"]:
+ time.sleep(1)
+ start_mongo_proc(
+ port=PORTS_ONE['SECONDARY'],
+ repl_set_name="demo-repl",
+ data="/replset1b",
+ log="/replset1b.log",
+ key_file=None
+ )
+ while secondary["admin"].command("replSetGetStatus")["myState"] != 2:
+ time.sleep(1)
+ while retry_until_ok(self.primary_conn["test"]["mc"].find().count) == 0:
+ time.sleep(1)
+
+ # Only first document should exist in MongoDB
+ self.assertEqual(self.primary_conn["test"]["mc"].count(), 1)
+ self.assertEqual(self.primary_conn["test"]["mc"].find_one()["i"], 0)
+
+ # Same case should hold for the doc managers
+ for dm in self.opman.doc_managers:
+ self.assertEqual(len(dm._search()), 1)
+ self.assertEqual(dm._search()[0]["i"], 0)
+
+ self.opman.join()
16 tests/test_solr.py
@@ -118,8 +118,6 @@ def setUp(self):
time.sleep(1)
def tearDown(self):
- self.connector.doc_manager.auto_commit_interval = None
- time.sleep(2)
self.connector.join()
def test_shard_length(self):
@@ -314,16 +312,17 @@ def test_valid_fields(self):
self.conn['test']['test'].update({'_id' : inserted_obj},
{'$set':{'popularity' : 1 }})
+ docman = self.connector.doc_managers[0]
for _ in range(60):
- if len(self.connector.doc_manager._search("*:*")) != 0:
+ if len(docman._search("*:*")) != 0:
break
time.sleep(1)
else:
self.fail("Timeout when removing docs from Solr")
- result = self.connector.doc_manager.get_last_doc()
+ result = docman.get_last_doc()
self.assertIn('popularity', result)
- self.assertEqual(len(self.connector.doc_manager._search(
+ self.assertEqual(len(docman._search(
"name=test_valid")), 1)
def test_invalid_fields(self):
@@ -334,16 +333,17 @@ def test_invalid_fields(self):
self.conn['test']['test'].update({'_id' : inserted_obj},
{'$set':{'break_this_test' : 1 }})
+ docman = self.connector.doc_managers[0]
for _ in range(60):
- if len(self.connector.doc_manager._search("*:*")) != 0:
+ if len(docman._search("*:*")) != 0:
break
time.sleep(1)
else:
self.fail("Timeout when removing docs from Solr")
- result = self.connector.doc_manager.get_last_doc()
+ result = docman.get_last_doc()
self.assertNotIn('break_this_test', result)
- self.assertEqual(len(self.connector.doc_manager._search(
+ self.assertEqual(len(docman._search(
"name=test_invalid")), 1)
def test_dynamic_fields(self):
2  tests/test_synchronizer.py
@@ -78,7 +78,7 @@ def setUpClass(cls):
u_key='_id',
auth_key=None
)
- cls.synchronizer = cls.connector.doc_manager
+ cls.synchronizer = cls.connector.doc_managers[0]
timer.start()
cls.connector.start()
while len(cls.connector.shard_set) == 0: