From 3b6d43dc9f076d0a45413ec0b71a0ec7302501bd Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sat, 6 May 2017 00:53:51 +0200 Subject: [PATCH 1/6] GS draft so far --- conf/mongodb-consistent-backup.example.conf | 13 +++-- mongodb_consistent_backup/Upload/GS/GS.py | 49 +++++++++++++++++++ .../Upload/GS/__init__.py | 10 ++++ .../Upload/S3/__init__.py | 2 +- mongodb_consistent_backup/Upload/Upload.py | 1 + mongodb_consistent_backup/Upload/__init__.py | 2 +- 6 files changed, 72 insertions(+), 5 deletions(-) create mode 100755 mongodb_consistent_backup/Upload/GS/GS.py create mode 100644 mongodb_consistent_backup/Upload/GS/__init__.py diff --git a/conf/mongodb-consistent-backup.example.conf b/conf/mongodb-consistent-backup.example.conf index fef12a24..1c984d1f 100644 --- a/conf/mongodb-consistent-backup.example.conf +++ b/conf/mongodb-consistent-backup.example.conf @@ -48,9 +48,16 @@ production: upload: method: none # remove_uploaded: [true|false] (default: false) + # gs: + # project_id: [Google Cloud Project ID] + # access_key: [Google Cloud Storage Access Key] + # secret_key: [Google Cloud Storage Secret Key] + # bucket_name: [Google Cloud Storage Bucket Name] + # bucket_prefix: [prefix] (default: /) + # threads: [1+] (default: 1 per CPU) # s3: - # access_key: [AWS S3 Access Key] - # secret_key: [AWS S3 Secret Key] - # bucket_name: [AWS S3 Bucket Name] + # access_key: [AWS S3 Access Key] + # secret_key: [AWS S3 Secret Key] + # bucket_name: [AWS S3 Bucket Name] # bucket_prefix: [prefix] (default: /) # threads: [1+] (default: 1 per CPU) diff --git a/mongodb_consistent_backup/Upload/GS/GS.py b/mongodb_consistent_backup/Upload/GS/GS.py new file mode 100755 index 00000000..516fa2cc --- /dev/null +++ b/mongodb_consistent_backup/Upload/GS/GS.py @@ -0,0 +1,49 @@ +import boto +import logging +import os +import time + +from mongodb_consistent_backup.Errors import OperationError +from mongodb_consistent_backup.Pipeline import Task + + +class GS(Task): + def __init__(self): + self.remove_uploaded = self.config.upload.remove_uploaded + self.project_id = self.config.upload.gs.project_id + self.access_key = self.config.upload.gs.access_key + self.secret_key = self.config.upload.gs.secret_key + self.bucket_name = self.config.upload.gs.bucket_name + self.bucket_prefix = self.config.upload.gs.bucket_prefix + self.thread_count = self.config.upload.gs.threads + + self.boto_scheme = 'gs' + self.header_values = {"x-goog-project-id": self.project_id} + + def init(self): + try: + if not boto.config.has_section("Credentials"): + boto.config.add_section("Credentials") + boto.config.set("Credentials", "gs_access_key_id", self.access_key) + boto.config.set("Credentials", "gs_secret_access_key", self.secret_key) + if not boto.config.has_section("Boto"): + boto.config.add_section("Boto") + boto.config.setbool('Boto', 'https_validate_certificates', True) + except Exception, e: + return OperationError("Error setting up boto for Google Cloud Storage: '%s'!" % e) + + def upload_file(self, filename): + f = None + if os.path.exists(filename): + logging.info("Uploading file to GS: %s" % filename) + try: + path = os.path.join(self.bucket_name, self.bucket_prefix, os.path.basename(filename)) + f = open(filename, 'r') + uri = boto.storage_uri(path, self.boto_scheme) + uri.new_key().set_contents_from_file(f) + return path + except Exception, e: + return OperationError("Failed to upload file to GS: %s" % filename) + finally: + if f: + f.close() diff --git a/mongodb_consistent_backup/Upload/GS/__init__.py b/mongodb_consistent_backup/Upload/GS/__init__.py new file mode 100644 index 00000000..2cb6f599 --- /dev/null +++ b/mongodb_consistent_backup/Upload/GS/__init__.py @@ -0,0 +1,10 @@ +from GS import GS + +def config(parser): + parser.add_argument("--upload.gs.project_id", dest="upload.gs.project_id", help="Google Cloud Storage Uploader Project ID (required for GS upload)", type=str) + parser.add_argument("--upload.gs.access_key", dest="upload.gs.access_key", help="Google Cloud Storage Uploader Access Key (required for GS upload)", type=str) + parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str) + parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str) + parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str) + parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int) + return parser diff --git a/mongodb_consistent_backup/Upload/S3/__init__.py b/mongodb_consistent_backup/Upload/S3/__init__.py index b3c77b6f..b167efd0 100644 --- a/mongodb_consistent_backup/Upload/S3/__init__.py +++ b/mongodb_consistent_backup/Upload/S3/__init__.py @@ -5,6 +5,6 @@ def config(parser): parser.add_argument("--upload.s3.secret_key", dest="upload.s3.secret_key", help="S3 Uploader AWS Secret Key (required for S3 upload)", type=str) parser.add_argument("--upload.s3.bucket_name", dest="upload.s3.bucket_name", help="S3 Uploader destination bucket name", type=str) parser.add_argument("--upload.s3.bucket_prefix", dest="upload.s3.bucket_prefix", help="S3 Uploader destination bucket path prefix", type=str) - parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader upload worker threads (default: 4)", default=4, type=int) + parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader worker threads (default: 4)", default=4, type=int) parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", help="S3 Uploader upload chunk size, in megabytes (default: 50)", default=50, type=int) return parser diff --git a/mongodb_consistent_backup/Upload/Upload.py b/mongodb_consistent_backup/Upload/Upload.py index 615fe37c..7a1c2580 100644 --- a/mongodb_consistent_backup/Upload/Upload.py +++ b/mongodb_consistent_backup/Upload/Upload.py @@ -1,3 +1,4 @@ +from mongodb_consistent_backup.Upload.GS import GS from mongodb_consistent_backup.Upload.S3 import S3 from mongodb_consistent_backup.Pipeline import Stage diff --git a/mongodb_consistent_backup/Upload/__init__.py b/mongodb_consistent_backup/Upload/__init__.py index 84cb4b2d..fa10dfa6 100644 --- a/mongodb_consistent_backup/Upload/__init__.py +++ b/mongodb_consistent_backup/Upload/__init__.py @@ -2,6 +2,6 @@ def config(parser): - parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['s3', 'none']) + parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 's3', 'none']) parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded",help="Remove source files after successful upload (default: false)", default=False, action="store_true") return parser From 233d6da7f43d1af40a64cd31ba4a70ccbcf581fb Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 11 May 2017 13:10:40 +0200 Subject: [PATCH 2/6] gcs stuff --- mongodb_consistent_backup/Upload/GS/GS.py | 42 ++--------------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/mongodb_consistent_backup/Upload/GS/GS.py b/mongodb_consistent_backup/Upload/GS/GS.py index 516fa2cc..5d1f468d 100755 --- a/mongodb_consistent_backup/Upload/GS/GS.py +++ b/mongodb_consistent_backup/Upload/GS/GS.py @@ -5,45 +5,9 @@ from mongodb_consistent_backup.Errors import OperationError from mongodb_consistent_backup.Pipeline import Task +from mongodb_consistent_backup.Upload.GS import GSUploadThread class GS(Task): - def __init__(self): - self.remove_uploaded = self.config.upload.remove_uploaded - self.project_id = self.config.upload.gs.project_id - self.access_key = self.config.upload.gs.access_key - self.secret_key = self.config.upload.gs.secret_key - self.bucket_name = self.config.upload.gs.bucket_name - self.bucket_prefix = self.config.upload.gs.bucket_prefix - self.thread_count = self.config.upload.gs.threads - - self.boto_scheme = 'gs' - self.header_values = {"x-goog-project-id": self.project_id} - - def init(self): - try: - if not boto.config.has_section("Credentials"): - boto.config.add_section("Credentials") - boto.config.set("Credentials", "gs_access_key_id", self.access_key) - boto.config.set("Credentials", "gs_secret_access_key", self.secret_key) - if not boto.config.has_section("Boto"): - boto.config.add_section("Boto") - boto.config.setbool('Boto', 'https_validate_certificates', True) - except Exception, e: - return OperationError("Error setting up boto for Google Cloud Storage: '%s'!" % e) - - def upload_file(self, filename): - f = None - if os.path.exists(filename): - logging.info("Uploading file to GS: %s" % filename) - try: - path = os.path.join(self.bucket_name, self.bucket_prefix, os.path.basename(filename)) - f = open(filename, 'r') - uri = boto.storage_uri(path, self.boto_scheme) - uri.new_key().set_contents_from_file(f) - return path - except Exception, e: - return OperationError("Failed to upload file to GS: %s" % filename) - finally: - if f: - f.close() + def __init__(self, config): + self.config = config From bc1e648957c54d8380f546efc633b0c9745cc2a3 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 22 Jun 2017 19:02:04 +0200 Subject: [PATCH 3/6] GCS uploads working --- mongodb_consistent_backup/Pipeline/Stage.py | 2 +- mongodb_consistent_backup/Upload/GS/GS.py | 13 -- mongodb_consistent_backup/Upload/Gs/Gs.py | 112 ++++++++++++++++++ .../Upload/{GS => Gs}/__init__.py | 3 +- mongodb_consistent_backup/Upload/Upload.py | 2 +- 5 files changed, 116 insertions(+), 16 deletions(-) delete mode 100755 mongodb_consistent_backup/Upload/GS/GS.py create mode 100755 mongodb_consistent_backup/Upload/Gs/Gs.py rename mongodb_consistent_backup/Upload/{GS => Gs}/__init__.py (98%) diff --git a/mongodb_consistent_backup/Pipeline/Stage.py b/mongodb_consistent_backup/Pipeline/Stage.py index efc6c167..eda46ba6 100644 --- a/mongodb_consistent_backup/Pipeline/Stage.py +++ b/mongodb_consistent_backup/Pipeline/Stage.py @@ -35,7 +35,7 @@ def init(self): module = sys.modules["%s.%s" % (self.stage, self.task.capitalize())] mod_class = getattr(module, self.task.capitalize()) except LookupError, e: - raise OperationError('Could not load task: %s' % self.task) + raise OperationError('Could not load task %s: %s' % (self.task, e)) if mod_class: self._task = mod_class( self.manager, diff --git a/mongodb_consistent_backup/Upload/GS/GS.py b/mongodb_consistent_backup/Upload/GS/GS.py deleted file mode 100755 index 5d1f468d..00000000 --- a/mongodb_consistent_backup/Upload/GS/GS.py +++ /dev/null @@ -1,13 +0,0 @@ -import boto -import logging -import os -import time - -from mongodb_consistent_backup.Errors import OperationError -from mongodb_consistent_backup.Pipeline import Task -from mongodb_consistent_backup.Upload.GS import GSUploadThread - - -class GS(Task): - def __init__(self, config): - self.config = config diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py new file mode 100755 index 00000000..b534e650 --- /dev/null +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -0,0 +1,112 @@ +import boto +import logging +import os +import time + +from mongodb_consistent_backup.Errors import OperationError +from mongodb_consistent_backup.Pipeline import Task + + +class Gs(Task): + def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): + super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) + self.backup_loc = self.config.backup.location + self.project_id = self.config.upload.gs.project_id + self.access_key = self.config.upload.gs.access_key + self.secret_key = self.config.upload.gs.secret_key + self.bucket = self.config.upload.gs.bucket + + self._header_values = {"x-goog-project-id": self.project_id} + self._key_meta_cache = {} + + self.init() + + def init(self): + if not boto.config.has_section("Credentials"): + boto.config.add_section("Credentials") + boto.config.set("Credentials", "gs_access_key_id", self.access_key) + boto.config.set("Credentials", "gs_secret_access_key", self.secret_key) + if not boto.config.has_section("Boto"): + boto.config.add_section("Boto") + boto.config.setbool('Boto', 'https_validate_certificates', True) + + def close(self): + pass + + def get_uri(self, path): + return boto.storage_uri(path, 'gs') + + def object_exists(self, path): + try: + self.get_object_metadata(path) + return True + except boto.exception.InvalidUriError: + pass + return False + + def get_object_metadata(self, path, force=False): + if force or not path in self._key_meta_cache: + logging.debug("Getting metadata for path: %s" % path) + uri = self.get_uri(path) + self._key_meta_cache[path] = uri.get_key() + if path in self._key_meta_cache: + return self._key_meta_cache[path] + + def get_object_md5hash(self, path): + key = self.get_object_metadata(path) + if hasattr(key, 'etag'): + return key.etag.strip('"\'') + + def get_file_md5hash(self, filename, blocksize=65536): + md5 = hashlib.md5() + with open(filename, "rb") as f: + for block in iter(lambda: f.read(blocksize), b""): + md5.update(block) + return md5.hexdigest() + + def upload(self, filename, path=None): + if not path: + path = filename + path = "%s/%s" % (self.bucket, path) + if self.object_exists(path): + object_md5hash = self.get_object_md5hash(path) + if object_md5hash and self.get_file_md5hash(filename) == object_md5hash: + logging.debug("Path %s already exists with the same checksum (%s), skipping" % (path, object_md5hash)) + return + logging.debug("Path %s checksum and local checksum differ, re-uploading" % path) + return self.upload_object(path) + logging.debug("Path %s does not exist, uploading" % path) + return self.upload_object(filename, path) + + def upload_object(self, filename, path): + f = None + try: + f = open(filename, 'r') + uri = self.get_uri(path) + logging.debug("Uploading object to GS: %s" % path) + return uri.new_key().set_contents_from_file(f) + finally: + if f: + f.close() + + def run(self): + if not os.path.isdir(self.backup_dir): + logging.error("The source directory: %s does not exist or is not a directory! Skipping GS Upload!" % self.backup_dir) + return + try: + self.running = True + self.timer.start(self.timer_name) + for file_name in os.listdir(self.backup_dir): + file_path = os.path.join(self.backup_dir, file_name) + gs_path = os.path.join(self.base_dir, file_name) + # skip mongodb-consistent-backup_META dir + if os.path.isdir(file_path): + continue + self.upload(file_path, gs_path) + self.exit_code = 0 + self.completed = True + except Exception, e: + logging.error("Uploading to GS failed! Error: %s" % e) + raise OperationError(e) + finally: + self.stopped = True diff --git a/mongodb_consistent_backup/Upload/GS/__init__.py b/mongodb_consistent_backup/Upload/Gs/__init__.py similarity index 98% rename from mongodb_consistent_backup/Upload/GS/__init__.py rename to mongodb_consistent_backup/Upload/Gs/__init__.py index 2cb6f599..f87b6e2a 100644 --- a/mongodb_consistent_backup/Upload/GS/__init__.py +++ b/mongodb_consistent_backup/Upload/Gs/__init__.py @@ -1,4 +1,5 @@ -from GS import GS +from Gs import Gs + def config(parser): parser.add_argument("--upload.gs.project_id", dest="upload.gs.project_id", help="Google Cloud Storage Uploader Project ID (required for GS upload)", type=str) diff --git a/mongodb_consistent_backup/Upload/Upload.py b/mongodb_consistent_backup/Upload/Upload.py index 7a1c2580..e9fd22e2 100644 --- a/mongodb_consistent_backup/Upload/Upload.py +++ b/mongodb_consistent_backup/Upload/Upload.py @@ -1,4 +1,4 @@ -from mongodb_consistent_backup.Upload.GS import GS +from mongodb_consistent_backup.Upload.Gs import Gs from mongodb_consistent_backup.Upload.S3 import S3 from mongodb_consistent_backup.Pipeline import Stage From a4ed24bcaea257dbacef08cf4d48735c5089a120 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 22 Jun 2017 19:41:11 +0200 Subject: [PATCH 4/6] Code cleanup and added .get_backup_files() to walk subdirs if archiving is disabled --- mongodb_consistent_backup/Upload/Gs/Gs.py | 29 +++++++++++++++-------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py index b534e650..4289815d 100755 --- a/mongodb_consistent_backup/Upload/Gs/Gs.py +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -33,6 +33,17 @@ def init(self): def close(self): pass + def get_backup_files(self, base_dir=None, files=[]): + if not base_dir: + base_dir = self.backup_dir + for child in os.listdir(base_dir): + path = os.path.join(base_dir, child) + if os.path.isfile(path): + files.append(path) + elif os.path.isdir(path): + self.get_backup_files(path, files) + return files + def get_uri(self, path): return boto.storage_uri(path, 'gs') @@ -67,7 +78,7 @@ def get_file_md5hash(self, filename, blocksize=65536): def upload(self, filename, path=None): if not path: path = filename - path = "%s/%s" % (self.bucket, path) + path = "%s/%s" % (self.bucket, path) if self.object_exists(path): object_md5hash = self.get_object_md5hash(path) if object_md5hash and self.get_file_md5hash(filename) == object_md5hash: @@ -83,7 +94,7 @@ def upload_object(self, filename, path): try: f = open(filename, 'r') uri = self.get_uri(path) - logging.debug("Uploading object to GS: %s" % path) + logging.info("Uploading object to Google Cloud Storage: %s" % path) return uri.new_key().set_contents_from_file(f) finally: if f: @@ -91,22 +102,20 @@ def upload_object(self, filename, path): def run(self): if not os.path.isdir(self.backup_dir): - logging.error("The source directory: %s does not exist or is not a directory! Skipping GS Upload!" % self.backup_dir) + logging.error("The source directory: %s does not exist or is not a directory! Skipping Google Cloud Storage upload!" % self.backup_dir) return try: self.running = True self.timer.start(self.timer_name) - for file_name in os.listdir(self.backup_dir): - file_path = os.path.join(self.backup_dir, file_name) - gs_path = os.path.join(self.base_dir, file_name) - # skip mongodb-consistent-backup_META dir - if os.path.isdir(file_path): - continue + logging.info("Uploading backup dir %s to Google Cloud Storage bucket: %s" % (self.backup_dir, self.bucket)) + for file_path in self.get_backup_files(): + gs_path = os.path.relpath(file_path, self.backup_loc) self.upload(file_path, gs_path) self.exit_code = 0 self.completed = True except Exception, e: - logging.error("Uploading to GS failed! Error: %s" % e) + logging.error("Uploading to Google Cloud Storage failed! Error: %s" % e) raise OperationError(e) finally: + self.timer.stop(self.timer_name) self.stopped = True From 6010cfe751a1d6e56ef9cfd2fbededcdd77b9b93 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 22 Jun 2017 19:50:28 +0200 Subject: [PATCH 5/6] Support remove uploaded --- mongodb_consistent_backup/Upload/Gs/Gs.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py index 4289815d..cfc6793f 100755 --- a/mongodb_consistent_backup/Upload/Gs/Gs.py +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -10,11 +10,12 @@ class Gs(Task): def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) - self.backup_loc = self.config.backup.location - self.project_id = self.config.upload.gs.project_id - self.access_key = self.config.upload.gs.access_key - self.secret_key = self.config.upload.gs.secret_key - self.bucket = self.config.upload.gs.bucket + self.backup_location = self.config.backup.location + self.remove_uploaded = self.config.upload.remove_uploaded + self.project_id = self.config.upload.gs.project_id + self.access_key = self.config.upload.gs.access_key + self.secret_key = self.config.upload.gs.secret_key + self.bucket = self.config.upload.gs.bucket self._header_values = {"x-goog-project-id": self.project_id} self._key_meta_cache = {} @@ -95,11 +96,17 @@ def upload_object(self, filename, path): f = open(filename, 'r') uri = self.get_uri(path) logging.info("Uploading object to Google Cloud Storage: %s" % path) - return uri.new_key().set_contents_from_file(f) + uri.new_key().set_contents_from_file(f) + self.handle_uploaded(filename) finally: if f: f.close() + def handle_uploaded(self, local_path): + if self.remove_uploaded: + logging.debug("Removing successfully uploaded file: %s" % local_path) + os.remove(local_path) + def run(self): if not os.path.isdir(self.backup_dir): logging.error("The source directory: %s does not exist or is not a directory! Skipping Google Cloud Storage upload!" % self.backup_dir) @@ -109,7 +116,7 @@ def run(self): self.timer.start(self.timer_name) logging.info("Uploading backup dir %s to Google Cloud Storage bucket: %s" % (self.backup_dir, self.bucket)) for file_path in self.get_backup_files(): - gs_path = os.path.relpath(file_path, self.backup_loc) + gs_path = os.path.relpath(file_path, self.backup_location) self.upload(file_path, gs_path) self.exit_code = 0 self.completed = True From 83fccf776cfa1f760ca3060aedc68cc5e92c4c8d Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 22 Jun 2017 19:59:13 +0200 Subject: [PATCH 6/6] Don't remove meta dir --- mongodb_consistent_backup/Upload/Gs/Gs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mongodb_consistent_backup/Upload/Gs/Gs.py b/mongodb_consistent_backup/Upload/Gs/Gs.py index cfc6793f..b6d6c73a 100755 --- a/mongodb_consistent_backup/Upload/Gs/Gs.py +++ b/mongodb_consistent_backup/Upload/Gs/Gs.py @@ -17,6 +17,7 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): self.secret_key = self.config.upload.gs.secret_key self.bucket = self.config.upload.gs.bucket + self.meta_data_dir = "mongodb-consistent-backup_META" self._header_values = {"x-goog-project-id": self.project_id} self._key_meta_cache = {} @@ -103,7 +104,7 @@ def upload_object(self, filename, path): f.close() def handle_uploaded(self, local_path): - if self.remove_uploaded: + if self.remove_uploaded and not local_path.startswith(os.path.join(self.backup_dir, self.meta_data_dir)): logging.debug("Removing successfully uploaded file: %s" % local_path) os.remove(local_path)