Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
fe86246
Support read_preference tags with Common/DB.py
timvaillancourt Sep 7, 2017
7c14486
only cleanup string if not None
timvaillancourt Sep 11, 2017
3b894c7
Only apply RP tag if not None, pymongo gives 'None not a valid value …
timvaillancourt Sep 11, 2017
21e842b
Only apply RP tag if not None, pymongo gives 'None not a valid value …
timvaillancourt Sep 11, 2017
291b786
Parse mongodump json RP tag string, add function for checking mongodu…
timvaillancourt Sep 12, 2017
b713d7d
Fix quotes
timvaillancourt Sep 12, 2017
20845cb
log typo
timvaillancourt Sep 12, 2017
6ce36a7
Use long-flag=value for all mongodump flags
timvaillancourt Sep 12, 2017
492d399
Only set read pref tags if boolean is True, make only the tailing and…
timvaillancourt Sep 12, 2017
edb2ad7
Fix some Oplog/TailThread.py exception problems found in testing
timvaillancourt Sep 12, 2017
a5e7c4c
Move read_pref string parser to Common/DB.py
timvaillancourt Sep 12, 2017
5979886
Only put the tag-parsing part of the parse_read_pref logic in Common/…
timvaillancourt Sep 12, 2017
8588669
Don't json.dumps() twice
timvaillancourt Sep 12, 2017
991e479
Improved logging of RP tags
timvaillancourt Sep 12, 2017
3e9f7ca
Improved logging
timvaillancourt Sep 12, 2017
b91cc99
Improved logging #2
timvaillancourt Sep 12, 2017
86eec18
Improved logging #3
timvaillancourt Sep 12, 2017
6c66e6e
Improved logging #4
timvaillancourt Sep 12, 2017
c7c9c97
Improved logging #5
timvaillancourt Sep 12, 2017
59d7792
code cleanup
timvaillancourt Sep 12, 2017
099759c
remove spaces in json
timvaillancourt Sep 12, 2017
be3ce3f
merge ssl branch
timvaillancourt Sep 13, 2017
ffdadbd
Merge branch 'master' into 1.2.0-read_pref_tags
timvaillancourt Sep 20, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions conf/mongodb-consistent-backup.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ production:
# compression: [auto|none|gzip] (default: auto = enable gzip if supported)
# threads: [1-16] (default: auto-generated, shards/cpu)
#replication:
# max_lag_secs: [1+] (default: 10)
# min_priority: [0-999] (default: 0)
# max_priority: [2-1000] (default: 1000)
# hidden_only: [true|false] (default: false)
# max_lag_secs: [1+] (default: 10)
# min_priority: [0-999] (default: 0)
# max_priority: [2-1000] (default: 1000)
# hidden_only: [true|false] (default: false)
# read_pref_tags: [key:value,key:value,...] (default: none)
#sharding:
# balancer:
# wait_secs: [1+] (default: 300)
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Backup/Mongodump/Mongodump.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
class Mongodump(Task):
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop=None, sharding=None):
super(Mongodump, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
self.compression_method = self.config.backup.mongodump.compression
self.binary = self.config.backup.mongodump.binary
self.user = self.config.username
self.password = self.config.password
self.authdb = self.config.authdb
self.compression_method = self.config.backup.mongodump.compression
self.binary = self.config.backup.mongodump.binary
self.replsets = replsets
self.backup_stop = backup_stop
self.sharding = sharding
Expand Down
60 changes: 45 additions & 15 deletions mongodb_consistent_backup/Backup/Mongodump/MongodumpThread.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import logging
import sys
Expand All @@ -8,7 +9,7 @@
from signal import signal, SIGINT, SIGTERM, SIG_IGN
from subprocess import Popen, PIPE

from mongodb_consistent_backup.Common import is_datetime, parse_config_bool
from mongodb_consistent_backup.Common import is_datetime, parse_config_bool, parse_read_pref_tags
from mongodb_consistent_backup.Oplog import Oplog


Expand All @@ -31,6 +32,7 @@ def __init__(self, state, uri, timer, config, base_dir, version, threads=0, dump
self.ssl_ca_file = self.config.ssl.ca_file
self.ssl_crl_file = self.config.ssl.crl_file
self.ssl_client_cert_file = self.config.ssl.client_cert_file
self.read_pref_tags = self.config.replication.read_pref_tags
self.binary = self.config.backup.mongodump.binary

self.timer_name = "%s-%s" % (self.__class__.__name__, self.uri.replset)
Expand Down Expand Up @@ -67,6 +69,12 @@ def is_version_gte(self, compare):
return True
return False

def parse_read_pref(self, mode="secondary"):
rp = {"mode": mode}
if self.read_pref_tags:
rp["tags"] = parse_read_pref_tags(self.read_pref_tags)
return json.dumps(rp, separators=(',', ':'))

def parse_mongodump_line(self, line):
try:
line = line.rstrip()
Expand Down Expand Up @@ -130,37 +138,59 @@ def wait(self):
def mongodump_cmd(self):
mongodump_uri = self.uri.get()
mongodump_cmd = [self.binary]
mongodump_flags = ["--host", mongodump_uri.host, "--port", str(mongodump_uri.port), "--oplog", "--out", "%s/dump" % self.backup_dir]
mongodump_flags = [
"--host=%s" % mongodump_uri.host,
"--port=%s" % str(mongodump_uri.port),
"--oplog",
"--out=%s/dump" % self.backup_dir
]

# --numParallelCollections
if self.threads > 0:
mongodump_flags.extend(["--numParallelCollections=" + str(self.threads)])
mongodump_flags.append("--numParallelCollections=%s" % str(self.threads))

# --gzip
if self.dump_gzip:
mongodump_flags.extend(["--gzip"])

if self.is_version_gte("3.4.0"):
mongodump_flags.extend(["--readPreference=secondary"])

mongodump_flags.append("--gzip")

# --readPreference
if self.is_version_gte("3.2.0"):
read_pref = self.parse_read_pref()
if read_pref:
mongodump_flags.append("--readPreference=%s" % read_pref)
elif self.read_pref_tags:
logging.fatal("Mongodump must be >= 3.2.0 to set read preference!")
sys.exit(1)

# --username/--password/--authdb
if self.authdb and self.authdb != "admin":
logging.debug("Using database %s for authentication" % self.authdb)
mongodump_flags.extend(["--authenticationDatabase", self.authdb])
mongodump_flags.append("--authenticationDatabase=%s" % self.authdb)
if self.user and self.password:
# >= 3.0.2 supports password input via stdin to mask from ps
if self.is_version_gte("3.0.2"):
mongodump_flags.extend(["-u", self.user, "-p", '""'])
mongodump_flags.extend([
"--username=%s" % self.user,
"--password=\"\""
])
self.do_stdin_passwd = True
else:
logging.warning("Mongodump is too old to set password securely! Upgrade to mongodump >= 3.0.2 to resolve this")
mongodump_flags.extend(["-u", self.user, "-p", self.password])
mongodump_flags.extend([
"--username=%s" % self.user,
"--password=%s" % self.password
])

# --ssl
if self.do_ssl():
if self.is_version_gte("2.6.0"):
mongodump_flags.append("--ssl")
if self.ssl_ca_file:
mongodump_flags.extend(["--sslCAFile", self.ssl_ca_file])
mongodump_flags.append("--sslCAFile=%s" % self.ssl_ca_file)
if self.ssl_crl_file:
mongodump_flags.extend(["--sslCRLFile", self.ssl_crl_file])
mongodump_flags.append("--sslCRLFile=%s" % self.ssl_crl_file)
if self.client_cert_file:
mongodump_flags.extend(["--sslPEMKeyFile", self.ssl_cert_file])
mongodump_flags.append("--sslPEMKeyFile=%s" % self.ssl_cert_file)
if self.do_ssl_insecure():
mongodump_flags.extend(["--sslAllowInvalidCertificates", "--sslAllowInvalidHostnames"])
else:
Expand All @@ -182,7 +212,7 @@ def run(self):
if os.path.isdir(self.dump_dir):
rmtree(self.dump_dir)
os.makedirs(self.dump_dir)
logging.debug("Running mongodump cmd: %s" % mongodump_cmd)
logging.debug("Running mongodump cmd: %s" % " ".join(mongodump_cmd))
self._process = Popen(mongodump_cmd, stdin=PIPE, stderr=PIPE)
self.wait()
self.exit_code = self._process.returncode
Expand Down
46 changes: 37 additions & 9 deletions mongodb_consistent_backup/Common/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,34 @@
from mongodb_consistent_backup.Errors import DBAuthenticationError, DBConnectionError, DBOperationError, Error


def parse_read_pref_tags(tags_str):
tags = {}
for pair in tags_str.replace(" ", "").split(","):
if ":" in pair:
key, value = pair.split(":")
tags[key] = str(value)
return tags


class DB:
def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_connect=True, conn_timeout=5000, retries=5):
self.uri = uri
self.config = config
self.do_replset = do_replset
self.read_pref = read_pref
self.do_connect = do_connect
self.conn_timeout = conn_timeout
self.retries = retries
def __init__(self, uri, config, do_replset=False, read_pref='primaryPreferred', do_rp_tags=False,
do_connect=True, conn_timeout=5000, retries=5):
self.uri = uri
self.config = config
self.do_replset = do_replset
self.read_pref = read_pref
self.do_rp_tags = do_rp_tags
self.do_connect = do_connect
self.conn_timeout = conn_timeout
self.retries = retries

self.username = self.config.username
self.password = self.config.password
self.authdb = self.config.authdb
self.ssl_ca_file = self.config.ssl.ca_file
self.ssl_crl_file = self.config.ssl.crl_file
self.ssl_client_cert_file = self.config.ssl.client_cert_file
self.read_pref_tags = self.config.replication.read_pref_tags

self.username = self.config.username
self.password = self.config.password
Expand Down Expand Up @@ -56,6 +75,14 @@ def client_opts(self):
"readPreference": self.read_pref,
"w": "majority"
})
if self.do_rp_tags and self.read_pref_tags:
logging.debug("Using read preference mode: %s, tags: %s" % (
self.read_pref,
parse_read_pref_tags(self.read_pref_tags)
))
self.read_pref_tags = self.read_pref_tags.replace(" ", "")
opts["readPreferenceTags"] = self.read_pref_tags

if self.do_ssl():
logging.debug("Using SSL-secured mongodb connection (ca_cert=%s, client_cert=%s, crl_file=%s, insecure=%s)" % (
self.ssl_ca_file,
Expand All @@ -76,10 +103,11 @@ def client_opts(self):

def connect(self):
try:
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, ssl=%s)" % (
logging.debug("Getting MongoDB connection to %s (replicaSet=%s, readPreference=%s, readPreferenceTags=%s, ssl=%s)" % (
self.uri,
self.replset,
self.read_pref,
self.do_rp_tags,
self.do_ssl(),
))
conn = MongoClient(**self.client_opts())
Expand Down
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Common/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from Config import Config, parse_config_bool # NOQA
from DB import DB # NOQA
from DB import DB, parse_read_pref_tags # NOQA
from LocalCommand import LocalCommand # NOQA
from Lock import Lock # NOQA
from MongoUri import MongoUri # NOQA
Expand Down
16 changes: 10 additions & 6 deletions mongodb_consistent_backup/Oplog/Tailer/TailThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# noinspection PyPackageRequirements
from multiprocessing import Process
from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError
from pymongo.errors import AutoReconnect, ConnectionFailure, CursorNotFound, ExceededMaxWaiters, ExecutionTimeout, NetworkTimeout, NotMasterError, ServerSelectionTimeoutError
from signal import signal, SIGINT, SIGTERM, SIG_IGN
from time import sleep, time

Expand Down Expand Up @@ -88,7 +88,7 @@ def status(self):

def connect(self):
if not self.db:
self.db = DB(self.uri, self.config, True, 'secondary')
self.db = DB(self.uri, self.config, True, 'secondary', True)
return self.db.connection()

def run(self):
Expand Down Expand Up @@ -144,21 +144,25 @@ def run(self):
continue
sleep(1)
finally:
self._cursor.close()
if self._cursor:
logging.debug("Stopping oplog cursor on %s" % self.uri)
self._cursor.close()
except OperationError, e:
logging.error("Tailer %s encountered error: %s" % (self.uri, e))
self.exit_code = 1
self.backup_stop.set()
raise OperationError(e)
except ServerSelectionTimeoutError, e:
logging.error("Tailer %s could not connect: %s" % (self.uri, e))
self.exit_code = 1
self.backup_stop.set()
raise OperationError(e)
except Exception, e:
logging.error("Tailer %s encountered an unexpected error: %s" % (self.uri, e))
self.exit_code = 1
self.backup_stop.set()
raise e
finally:
if self._cursor:
logging.debug("Stopping oplog cursor on %s" % self.uri)
self._cursor.close()
oplog.flush()
oplog.close()
self.stopped = True
Expand Down
39 changes: 30 additions & 9 deletions mongodb_consistent_backup/Replication/Replset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
from math import ceil
from time import mktime

from mongodb_consistent_backup.Common import DB, MongoUri
from mongodb_consistent_backup.Common import DB, MongoUri, parse_read_pref_tags
from mongodb_consistent_backup.Errors import Error, OperationError


class Replset:
def __init__(self, config, db):
self.config = config
self.db = db
self.max_lag_secs = self.config.replication.max_lag_secs
self.min_priority = self.config.replication.min_priority
self.max_priority = self.config.replication.max_priority
self.hidden_only = self.config.replication.hidden_only
self.config = config
self.db = db
self.read_pref_tags = self.config.replication.read_pref_tags
self.max_lag_secs = self.config.replication.max_lag_secs
self.min_priority = self.config.replication.min_priority
self.max_priority = self.config.replication.max_priority
self.hidden_only = self.config.replication.hidden_only

self.state_primary = 1
self.state_secondary = 2
Expand Down Expand Up @@ -142,6 +143,18 @@ def is_member_electable(self, member):
return True
return False

def has_read_pref_tags(self, member_config):
if "tags" not in member_config:
raise OperationError("Member config has no 'tags' field!")
tags = parse_read_pref_tags(self.read_pref_tags)
member_tags = member_config["tags"]
for key in tags:
if key not in member_tags:
return False
if member_tags[key] != tags[key]:
return False
return True

def find_primary(self, force=False, quiet=False):
if force or not self.primary:
rs_status = self.get_rs_status(force, quiet)
Expand Down Expand Up @@ -173,8 +186,8 @@ def find_secondary(self, force=False, quiet=False):
self.get_rs_config(force, quiet)
self.get_mongo_config(force, quiet)

quorum = self.get_rs_quorum()
rs_name = rs_status['set']
quorum = self.get_rs_quorum()
rs_name = rs_status['set']

if self.secondary and not force:
return self.secondary
Expand All @@ -196,6 +209,14 @@ def find_secondary(self, force=False, quiet=False):
score = self.max_lag_secs * 10
score_scale = 100.00 / float(score)
priority = 0

if self.read_pref_tags and not self.has_read_pref_tags(member_config):
logging.info("Found SECONDARY %s without read preference tags: %s, skipping" % (
member_uri,
parse_read_pref_tags(self.read_pref_tags)
))
continue

if 'hidden' in member_config and member_config['hidden']:
score += (score * self.hidden_weight)
log_data['hidden'] = True
Expand Down
4 changes: 2 additions & 2 deletions mongodb_consistent_backup/Replication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ def config(parser):
parser.add_argument("--replication.min_priority", dest="replication.min_priority", help="Min priority of secondary members for backup (default: 0)", default=0, type=int)
parser.add_argument("--replication.max_priority", dest="replication.max_priority", help="Max priority of secondary members for backup (default: 1000)", default=1000, type=int)
parser.add_argument("--replication.hidden_only", dest="replication.hidden_only", help="Only use hidden secondary members for backup (default: false)", default=False, action="store_true")
# todo: add tag-specific backup option
# parser.add_argument("-replication.use_tag", dest="replication.use_tag", help="Only use secondary members with tag for backup", type=str)
parser.add_argument("--replication.read_pref_tags", dest="replication.read_pref_tags", default=None, type=str,
help="Only use members that match replication tags in comma-separated key:value format (default: none)")
return parser