diff --git a/README.rst b/README.rst
index f87a7a1e..b31cece3 100644
--- a/README.rst
+++ b/README.rst
@@ -31,6 +31,7 @@ Features
   archiving method (*optional*)
 - `AWS S3 `__ Secure Multipart backup uploads (*optional*)
 - `Google Cloud Storage `__ Secure backup uploads (*optional*)
+- Rsync (over SSH) secure backup uploads (*optional*)
 - `Nagios NSCA `__ push notification support (*optional*)
 - Modular backup, archiving, upload and notification components
@@ -219,7 +220,6 @@ Roadmap
 - Upload compatibility for ZBackup archive phase *(upload unsupported today)*
 - Backup retention/rotation *(eg: delete old backups)*
 - Support more notification methods *(Prometheus, PagerDuty, etc)*
-- Support more upload methods *(Rsync, etc)*
 - Support SSL MongoDB connections
 - Documentation for running under Docker with persistent volumes
 - Python unit tests
diff --git a/VERSION b/VERSION
index 9084fa2f..26aaba0e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.1.0
+1.2.0
diff --git a/conf/mongodb-consistent-backup.example.conf b/conf/mongodb-consistent-backup.example.conf
index 0e0c8321..00fd8fd1 100644
--- a/conf/mongodb-consistent-backup.example.conf
+++ b/conf/mongodb-consistent-backup.example.conf
@@ -11,8 +11,8 @@ production:
     location: /var/lib/mongodb-consistent-backup
   # mongodump:
   #   binary: [path] (default: /usr/bin/mongodump)
-  #   compression: [auto|none|gzip] (default: auto - enable gzip if supported)
-  #   threads: [1-16] (default: auto-generated - shards/cpu)
+  #   compression: [auto|none|gzip] (default: auto = enable gzip if supported)
+  #   threads: [1-16] (default: auto-generated, shards/cpu)
   #replication:
   #   max_lag_secs: [1+] (default: 10)
   #   min_priority: [0-999] (default: 0)
@@ -23,18 +23,19 @@
   #   wait_secs: [1+] (default: 300)
   #   ping_secs: [1+] (default: 3)
   #oplog:
-  #   compression: [none|gzip] (default: gzip - if gzip is used by backup stage)
+  #   compression: [none|gzip] (default: gzip, if used by backup stage)
   #   flush:
-  #     max_docs: 100
-  #     max_secs: 1
-  #   resolver_threads: [1+] (default: 2 per CPU)
+  #     max_docs: [1+] (default: 100)
+  #     max_secs: [1+] (default: 1)
+  #   resolver:
+  #     threads: [1+] (default: 2 per CPU)
   #   tailer:
   #     enabled: true
   #     status_interval: 30
   archive:
     method: tar
   # tar:
-  #   compression: [none|gzip] (default: gzip - none if backup is already compressed)
+  #   compression: [none|gzip] (default: gzip, none if backup already compressed)
   #   threads: [1+] (default: 1 per CPU)
   # zbackup:
   #   binary: [path] (default: /usr/bin/zbackup)
@@ -58,7 +59,15 @@
   #   secret_key: [Google Cloud Storage Secret Key]
   #   bucket_name: [Google Cloud Storage Bucket Name]
   #   bucket_prefix: [prefix] (default: /)
-  #   threads: [1+] (default: 1 per CPU)
+  #   threads: [1+] (default: 4)
+  # rsync:
+  #   path: [Rsync Destination Path]
+  #   user: [SSH Username]
+  #   host: [SSH Hostname/IP]
+  #   port: [SSH Port Number] (default: 22)
+  #   delete: [true|false] (default: false)
+  #   threads: [1+] (default: 4)
+  #   retries: [1+] (default: 5)
   # s3:
   #   region: [AWS S3 Region] (default: us-east-1)
   #   access_key: [AWS S3 Access Key]
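For reference, a minimal sketch of a config that turns on the new rsync upload method; the host, user and path values are illustrative and the other `production` sections are elided:

```yaml
production:
  # ... host/port, backup and archive settings as above ...
  upload:
    method: rsync
    remove_uploaded: false
    rsync:
      path: /backup/mongodb
      user: backup
      host: backup1.example.com
      port: 22
      threads: 4
      retries: 5
```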
OperationError("Could not resolve host '%s', error: %s" % (hostname, e)) + + +def wait_popen(process, stderr_callback, stdout_callback): + try: + while not process.returncode: + poll = select([process.stderr.fileno(), process.stdout.fileno()], [], []) + if len(poll) >= 1: + for fd in poll[0]: + if process.stderr and fd == process.stderr.fileno(): + stderr_callback(process.stderr.readline().rstrip()) + if process.stdout and fd == process.stdout.fileno(): + stdout_callback(process.stdout.readline().rstrip()) + if process.poll() is not None: + break + stderr, stdout = process.communicate() + stderr_callback(stderr.rstrip()) + stdout_callback(stdout.rstrip()) + except Exception, e: + raise e + return True diff --git a/mongodb_consistent_backup/Common/__init__.py b/mongodb_consistent_backup/Common/__init__.py index aa30af72..27d38df6 100644 --- a/mongodb_consistent_backup/Common/__init__.py +++ b/mongodb_consistent_backup/Common/__init__.py @@ -4,4 +4,4 @@ from Lock import Lock # NOQA from MongoUri import MongoUri # NOQA from Timer import Timer # NOQA -from Util import config_to_string, is_datetime, parse_method, validate_hostname # NOQA +from Util import config_to_string, is_datetime, parse_method, validate_hostname, wait_popen # NOQA diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py index 1d3cdd94..f3865eb1 100755 --- a/mongodb_consistent_backup/Upload/Gs/Gs.py +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -27,12 +27,13 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) self.backup_location = self.config.backup.location self.remove_uploaded = self.config.upload.remove_uploaded + self.retries = self.config.upload.retries self.project_id = self.config.upload.gs.project_id self.access_key = self.config.upload.gs.access_key self.secret_key = self.config.upload.gs.secret_key self.bucket = self.config.upload.gs.bucket - self.threads(self.config.upload.gs.threads) + self.threads(self.config.upload.threads) self._pool = Pool(processes=self.threads()) def close(self): @@ -69,7 +70,8 @@ def run(self): self.project_id, self.access_key, self.secret_key, - self.remove_uploaded + self.remove_uploaded, + self.retries ).run) self._pool.close() self._pool.join() diff --git a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py index ce6a5b9e..9352d258 100755 --- a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py +++ b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py @@ -7,7 +7,7 @@ class GsUploadThread: - def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False): + def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False, retries=5): self.backup_dir = backup_dir self.file_path = file_path self.gs_path = gs_path @@ -16,6 +16,7 @@ def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_ke self.access_key = access_key self.secret_key = secret_key self.remove_uploaded = remove_uploaded + self.retries = retries self.path = "%s/%s" % (self.bucket, self.gs_path) self.meta_data_dir = "mongodb_consistent_backup-META" @@ -76,10 +77,21 @@ def run(self): logging.debug("Path %s does not exist, uploading" % self.path) try: - f = open(self.file_path, 'r') - uri = self.get_uri() - logging.info("Uploading %s to Google Cloud Storage" % 
diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py
index 1d3cdd94..f3865eb1 100755
--- a/mongodb_consistent_backup/Upload/Gs/Gs.py
+++ b/mongodb_consistent_backup/Upload/Gs/Gs.py
@@ -27,12 +27,13 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
         super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
         self.backup_location = self.config.backup.location
         self.remove_uploaded = self.config.upload.remove_uploaded
+        self.retries = self.config.upload.retries
         self.project_id = self.config.upload.gs.project_id
         self.access_key = self.config.upload.gs.access_key
         self.secret_key = self.config.upload.gs.secret_key
         self.bucket = self.config.upload.gs.bucket
 
-        self.threads(self.config.upload.gs.threads)
+        self.threads(self.config.upload.threads)
         self._pool = Pool(processes=self.threads())
 
     def close(self):
@@ -69,7 +70,8 @@ def run(self):
                     self.project_id,
                     self.access_key,
                     self.secret_key,
-                    self.remove_uploaded
+                    self.remove_uploaded,
+                    self.retries
                 ).run)
         self._pool.close()
         self._pool.join()
diff --git a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py
index ce6a5b9e..9352d258 100755
--- a/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py
+++ b/mongodb_consistent_backup/Upload/Gs/GsUploadThread.py
@@ -7,7 +7,7 @@
 
 
 class GsUploadThread:
-    def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False):
+    def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_key, secret_key, remove_uploaded=False, retries=5):
         self.backup_dir = backup_dir
         self.file_path = file_path
         self.gs_path = gs_path
@@ -16,6 +16,7 @@ def __init__(self, backup_dir, file_path, gs_path, bucket, project_id, access_ke
         self.access_key = access_key
         self.secret_key = secret_key
         self.remove_uploaded = remove_uploaded
+        self.retries = retries
 
         self.path = "%s/%s" % (self.bucket, self.gs_path)
         self.meta_data_dir = "mongodb_consistent_backup-META"
@@ -76,10 +77,22 @@ def run(self):
         logging.debug("Path %s does not exist, uploading" % self.path)
         try:
-            f = open(self.file_path, 'r')
-            uri = self.get_uri()
-            logging.info("Uploading %s to Google Cloud Storage" % self.path)
-            uri.new_key().set_contents_from_file(f)
+            f = open(self.file_path, 'r')
+            uri = self.get_uri()
+            retry = 0
+            error = None
+            while retry < self.retries:
+                try:
+                    logging.info("Uploading %s to Google Cloud Storage (attempt %i/%i)" % (self.path, retry + 1, self.retries))
+                    f.seek(0)  # rewind in case a failed attempt consumed the file
+                    uri.new_key().set_contents_from_file(f)
+                    break
+                except Exception, e:
+                    logging.error("Received error for Google Cloud Storage upload of %s: %s" % (self.path, e))
+                    error = e
+                    retry += 1
+            if retry >= self.retries and error:
+                raise error
         finally:
             if f:
                 f.close()
 
diff --git a/mongodb_consistent_backup/Upload/Gs/__init__.py b/mongodb_consistent_backup/Upload/Gs/__init__.py
index 7fd2f0bb..5505e2ea 100644
--- a/mongodb_consistent_backup/Upload/Gs/__init__.py
+++ b/mongodb_consistent_backup/Upload/Gs/__init__.py
@@ -7,5 +7,4 @@ def config(parser):
     parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str)
     parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str)
     parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str)
-    parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int)
     return parser
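The per-uploader thread and retry counts for the gs and s3 methods now live at the `upload` level (the new rsync method keeps its own `upload.rsync.*` equivalents). In config form, with illustrative values:

```yaml
  upload:
    method: gs
    threads: 8   # formerly upload.gs.threads / upload.s3.threads
    retries: 3   # formerly upload.s3.retries, now honored by gs too
    gs:
      project_id: my-project
      bucket_name: my-backups
```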
Skipping Rsync upload!" % self.backup_dir) + raise OperationError("The source directory: %s does not exist or is not a directory! Skipping Rsync upload!" % self.backup_dir) + + def rsync_info(self): + if not self._rsync_info: + output = check_output([self.rsync_binary, "--version"]) + search = re.search("^rsync\s+version\s([0-9.-]+)\s+protocol\sversion\s(\d+)", output) + self.rsync_version = search.group(1) + self._rsync_info = {"version": self.rsync_version, "protocol_version": int(search.group(2))} + return self._rsync_info + + def host_has_rsync(self): + if self.rsync_info(): + return True + return False + + def prepare_dest_dir(self): + # mkdir -p the rsync dest path via ssh + ssh_mkdir_cmd = ["ssh"] + if self.rsync_ssh_key: + ssh_mkdir_cmd.extend(["-i", self.rsync_ssh_key]) + ssh_mkdir_cmd.extend([ + "%s@%s" % (self.rsync_user, self.rsync_host), + "mkdir", "-p", self.base_dir + ]) + + # run the mkdir via ssh + try: + check_output(ssh_mkdir_cmd) + except Exception, e: + logging.error("Creating rsync dest path with ssh failed for %s: %s" % ( + self.rsync_host, + e + )) + raise e + + return True + + def done(self, data): + logging.info(data) + + def run(self): + try: + self.init() + self.timer.start(self.timer_name) + + logging.info("Preparing destination path on %s" % self.rsync_host) + self.prepare_dest_dir() + + rsync_config = { + "dest": "%s@%s:%s" % (self.rsync_user, self.rsync_host, self.rsync_path), + "threads": self.threads(), + "retries": self.retries + } + rsync_config.update(self.rsync_info()) + logging.info("Starting upload using rsync version %s (%s)" % ( + self.rsync_info()['version'], + config_to_string(rsync_config) + )) + for child in os.listdir(self.backup_dir): + self._pool.apply_async(RsyncUploadThread( + os.path.join(self.backup_dir, child), + self.base_dir, + self.rsync_flags, + self.rsync_path, + self.rsync_user, + self.rsync_host, + self.rsync_port, + self.rsync_ssh_key, + self.remove_uploaded, + self.retries + ).run, callback=self.done) + self.wait() + except Exception, e: + logging.error("Rsync upload failed! 
Error: %s" % e) + raise OperationError(e) + finally: + self.timer.stop(self.timer_name) + self.completed = True + + def wait(self): + if self._pool: + logging.info("Waiting for Rsync upload threads to stop") + self._pool.close() + self._pool.join() + + def close(self): + if self._pool: + logging.error("Stopping Rsync upload threads") + self._pool.terminate() + self._pool.join() diff --git a/mongodb_consistent_backup/Upload/Rsync/RsyncUploadThread.py b/mongodb_consistent_backup/Upload/Rsync/RsyncUploadThread.py new file mode 100644 index 00000000..6e4933e6 --- /dev/null +++ b/mongodb_consistent_backup/Upload/Rsync/RsyncUploadThread.py @@ -0,0 +1,82 @@ +import logging +import os + +from shutil import rmtree +from subprocess import Popen, PIPE + +from mongodb_consistent_backup.Common import wait_popen + + +class RsyncUploadThread: + def __init__(self, src_path, base_path, rsync_flags, rsync_path, rsync_user, rsync_host, + rsync_port=22, rsync_ssh_key=None, remove_uploaded=False, retries=5, + rsync_binary="rsync"): + self.src_path = src_path + self.base_path = base_path + self.rsync_flags = rsync_flags + self.rsync_path = rsync_path + self.rsync_user = rsync_user + self.rsync_host = rsync_host + self.rsync_port = rsync_port + self.rsync_ssh_key = rsync_ssh_key + self.remove_uploaded = remove_uploaded + self.retries = retries + self.rsync_binary = rsync_binary + + self.completed = False + self.rsync_url = None + self.rsync_cmd = None + self.meta_dir = "mongodb-consistent-backup_META" + + def init(self): + self.rsync_url = "%s@%s:%s" % (self.rsync_user, self.rsync_host, self.get_dest_path()) + self.rsync_cmd = [self.rsync_binary] + self.rsync_cmd.extend(self.rsync_flags) + self.rsync_cmd.extend([self.src_path, self.rsync_url]) + + def get_dest_path(self): + return os.path.join(self.rsync_path, self.base_path) + + def handle_success(self): + if self.remove_uploaded: + if self.meta_dir in self.src_path: + logging.info("Skipping removal of metadata path: %s" % self.src_path) + else: + logging.info("Removing uploaded path: %s" % self.src_path) + rmtree(self.src_path) + + def stderr(self, data): + if data: + logging.error(data) + + def stdout(self, data): + if data: + logging.info(data) + + def do_rsync(self): + # do the rsync + self._command = Popen(self.rsync_cmd, stderr=PIPE, stdout=PIPE) + wait_popen(self._command, self.stderr, self.stdout) + + def run(self): + self.init() + try: + logging.info("Uploading to %s" % (self.rsync_url)) + logging.debug("Rsync cmd: %s" % self.rsync_cmd) + self._command = Popen(self.rsync_cmd, stderr=PIPE, stdout=PIPE) + self.completed = wait_popen(self._command, self.stderr, self.stdout) + + if self.completed: + self.handle_success() + finally: + self.close() + return self.completed, self.src_path + + def close(self, code=None, frame=None): + logging.info("Stopping upload to %s@%s:%s" % ( + self.rsync_user, + self.rsync_host, + self.dest_path + )) + if not self.completed and self._command: + self._command.terminate() diff --git a/mongodb_consistent_backup/Upload/Rsync/__init__.py b/mongodb_consistent_backup/Upload/Rsync/__init__.py new file mode 100644 index 00000000..9b448dae --- /dev/null +++ b/mongodb_consistent_backup/Upload/Rsync/__init__.py @@ -0,0 +1,12 @@ +from Rsync import Rsync # NOQA + + +def config(parser): + parser.add_argument("--upload.rsync.path", dest="upload.rsync.path", help="Rsync upload base destination path (default: /)", default='/', type=str) + parser.add_argument("--upload.rsync.user", dest="upload.rsync.user", help="Rsync upload SSH username 
(default: current)", default=None, type=str) + parser.add_argument("--upload.rsync.host", dest="upload.rsync.host", help="Rsync upload SSH hostname/IP", default=None, type=str) + parser.add_argument("--upload.rsync.port", dest="upload.rsync.port", help="Rsync upload SSH port number (default: 22)", default=22, type=int) + parser.add_argument("--upload.rsync.ssh_key", dest="upload.rsync.ssh_key", help="Rsync upload SSH key path", default=None, type=str) + parser.add_argument("--upload.rsync.retries", dest="upload.rsync.retries", help="Rsync upload retries (default: 5)", default=5, type=int) + parser.add_argument("--upload.rsync.threads", dest="upload.rsync.threads", help="Rsync upload threads (default: 4)", default=4, type=int) + return parser diff --git a/mongodb_consistent_backup/Upload/S3/S3.py b/mongodb_consistent_backup/Upload/S3/S3.py index 9cf2bec9..5ec3c2b9 100644 --- a/mongodb_consistent_backup/Upload/S3/S3.py +++ b/mongodb_consistent_backup/Upload/S3/S3.py @@ -29,16 +29,16 @@ class S3(Task): def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): super(S3, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) self.remove_uploaded = self.config.upload.remove_uploaded + self.retries = self.config.upload.retries + self.thread_count = self.config.upload.threads self.region = self.config.upload.s3.region self.bucket_name = self.config.upload.s3.bucket_name self.bucket_prefix = self.config.upload.s3.bucket_prefix self.access_key = self.config.upload.s3.access_key self.secret_key = self.config.upload.s3.secret_key - self.thread_count = self.config.upload.s3.threads self.chunk_size_mb = self.config.upload.s3.chunk_size_mb self.chunk_size = self.chunk_size_mb * 1024 * 1024 self.secure = self.config.upload.s3.secure - self.retries = self.config.upload.s3.retries self.s3_acl = self.config.upload.s3.acl self.key_prefix = base_dir diff --git a/mongodb_consistent_backup/Upload/S3/__init__.py b/mongodb_consistent_backup/Upload/S3/__init__.py index ae5f1e4c..c3b4a312 100644 --- a/mongodb_consistent_backup/Upload/S3/__init__.py +++ b/mongodb_consistent_backup/Upload/S3/__init__.py @@ -7,9 +7,7 @@ def config(parser): parser.add_argument("--upload.s3.secret_key", dest="upload.s3.secret_key", help="S3 Uploader AWS Secret Key (required for S3 upload)", type=str) parser.add_argument("--upload.s3.bucket_name", dest="upload.s3.bucket_name", help="S3 Uploader destination bucket name", type=str) parser.add_argument("--upload.s3.bucket_prefix", dest="upload.s3.bucket_prefix", help="S3 Uploader destination bucket path prefix", type=str) - parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader worker threads (default: 4)", default=4, type=int) parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", help="S3 Uploader upload chunk size, in megabytes (default: 50)", default=50, type=int) parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", help="S3 Uploader connect over SSL (default: true)", default=True, action="store_false") - parser.add_argument("--upload.s3.retries", dest="upload.s3.retries", help="S3 Uploader retry times (default: 5)", default=5, type=int) parser.add_argument("--upload.s3.acl", dest="upload.s3.acl", help="S3 Uploader ACL associated with objects (default: none)", default=None, type=str) return parser diff --git a/mongodb_consistent_backup/Upload/Upload.py b/mongodb_consistent_backup/Upload/Upload.py index a77c47b9..5dad240d 100644 --- 
diff --git a/mongodb_consistent_backup/Upload/Upload.py b/mongodb_consistent_backup/Upload/Upload.py
index a77c47b9..5dad240d 100644
--- a/mongodb_consistent_backup/Upload/Upload.py
+++ b/mongodb_consistent_backup/Upload/Upload.py
@@ -1,5 +1,6 @@
 from mongodb_consistent_backup.Upload.Gs import Gs  # NOQA
+from mongodb_consistent_backup.Upload.Rsync import Rsync  # NOQA
 from mongodb_consistent_backup.Upload.S3 import S3  # NOQA
 
 from mongodb_consistent_backup.Pipeline import Stage
diff --git a/mongodb_consistent_backup/Upload/__init__.py b/mongodb_consistent_backup/Upload/__init__.py
index 4f5412a7..0e42dad8 100644
--- a/mongodb_consistent_backup/Upload/__init__.py
+++ b/mongodb_consistent_backup/Upload/__init__.py
@@ -2,6 +2,8 @@
 
 
 def config(parser):
-    parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 's3', 'none'])
+    parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 'rsync', 's3', 'none'])
     parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded", help="Remove source files after successful upload (default: false)", default=False, action="store_true")
+    parser.add_argument("--upload.retries", dest="upload.retries", help="Number of times to retry upload attempts (default: 5)", default=5, type=int)
+    parser.add_argument("--upload.threads", dest="upload.threads", help="Number of threads to use for upload (default: 4)", default=4, type=int)
     return parser
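Assuming the usual `mongodb-consistent-backup` entrypoint and the top-level `--host`/`--port` and `--backup.*` flags that mirror the example config (all values illustrative), enabling the new method from the command line would look something like:

```sh
mongodb-consistent-backup \
    --host=localhost --port=27017 \
    --backup.name=daily \
    --backup.location=/var/lib/mongodb-consistent-backup \
    --upload.method=rsync \
    --upload.rsync.host=backup1.example.com \
    --upload.rsync.user=backup \
    --upload.rsync.path=/backup/mongodb
```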