diff --git a/conf/mongodb-consistent-backup.example.conf b/conf/mongodb-consistent-backup.example.conf index f00b61df..10c219bf 100644 --- a/conf/mongodb-consistent-backup.example.conf +++ b/conf/mongodb-consistent-backup.example.conf @@ -20,10 +20,11 @@ production: # compression: [auto|none|gzip] (default: auto = enable gzip if supported) # threads: [1-16] (default: auto-generated, shards/cpu) #replication: - # max_lag_secs: [1+] (default: 10) - # min_priority: [0-999] (default: 0) - # max_priority: [2-1000] (default: 1000) - # hidden_only: [true|false] (default: false) + # max_lag_secs: [1+] (default: 10) + # min_priority: [0-999] (default: 0) + # max_priority: [2-1000] (default: 1000) + # hidden_only: [true|false] (default: false) + # read_pref_tags: [key:value,key:value,...] (default: none) #sharding: # balancer: # wait_secs: [1+] (default: 300) diff --git a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py index f34feeac..78c7fc1b 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py +++ b/mongodb_consistent_backup/Backup/Mongodump/Mongodump.py @@ -16,11 +16,11 @@ class Mongodump(Task): def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None): super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir) - self.compression_method = self.config.backup.mongodump.compression - self.binary = self.config.backup.mongodump.binary self.user = self.config.username self.password = self.config.password self.authdb = self.config.authdb + self.compression_method = self.config.backup.mongodump.compression + self.binary = self.config.backup.mongodump.binary self.replsets = replsets self.backup_stop = backup_stop self.sharding = sharding diff --git a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py index f1ec043e..a67e1619 100644 --- a/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py +++ b/mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py @@ -1,3 +1,4 @@ +import json import os import logging import sys @@ -8,7 +9,7 @@ from signal import signal, SIGINT, SIGTERM, SIG_IGN from subprocess import Popen, PIPE -from mongodb_consistent_backup.Common import is_datetime, parse_config_bool +from mongodb_consistent_backup.Common import is_datetime, parse_config_bool, parse_read_pref_tags from mongodb_consistent_backup.Oplog import Oplog @@ -31,6 +32,7 @@ def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump self.ssl_ca_file = self.config.ssl.ca_file self.ssl_crl_file = self.config.ssl.crl_file self.ssl_client_cert_file = self.config.ssl.client_cert_file + self.read_pref_tags = self.config.replication.read_pref_tags self.binary = self.config.backup.mongodump.binary self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset) @@ -67,6 +69,12 @@ def is_version_gte(self, compare): return True return False + def parse_read_pref(self, mode="secondary"): + rp = {"mode": mode} + if self.read_pref_tags: + rp["tags"] = parse_read_pref_tags(self.read_pref_tags) + return json.dumps(rp, separators=(',', ':')) + def parse_mongodump_line(self, line): try: line = line.rstrip() @@ -130,37 +138,59 @@ def wait(self): def mongodump_cmd(self): mongodump_uri = self.uri.get() mongodump_cmd = [self.binary] - mongodump_flags = ["--host", mongodump_uri.host, "--port", str(mongodump_uri.port), "--oplog", "--out", "%s/dump" % self.backup_dir] + mongodump_flags = [ + "--host=%s" % mongodump_uri.host, + "--port=%s" % str(mongodump_uri.port), + "--oplog", + "--out=%s/dump" % self.backup_dir + ] + + # --numParallelCollections if self.threads > 0: - mongodump_flags.extend(["--numParallelCollections=" + str(self.threads)]) + mongodump_flags.append("--numParallelCollections=%s" % str(self.threads)) + # --gzip if self.dump_gzip: - mongodump_flags.extend(["--gzip"]) - - if self.is_version_gte("3.4.0"): - mongodump_flags.extend(["--readPreference=secondary"]) - + mongodump_flags.append("--gzip") + + # --readPreference + if self.is_version_gte("3.2.0"): + read_pref = self.parse_read_pref() + if read_pref: + mongodump_flags.append("--readPreference=%s" % read_pref) + elif self.read_pref_tags: + logging.fatal("Mongodump must be >= 3.2.0 to set read preference!") + sys.exit(1) + + # --username/--password/--authdb if self.authdb and self.authdb != "admin": logging.debug("Using database %s for authentication" % self.authdb) - mongodump_flags.extend(["--authenticationDatabase", self.authdb]) + mongodump_flags.append("--authenticationDatabase=%s" % self.authdb) if self.user and self.password: # >= 3.0.2 supports password input via stdin to mask from ps if self.is_version_gte("3.0.2"): - mongodump_flags.extend(["-u", self.user, "-p", '""']) + mongodump_flags.extend([ + "--username=%s" % self.user, + "--password=\"\"" + ]) self.do_stdin_passwd = True else: logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this") - mongodump_flags.extend(["-u", self.user, "-p", self.password]) + mongodump_flags.extend([ + "--username=%s" % self.user, + "--password=%s" % self.password + ]) + # --ssl if self.do_ssl(): if self.is_version_gte("2.6.0"): mongodump_flags.append("--ssl") if self.ssl_ca_file: - mongodump_flags.extend(["--sslCAFile", self.ssl_ca_file]) + mongodump_flags.append("--sslCAFile=%s" % self.ssl_ca_file) if self.ssl_crl_file: - mongodump_flags.extend(["--sslCRLFile", self.ssl_crl_file]) + mongodump_flags.append("--sslCRLFile=%s" % self.ssl_crl_file) if self.client_cert_file: - mongodump_flags.extend(["--sslPEMKeyFile", self.ssl_cert_file]) + mongodump_flags.append("--sslPEMKeyFile=%s" % self.ssl_cert_file) if self.do_ssl_insecure(): mongodump_flags.extend(["--sslAllowInvalidCertificates", "--sslAllowInvalidHostnames"]) else: @@ -182,7 +212,7 @@ def run(self): if os.path.isdir(self.dump_dir): rmtree(self.dump_dir) os.makedirs(self.dump_dir) - logging.debug("Running mongodump cmd: %s" % mongodump_cmd) + logging.debug("Running mongodump cmd: %s" % " ".join(mongodump_cmd)) self._process = Popen(mongodump_cmd, stdin=PIPE, stderr=PIPE) self.wait() self.exit_code = self._process.returncode diff --git a/mongodb_consistent_backup/Common/DB.py b/mongodb_consistent_backup/Common/DB.py index d7708b3f..44c42326 100644 --- a/mongodb_consistent_backup/Common/DB.py +++ b/mongodb_consistent_backup/Common/DB.py @@ -11,15 +11,34 @@ from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error +def parse_read_pref_tags(tags_str): + tags = {} + for pair in tags_str.replace(" ", "").split(","): + if ":" in pair: + key, value = pair.split(":") + tags[key] = str(value) + return tags + + class DB: - def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_connect=True, conn_timeout=5000, retries=5): - self.uri = uri - self.config = config - self.do_replset = do_replset - self.read_pref = read_pref - self.do_connect = do_connect - self.conn_timeout = conn_timeout - self.retries = retries + def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_rp_tags=False, + do_connect=True, conn_timeout=5000, retries=5): + self.uri = uri + self.config = config + self.do_replset = do_replset + self.read_pref = read_pref + self.do_rp_tags = do_rp_tags + self.do_connect = do_connect + self.conn_timeout = conn_timeout + self.retries = retries + + self.username = self.config.username + self.password = self.config.password + self.authdb = self.config.authdb + self.ssl_ca_file = self.config.ssl.ca_file + self.ssl_crl_file = self.config.ssl.crl_file + self.ssl_client_cert_file = self.config.ssl.client_cert_file + self.read_pref_tags = self.config.replication.read_pref_tags self.username = self.config.username self.password = self.config.password @@ -56,6 +75,14 @@ def client_opts(self): "readPreference": self.read_pref, "w": "majority" }) + if self.do_rp_tags and self.read_pref_tags: + logging.debug("Using read preference mode: %s, tags: %s" % ( + self.read_pref, + parse_read_pref_tags(self.read_pref_tags) + )) + self.read_pref_tags = self.read_pref_tags.replace(" ", "") + opts["readPreferenceTags"] = self.read_pref_tags + if self.do_ssl(): logging.debug("Using SSL-secured mongodb connection (ca_cert=%s, client_cert=%s, crl_file=%s, insecure=%s)" % ( self.ssl_ca_file, @@ -76,10 +103,11 @@ def client_opts(self): def connect(self): try: - logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, ssl=%s)" % ( + logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, readPreferenceTags=%s, ssl=%s)" % ( self.uri, self.replset, self.read_pref, + self.do_rp_tags, self.do_ssl(), )) conn = MongoClient(**self.client_opts()) diff --git a/mongodb_consistent_backup/Common/__init__.py b/mongodb_consistent_backup/Common/__init__.py index 5d65b8b3..c07a4fde 100644 --- a/mongodb_consistent_backup/Common/__init__.py +++ b/mongodb_consistent_backup/Common/__init__.py @@ -1,5 +1,5 @@ from Config import Config, parse_config_bool # NOQA -from DB import DB # NOQA +from DB import DB, parse_read_pref_tags # NOQA from LocalCommand import LocalCommand # NOQA from Lock import Lock # NOQA from MongoUri import MongoUri # NOQA diff --git a/mongodb_consistent_backup/Oplog/Tailer/TailThread.py b/mongodb_consistent_backup/Oplog/Tailer/TailThread.py index d97deb99..74c4393c 100644 --- a/mongodb_consistent_backup/Oplog/Tailer/TailThread.py +++ b/mongodb_consistent_backup/Oplog/Tailer/TailThread.py @@ -3,7 +3,7 @@ # noinspection PyPackageRequirements from multiprocessing import Process -from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError +from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError, ServerSelectionTimeoutError from signal import signal, SIGINT, SIGTERM, SIG_IGN from time import sleep, time @@ -88,7 +88,7 @@ def status(self): def connect(self): if not self.db: - self.db = DB(self.uri, self.config, True, 'secondary') + self.db = DB(self.uri, self.config, True, 'secondary', True) return self.db.connection() def run(self): @@ -144,21 +144,25 @@ def run(self): continue sleep(1) finally: - self._cursor.close() + if self._cursor: + logging.debug("Stopping oplog cursor on %s" % self.uri) + self._cursor.close() except OperationError, e: logging.error("Tailer %s encountered error: %s" % (self.uri, e)) self.exit_code = 1 self.backup_stop.set() raise OperationError(e) + except ServerSelectionTimeoutError, e: + logging.error("Tailer %s could not connect: %s" % (self.uri, e)) + self.exit_code = 1 + self.backup_stop.set() + raise OperationError(e) except Exception, e: logging.error("Tailer %s encountered an unexpected error: %s" % (self.uri, e)) self.exit_code = 1 self.backup_stop.set() raise e finally: - if self._cursor: - logging.debug("Stopping oplog cursor on %s" % self.uri) - self._cursor.close() oplog.flush() oplog.close() self.stopped = True diff --git a/mongodb_consistent_backup/Replication/Replset.py b/mongodb_consistent_backup/Replication/Replset.py index 408053af..092a54f8 100644 --- a/mongodb_consistent_backup/Replication/Replset.py +++ b/mongodb_consistent_backup/Replication/Replset.py @@ -4,18 +4,19 @@ from math import ceil from time import mktime -from mongodb_consistent_backup.Common import DB, MongoUri +from mongodb_consistent_backup.Common import DB, MongoUri, parse_read_pref_tags from mongodb_consistent_backup.Errors import Error, OperationError class Replset: def __init__(self, config, db): - self.config = config - self.db = db - self.max_lag_secs = self.config.replication.max_lag_secs - self.min_priority = self.config.replication.min_priority - self.max_priority = self.config.replication.max_priority - self.hidden_only = self.config.replication.hidden_only + self.config = config + self.db = db + self.read_pref_tags = self.config.replication.read_pref_tags + self.max_lag_secs = self.config.replication.max_lag_secs + self.min_priority = self.config.replication.min_priority + self.max_priority = self.config.replication.max_priority + self.hidden_only = self.config.replication.hidden_only self.state_primary = 1 self.state_secondary = 2 @@ -142,6 +143,18 @@ def is_member_electable(self, member): return True return False + def has_read_pref_tags(self, member_config): + if "tags" not in member_config: + raise OperationError("Member config has no 'tags' field!") + tags = parse_read_pref_tags(self.read_pref_tags) + member_tags = member_config["tags"] + for key in tags: + if key not in member_tags: + return False + if member_tags[key] != tags[key]: + return False + return True + def find_primary(self, force=False, quiet=False): if force or not self.primary: rs_status = self.get_rs_status(force, quiet) @@ -173,8 +186,8 @@ def find_secondary(self, force=False, quiet=False): self.get_rs_config(force, quiet) self.get_mongo_config(force, quiet) - quorum = self.get_rs_quorum() - rs_name = rs_status['set'] + quorum = self.get_rs_quorum() + rs_name = rs_status['set'] if self.secondary and not force: return self.secondary @@ -196,6 +209,14 @@ def find_secondary(self, force=False, quiet=False): score = self.max_lag_secs * 10 score_scale = 100.00 / float(score) priority = 0 + + if self.read_pref_tags and not self.has_read_pref_tags(member_config): + logging.info("Found SECONDARY %s without read preference tags: %s, skipping" % ( + member_uri, + parse_read_pref_tags(self.read_pref_tags) + )) + continue + if 'hidden' in member_config and member_config['hidden']: score += (score * self.hidden_weight) log_data['hidden'] = True diff --git a/mongodb_consistent_backup/Replication/__init__.py b/mongodb_consistent_backup/Replication/__init__.py index e1559a31..6da8aa82 100644 --- a/mongodb_consistent_backup/Replication/__init__.py +++ b/mongodb_consistent_backup/Replication/__init__.py @@ -7,6 +7,6 @@ def config(parser): parser.add_argument("--replication.min_priority", dest="replication.min_priority", help="Min priority of secondary members for backup (default: 0)", default=0, type=int) parser.add_argument("--replication.max_priority", dest="replication.max_priority", help="Max priority of secondary members for backup (default: 1000)", default=1000, type=int) parser.add_argument("--replication.hidden_only", dest="replication.hidden_only", help="Only use hidden secondary members for backup (default: false)", default=False, action="store_true") - # todo: add tag-specific backup option - # parser.add_argument("-replication.use_tag", dest="replication.use_tag", help="Only use secondary members with tag for backup", type=str) + parser.add_argument("--replication.read_pref_tags", dest="replication.read_pref_tags", default=None, type=str, + help="Only use members that match replication tags in comma-separated key:value format (default: none)") return parser