diff --git a/Dockerfile b/Dockerfile index f55f6fe0..d1a6fc45 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ FROM centos:centos7 MAINTAINER Tim Vaillancourt RUN yum install -y https://www.percona.com/redir/downloads/percona-release/redhat/latest/percona-release-0.1-4.noarch.rpm epel-release && \ - yum install -y Percona-Server-MongoDB-32-tools zbackup && yum clean all && \ - curl -Lo /usr/bin/mongodb-consistent-backup https://github.com/Percona-Lab/mongodb_consistent_backup/releases/download/1.0.1/mongodb-consistent-backup.el7.centos.x86_64 && \ + yum install -y Percona-Server-MongoDB-34-tools zbackup && yum clean all && \ + curl -Lo /usr/bin/mongodb-consistent-backup https://github.com/Percona-Lab/mongodb_consistent_backup/releases/download/1.0.2/mongodb-consistent-backup.el7.centos.x86_64 && \ chmod +x /usr/bin/mongodb-consistent-backup ENTRYPOINT ["mongodb-consistent-backup"] CMD ["--help"] diff --git a/README.rst b/README.rst index 6edd58f1..52d2d750 100644 --- a/README.rst +++ b/README.rst @@ -222,6 +222,7 @@ Links - https://www.percona.com/blog/2016/07/25/mongodb-consistent-backups/ - https://www.percona.com/blog/2017/01/09/mongodb-pit-backups-part-2/ +- https://www.percona.com/blog/2017/05/10/percona-lab-mongodb_consistent_backup-1-0-release-explained/ - https://docs.mongodb.com/manual/reference/program/mongodump/ - https://docs.mongodb.com/manual/reference/program/mongorestore/ - http://zbackup.org diff --git a/VERSION b/VERSION index 7dea76ed..6d7de6e6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.1 +1.0.2 diff --git a/conf/mongodb-consistent-backup.example.conf b/conf/mongodb-consistent-backup.example.conf index fef12a24..24959f2c 100644 --- a/conf/mongodb-consistent-backup.example.conf +++ b/conf/mongodb-consistent-backup.example.conf @@ -10,9 +10,9 @@ production: name: default location: /var/lib/mongodb-consistent-backup # mongodump: - # binary: [path] (default: /usr/bin/mongodump) - # compression: [none|gzip] (default: true - if 
supported) - # threads: [1-16] (default: auto-generated - shards/cpu) + # binary: [path] (default: /usr/bin/mongodump) + # compression: [auto|none|gzip] (default: auto - enable gzip if supported) + # threads: [1-16] (default: auto-generated - shards/cpu) #replication: # max_lag_secs: [1+] (default: 5) # min_priority: [0-999] (default: 0) @@ -23,14 +23,14 @@ production: # wait_secs: [1+] (default: 300) # ping_secs: [1+] (default: 3) #oplog: - # compression: [none|gzip] (default: true - if used by backup stage) + # compression: [none|gzip] (default: gzip - if gzip is used by backup stage) # resolver_threads: [1+] (default: 2 per CPU) # tailer: # status_interval: 30 archive: method: tar # tar: - # compression: [none|gzip] (default: gzip, none if backup is compressed) + # compression: [none|gzip] (default: gzip - none if backup is already compressed) # threads: [1+] (default: 1 per CPU) # zbackup: # binary: [path] (default: /usr/bin/zbackup) diff --git a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py index 23ec86e0..fc3c99c9 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py +++ b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py @@ -26,7 +26,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, shard self.replsets = replsets self.sharding = sharding - self.compression_supported = ['none', 'gzip'] + self.compression_supported = ['auto', 'none', 'gzip'] self.version = 'unknown' self.threads_max = 16 self.config_replset = False @@ -40,11 +40,15 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, shard with hide('running', 'warnings'), settings(warn_only=True): self.version = local("%s --version|awk 'NR >1 {exit}; /version/{print $NF}'" % self.binary, capture=True) + self.choose_compression() + + def choose_compression(self): if self.can_gzip(): - if self.compression() == 'none': + if self.compression() == 'auto': + 
logging.info("Mongodump binary supports gzip compression, auto-enabling gzip compression") self.compression('gzip') elif self.compression() == 'gzip': - logging.warning("mongodump gzip compression requested on binary that does not support gzip!") + raise OperationError("mongodump gzip compression requested on binary that does not support gzip!") def can_gzip(self): if os.path.isfile(self.binary) and os.access(self.binary, os.X_OK): @@ -121,8 +125,9 @@ def run(self): self.authdb, self.backup_dir, self.binary, + self.version, self.threads(), - self.do_gzip, + self.do_gzip(), self.verbose ) self.dump_threads.append(thread) @@ -152,8 +157,9 @@ def run(self): self.authdb, self.backup_dir, self.binary, + self.version, self.threads(), - self.do_gzip, + self.do_gzip(), self.verbose )] self.dump_threads[0].start() diff --git a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py index 32d40ebc..d748be3a 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py +++ b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py @@ -14,24 +14,28 @@ # noinspection PyStringFormat class MongodumpThread(Process): - def __init__(self, state, uri, timer, user, password, authdb, base_dir, binary, + def __init__(self, state, uri, timer, user, password, authdb, base_dir, binary, version, threads=0, dump_gzip=False, verbose=False): Process.__init__(self) - self.state = state - self.uri = uri - self.timer = timer - self.user = user - self.password = password - self.authdb = authdb - self.base_dir = base_dir - self.binary = binary - self.threads = threads - self.dump_gzip = dump_gzip - self.verbose = verbose - - self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset) - self.exit_code = 1 - self._command = None + self.state = state + self.uri = uri + self.timer = timer + self.user = user + self.password = password + self.authdb = authdb + self.base_dir = base_dir + self.binary = 
binary + self.version = version + self.threads = threads + self.dump_gzip = dump_gzip + self.verbose = verbose + + self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset) + self.exit_code = 1 + self._command = None + self.do_stdin_passwd = False + self.stdin_passwd_sent = False + self.backup_dir = os.path.join(self.base_dir, self.uri.replset) self.dump_dir = os.path.join(self.backup_dir, "dump") self.oplog_file = os.path.join(self.dump_dir, "oplog.bson") @@ -60,6 +64,18 @@ def parse_mongodump_line(self, line): except: return None + def is_password_prompt(self, line): + if self.do_stdin_passwd and ("Enter Password:" in line or "reading password from standard input" in line): + return True + return False + + def handle_password_prompt(self): + if self.do_stdin_passwd and not self.stdin_passwd_sent: + logging.debug("Received password prompt from mongodump, writing password to stdin") + self._process.stdin.write(self.password + "\n") + self._process.stdin.flush() + self.stdin_passwd_sent = True + def wait(self): try: while self._process.stderr: @@ -68,7 +84,11 @@ def wait(self): for fd in poll[0]: read = self._process.stderr.readline() line = self.parse_mongodump_line(read) - if line: + if not line: + continue + elif self.is_password_prompt(read): + self.handle_password_prompt() + else: logging.info(line) if self._process.poll() != None: break @@ -85,11 +105,19 @@ def mongodump_cmd(self): mongodump_flags.extend(["--numParallelCollections="+str(self.threads)]) if self.dump_gzip: mongodump_flags.extend(["--gzip"]) + if tuple("3.4.0".split(".")) <= tuple(self.version.split(".")): + mongodump_flags.extend(["--readPreference=secondary"]) if self.authdb and self.authdb != "admin": logging.debug("Using database %s for authentication" % self.authdb) mongodump_flags.extend(["--authenticationDatabase", self.authdb]) if self.user and self.password: - mongodump_flags.extend(["-u", self.user, "-p", self.password]) + # >= 3.0.2 supports password input via stdin to 
mask from ps + if tuple("3.0.2".split(".")) <= tuple(self.version.split(".")): + mongodump_flags.extend(["-u", self.user, "-p", '""']) + self.do_stdin_passwd = True + else: + logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this") + mongodump_flags.extend(["-u", self.user, "-p", self.password]) mongodump_cmd.extend(mongodump_flags) return mongodump_cmd @@ -106,7 +134,7 @@ def run(self): rmtree(self.dump_dir) os.makedirs(self.dump_dir) logging.debug("Running mongodump cmd: %s" % mongodump_cmd) - self._process = Popen(mongodump_cmd, stderr=PIPE) + self._process = Popen(mongodump_cmd, stdin=PIPE, stderr=PIPE) self.wait() self.exit_code = self._process.returncode if self.exit_code > 0: diff --git a/mongodb_consistent_backup/Backup/Mongodump/__init__.py b/mongodb_consistent_backup/Backup/Mongodump/__init__.py index d7cbeed5..ba960a08 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/__init__.py +++ b/mongodb_consistent_backup/Backup/Mongodump/__init__.py @@ -5,8 +5,8 @@ def config(parser): parser.add_argument("--backup.mongodump.binary", dest="backup.mongodump.binary", help="Path to 'mongodump' binary (default: /usr/bin/mongodump)", default='/usr/bin/mongodump') parser.add_argument("--backup.mongodump.compression", dest="backup.mongodump.compression", - help="Compression method to use on backup (default: gzip)", default="gzip", - choices=["none", "gzip"]) + help="Compression method to use on backup (default: auto)", default="auto", + choices=["auto", "none", "gzip"]) parser.add_argument("--backup.mongodump.threads", dest="backup.mongodump.threads", help="Number of threads to use for each mongodump process. There is 1 x mongodump per shard, be careful! 
(default: shards/CPUs)", default=0, type=int) diff --git a/mongodb_consistent_backup/Backup/__init__.py b/mongodb_consistent_backup/Backup/__init__.py index b3a39603..27ba8660 100644 --- a/mongodb_consistent_backup/Backup/__init__.py +++ b/mongodb_consistent_backup/Backup/__init__.py @@ -1,3 +1,8 @@ from Backup import Backup +def config(parser): + parser.add_argument("-n", "--backup.name", dest="backup.name", help="Name of the backup set (required)", type=str) + parser.add_argument("-l", "--backup.location", dest="backup.location", help="Base path to store the backup data (required)", type=str) + parser.add_argument("-m", "--backup.method", dest="backup.method", help="Method to be used for backup (default: mongodump)", default='mongodump', choices=['mongodump']) + return parser diff --git a/mongodb_consistent_backup/Common/Config.py b/mongodb_consistent_backup/Common/Config.py index e92bad75..bac7e859 100644 --- a/mongodb_consistent_backup/Common/Config.py +++ b/mongodb_consistent_backup/Common/Config.py @@ -51,12 +51,9 @@ def makeParser(self): parser.add_argument("-v", "--verbose", dest="verbose", help="Verbose output", default=False, action="store_true") parser.add_argument("-H", "--host", dest="host", help="MongoDB Hostname, IP address or '/,,..' 
URI (default: localhost)", default="localhost", type=str) parser.add_argument("-P", "--port", dest="port", help="MongoDB Port (default: 27017)", default=27017, type=int) - parser.add_argument("-u", "--user", dest="user", help="MongoDB Authentication Username (for optional auth)", type=str) + parser.add_argument("-u", "--user", "--username", dest="username", help="MongoDB Authentication Username (for optional auth)", type=str) parser.add_argument("-p", "--password", dest="password", help="MongoDB Authentication Password (for optional auth)", type=str) parser.add_argument("-a", "--authdb", dest="authdb", help="MongoDB Auth Database (for optional auth - default: admin)", default='admin', type=str) - parser.add_argument("-n", "--backup.name", dest="backup.name", help="Name of the backup set (required)", type=str) - parser.add_argument("-l", "--backup.location", dest="backup.location", help="Base path to store the backup data (required)", type=str) - parser.add_argument("-m", "--backup.method", dest="backup.method", help="Method to be used for backup (default: mongodump)", default='mongodump', choices=['mongodump']) parser.add_argument("-L", "--log-dir", dest="log_dir", help="Path to write log files to (default: disabled)", default='', type=str) parser.add_argument("--lock-file", dest="lock_file", help="Location of lock file (default: /tmp/mongodb-consistent-backup.lock)", default='/tmp/mongodb-consistent-backup.lock', type=str) parser.add_argument("--sharding.balancer.wait_secs", dest="sharding.balancer.wait_secs", help="Maximum time to wait for balancer to stop, in seconds (default: 300)", default=300, type=int) diff --git a/mongodb_consistent_backup/Common/DB.py b/mongodb_consistent_backup/Common/DB.py index 71af001c..d6afabe9 100644 --- a/mongodb_consistent_backup/Common/DB.py +++ b/mongodb_consistent_backup/Common/DB.py @@ -45,7 +45,7 @@ def connect(self): conn['admin'].command({"ping":1}) except (ConnectionFailure, OperationFailure, ServerSelectionTimeoutError), 
e: logging.error("Unable to connect to %s! Error: %s" % (self.uri, e)) - raise OperationError(e) + raise DBConnectionError(e) if conn is not None: self._conn = conn return self._conn diff --git a/mongodb_consistent_backup/Logger.py b/mongodb_consistent_backup/Logger.py index e56789fc..84b29d13 100644 --- a/mongodb_consistent_backup/Logger.py +++ b/mongodb_consistent_backup/Logger.py @@ -16,10 +16,10 @@ def __init__(self, config, backup_time): self.do_file_log = False if self.config.log_dir is not '': - if os.path.isdir(self.config.log_dir): - self.do_file_log = True - else: - print("ERROR: Log directory: %s does not exist! Skipping file-based logging" % self.config.log_dir) + self.do_file_log = True + if not os.path.isdir(self.config.log_dir): + print "WARNING: Creating logging directory: %s" % self.config.log_dir + os.mkdir(self.config.log_dir) self.log_format = '[%(asctime)s] [%(levelname)s] [%(processName)s] [%(module)s:%(funcName)s:%(lineno)d] %(message)s' self.file_log = None @@ -41,7 +41,6 @@ def start_file_logger(self): self.file_log.setLevel(self.log_level) self.file_log.setFormatter(logging.Formatter(self.log_format)) logging.getLogger('').addHandler(self.file_log) - self.update_symlink() except OSError, e: logging.warning("Could not start file log handler, writing to stdout only") pass @@ -50,18 +49,21 @@ def close(self): if self.file_log: self.file_log.close() - def compress(self): + def compress(self, current=False): gz_log = None try: - if not os.path.isfile(self.last_log) or self.last_log == self.backup_log_file: - return - logging.info("Compressing previous log file") - gz_file = "%s.gz" % self.last_log + compress_file = self.backup_log_file + if not current: + compress_file = self.last_log + if not os.path.isfile(self.last_log) or self.last_log == self.backup_log_file: + return + logging.info("Compressing log file: %s" % compress_file) + gz_file = "%s.gz" % compress_file gz_log = GzipFile(gz_file, "w+") - with open(self.last_log) as f: + with 
open(compress_file) as f: for line in f: gz_log.write(line) - os.remove(self.last_log) + os.remove(compress_file) finally: if gz_log: gz_log.close() diff --git a/mongodb_consistent_backup/Main.py b/mongodb_consistent_backup/Main.py index 479455ff..23f001db 100644 --- a/mongodb_consistent_backup/Main.py +++ b/mongodb_consistent_backup/Main.py @@ -55,7 +55,7 @@ def __init__(self, prog_name="mongodb-consistent-backup"): self.setup_logger() self.setup_signal_handlers() self.get_lock() - self.logger.start_file_logger() + self.logger.update_symlink() self.init() self.set_backup_dirs() self.get_db_conn() @@ -74,6 +74,7 @@ def setup_logger(self): try: self.logger = Logger(self.config, self.backup_time) self.logger.start() + self.logger.start_file_logger() except Exception, e: self.exception("Could not start logger: %s" % e, e) @@ -117,6 +118,7 @@ def get_lock(self): self.lock = Lock(self.config.lock_file) except Exception: logging.fatal("Could not acquire lock: '%s'! Is another %s process running? 
Exiting" % (self.config.lock_file, self.program_name)) + self.logger.compress(True) sys.exit(1) def release_lock(self): diff --git a/mongodb_consistent_backup/Oplog/Oplog.py b/mongodb_consistent_backup/Oplog/Oplog.py index 42fd1dc1..2ae4102b 100644 --- a/mongodb_consistent_backup/Oplog/Oplog.py +++ b/mongodb_consistent_backup/Oplog/Oplog.py @@ -3,6 +3,7 @@ from gzip import GzipFile from bson import BSON, decode_file_iter +from bson.codec_options import CodecOptions from mongodb_consistent_backup.Errors import OperationError @@ -44,7 +45,7 @@ def load(self): try: oplog = self.open() logging.debug("Reading oplog file %s" % self.oplog_file) - for change in decode_file_iter(oplog): + for change in decode_file_iter(oplog, CodecOptions(unicode_decode_error_handler="ignore")): if 'ts' in change: self._last_ts = change['ts'] if self._first_ts is None and self._last_ts is not None: diff --git a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py index 3cc892d9..dc81f2fe 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/Resolver.py +++ b/mongodb_consistent_backup/Oplog/Resolver/Resolver.py @@ -4,7 +4,7 @@ # noinspection PyPackageRequirements from bson.timestamp import Timestamp from copy_reg import pickle -from multiprocessing import Pool +from multiprocessing import Pool, TimeoutError from time import sleep from types import MethodType @@ -40,6 +40,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, tailed_oplogs, self.completed = False self._pool = None self._pooled = [] + self._results = {} try: self._pool = Pool(processes=self.threads(None, 2)) except Exception, e: @@ -69,14 +70,23 @@ def done(self, done_uri): else: raise OperationError("Unexpected response from resolver thread: %s" % done_uri) - def wait(self): + def wait(self, max_wait_secs=6*3600, poll_secs=2): if len(self._pooled) > 0: + waited_secs = 0 self._pool.close() while len(self._pooled): logging.debug("Waiting for %i oplog resolver 
thread(s) to stop" % len(self._pooled)) - sleep(2) + try: + for thread_name in self._pooled: + thread = self._results[thread_name] + thread.get(poll_secs) + except TimeoutError: + if waited_secs < max_wait_secs: + waited_secs += poll_secs + else: + raise OperationError("Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit" % max_wait_secs) self._pool.terminate() - logging.debug("Stopped all oplog resolve threads") + logging.debug("Stopped all oplog resolver threads") self.stopped = True self.running = False @@ -100,7 +110,8 @@ def run(self): raise OperationError("Backup oplog is newer than the tailed oplog!") else: try: - self._pool.apply_async(ResolverThread( + thread_name = uri.str() + self._results[thread_name] = self._pool.apply_async(ResolverThread( self.resolver_state[shard], uri, tailed_oplog.copy(), @@ -108,7 +119,7 @@ def run(self): self.get_consistent_end_ts(), self.compression() ).run, callback=self.done) - self._pooled.append(uri.str()) + self._pooled.append(thread_name) except Exception, e: logging.fatal("Resolve failed for %s! 
Error: %s" % (uri, e)) raise Error(e) diff --git a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py index 74662c68..c7798904 100644 --- a/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py +++ b/mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py @@ -4,6 +4,7 @@ # noinspection PyPackageRequirements from bson import decode_file_iter +from mongodb_consistent_backup.Errors import Error from mongodb_consistent_backup.Oplog import Oplog from mongodb_consistent_backup.Pipeline import PoolThread @@ -20,7 +21,7 @@ def __init__(self, state, uri, tailed_oplog, mongodump_oplog, max_end_ts, compre self.oplogs = {} self.changes = 0 - self.stopped = True + self.stopped = False def run(self): self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.do_gzip(), 'a+') @@ -46,7 +47,7 @@ def run(self): self.state.set('running', False) self.exit_code = 0 except Exception, e: - logging.exception("Resolving of oplogs failed! Error: %s" % e) + raise Error("Resolving of oplogs failed! 
Error: %s" % e) finally: self.close() diff --git a/mongodb_consistent_backup/Replication/Replset.py b/mongodb_consistent_backup/Replication/Replset.py index 472ccfeb..9f8fb1ac 100644 --- a/mongodb_consistent_backup/Replication/Replset.py +++ b/mongodb_consistent_backup/Replication/Replset.py @@ -151,6 +151,7 @@ def find_secondary(self, force=False, quiet=False): if self.secondary and not force: return self.secondary + secondary_count = 0 for member in rs_status['members']: member_uri = MongoUri(member['name'], 27017, rs_name) if member['state'] == 7: @@ -186,12 +187,12 @@ def find_secondary(self, force=False, quiet=False): if self.secondary is None or score > self.secondary['score']: self.secondary = { 'replSet': rs_name, - 'count': 1 if self.secondary is None else self.secondary['count'] + 1, 'uri': member_uri, 'optime': optime_ts, 'score': score } log_msg = "Found SECONDARY %s" % member_uri + secondary_count += 1 else: log_msg = "Found SECONDARY %s with too high replication lag! Skipping" % member_uri @@ -203,8 +204,7 @@ def find_secondary(self, force=False, quiet=False): log_data['score'] = int(score) logging.info("%s: %s" % (log_msg, str(log_data))) self.replset_summary['secondary'] = { "member": member, "uri": member_uri.str(), "data": log_data } - if self.secondary is None or (self.secondary['count'] + 1) < quorum: - secondary_count = self.secondary['count'] + 1 if self.secondary else 0 + if self.secondary is None or (secondary_count + 1) < quorum: logging.error("Not enough valid secondaries in replset %s to take backup! 
Num replset members: %i, required quorum: %i" % ( rs_name, secondary_count, diff --git a/mongodb_consistent_backup/Sharding.py b/mongodb_consistent_backup/Sharding.py index 15d1c16a..5e4ca6da 100644 --- a/mongodb_consistent_backup/Sharding.py +++ b/mongodb_consistent_backup/Sharding.py @@ -1,5 +1,6 @@ import logging +from pymongo import DESCENDING from time import sleep from mongodb_consistent_backup.Common import DB, MongoUri, validate_hostname @@ -18,6 +19,7 @@ def __init__(self, config, timer, db): self.timer_name = self.__class__.__name__ self.config_server = None self.config_db = None + self.mongos_db = None self._balancer_state_start = None self.restored = False @@ -36,8 +38,31 @@ def __init__(self, config, timer, db): def close(self): if self.config_db: self.config_db.close() + if self.mongos_db: + self.mongos_db.close() return self.restore_balancer_state() + def is_gte_34(self): + return self.db.server_version() >= tuple("3.4.0".split(".")) + + def get_mongos(self, force=False): + if not force and self.mongos_db: + return self.mongos_db + elif self.db.is_mongos(): + return self.db + else: + db = self.connection['config'] + for doc in db.mongos.find().sort('ping', DESCENDING): + try: + mongos_uri = MongoUri(doc['_id']) + logging.debug("Found cluster mongos: %s" % mongos_uri) + self.mongos_db = DB(mongos_uri, self.config, False, 'nearest') + logging.info("Connected to cluster mongos: %s" % mongos_uri) + return self.mongos_db + except DBConnectionFailure, e: + logging.debug("Failed to connect to mongos: %s, trying next available mongos" % mongos_uri) + raise OperationError('Could not connect to any mongos!') + def get_start_state(self): self._balancer_state_start = self.get_balancer_state() logging.info("Began with balancer state running: %s" % str(self._balancer_state_start)) @@ -45,9 +70,9 @@ def get_start_state(self): def shards(self): try: - if self.db.is_configsvr() and self.db.server_version() < tuple("3.4.0".split(".")): + if self.db.is_configsvr() or 
not self.is_gte_34(): return self.connection['config'].shards.find() - else: + elif self.is_gte_34(): listShards = self.db.admin_command("listShards") if 'shards' in listShards: return listShards['shards'] @@ -56,38 +81,56 @@ def shards(self): def check_balancer_running(self): try: - config = self.connection['config'] - lock = config['locks'].find_one({'_id': 'balancer'}) - if 'state' in lock and int(lock['state']) == 0: - return False + if self.is_gte_34(): + # 3.4+ configsvrs dont have balancerStatus, use self.get_mongos() to get a mongos connection for now + balancerState = self.get_mongos().admin_command("balancerStatus") + if 'inBalancerRound' in balancerState: + return balancerState['inBalancerRound'] + else: + config = self.connection['config'] + lock = config['locks'].find_one({'_id': 'balancer'}) + if 'state' in lock and int(lock['state']) == 0: + return False return True except Exception, e: raise DBOperationError(e) def get_balancer_state(self): try: - config = self.connection['config'] - state = config['settings'].find_one({'_id': 'balancer'}) - - if not state: - return True - elif 'stopped' in state and state.get('stopped') is True: - return False + if self.is_gte_34(): + # 3.4+ configsvrs dont have balancerStatus, use self.get_mongos() to get a mongos connection for now + balancerState = self.get_mongos().admin_command("balancerStatus") + if 'mode' in balancerState and balancerState['mode'] == 'off': + return False + return True else: - return True + config = self.connection['config'] + state = config['settings'].find_one({'_id': 'balancer'}) + if not state: + return True + elif 'stopped' in state and state.get('stopped') is True: + return False + return True except Exception, e: raise DBOperationError(e) def set_balancer(self, value): try: - if value is True: - set_value = False - elif value is False: - set_value = True + if self.is_gte_34(): + # 3.4+ configsvrs dont have balancerStart/Stop, even though they're the balancer! 
Use self.get_mongos() to get a mongos connection for now + if value is True: + self.get_mongos().admin_command("balancerStart") + else: + self.get_mongos().admin_command("balancerStop") else: - set_value = True - config = self.connection['config'] - config['settings'].update_one({'_id': 'balancer'}, {'$set': {'stopped': set_value}}) + if value is True: + set_value = False + elif value is False: + set_value = True + else: + set_value = True + config = self.connection['config'] + config['settings'].update_one({'_id': 'balancer'}, {'$set': {'stopped': set_value}}) except Exception, e: logging.fatal("Failed to set balancer state! Error: %s" % e) raise DBOperationError(e) diff --git a/scripts/build.sh b/scripts/build.sh index e1236dcf..2793a8e9 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -88,7 +88,10 @@ if [ -d ${srcdir} ]; then source ${venvdir}/bin/activate [ ! -d ${pipdir} ] && mkdir -p ${pipdir} - ${venvdir}/bin/python2.7 ${venvdir}/bin/pip install --download-cache=${pipdir} pex requests + pip_flags="--download-cache=${pipdir}" + ${venvdir}/bin/python2.7 ${venvdir}/bin/pip --help | grep -q '\-\-cache\-dir' + [ $? = 0 ] && pip_flags="--cache-dir=${pipdir}" + ${venvdir}/bin/python2.7 ${venvdir}/bin/pip install ${pip_flags} pex requests if [ $? -gt 0 ]; then echo "Failed to install pex utility for building!" exit 1 @@ -97,7 +100,7 @@ if [ -d ${srcdir} ]; then if [ ! -d ${pexdir} ]; then mkdir -p ${pexdir} else - rm -f ${pexdir}/build/mongodb_consistent_backup-*.whl + find ${pexdir} -type f -name "${mod_name}-*.whl" -delete fi [ ! -d ${bindir} ] && mkdir -p ${bindir} ${venvdir}/bin/python2.7 ${venvdir}/bin/pex -o ${output_file} -m ${mod_name} -r ${require_file} --pex-root=${pexdir} ${builddir}