7 changes: 7 additions & 0 deletions conf/mongodb-consistent-backup.example.conf
@@ -52,6 +52,13 @@ production:
upload:
method: none
# remove_uploaded: [true|false] (default: false)
# gs:
# project_id: [Google Cloud Project ID]
# access_key: [Google Cloud Storage Access Key]
# secret_key: [Google Cloud Storage Secret Key]
# bucket_name: [Google Cloud Storage Bucket Name]
# bucket_prefix: [prefix] (default: /)
#    threads: [1+] (default: 4)
# s3:
# region: [AWS S3 Region] (default: us-east-1)
# access_key: [AWS S3 Access Key]
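For reference, a filled-in gs section would sit under the upload block like this (project ID, keys, and bucket below are placeholders, not working values):

    upload:
      method: gs
      gs:
        project_id: my-gcp-project
        access_key: GOOGEXAMPLEACCESSKEY
        secret_key: exampleSecretKey
        bucket_name: my-backup-bucket
        bucket_prefix: /
        threads: 4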
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Pipeline/Stage.py
@@ -35,7 +35,7 @@ def init(self):
module = sys.modules["%s.%s" % (self.stage, self.task.capitalize())]
mod_class = getattr(module, self.task.capitalize())
except LookupError, e:
-            raise OperationError('Could not load task: %s' % self.task)
+            raise OperationError('Could not load task %s: %s' % (self.task, e))
if mod_class:
self._task = mod_class(
self.manager,
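For context on the Stage.py change above: init() looks the task's module up in sys.modules and pulls the class out of it by name, so a missing or misnamed task surfaces as a LookupError (KeyError is a LookupError subclass); the amended message now reports the underlying error alongside the task name. A minimal standalone sketch of the same lookup pattern, with illustrative names:

    import sys

    def load_task_class(stage, task):
        # e.g. stage = "mongodb_consistent_backup.Upload", task = "gs"
        module_name = "%s.%s" % (stage, task.capitalize())  # -> "mongodb_consistent_backup.Upload.Gs"
        module = sys.modules[module_name]   # raises KeyError (a LookupError) if never imported
        return getattr(module, task.capitalize())           # the task class, e.g. Gs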
129 changes: 129 additions & 0 deletions mongodb_consistent_backup/Upload/Gs/Gs.py
@@ -0,0 +1,129 @@
import boto
import hashlib
import logging
import os

from mongodb_consistent_backup.Errors import OperationError
from mongodb_consistent_backup.Pipeline import Task


class Gs(Task):
def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
super(Gs, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
self.backup_location = self.config.backup.location
self.remove_uploaded = self.config.upload.remove_uploaded
self.project_id = self.config.upload.gs.project_id
self.access_key = self.config.upload.gs.access_key
self.secret_key = self.config.upload.gs.secret_key
        self.bucket          = self.config.upload.gs.bucket_name

self.meta_data_dir = "mongodb-consistent-backup_META"
self._header_values = {"x-goog-project-id": self.project_id}
self._key_meta_cache = {}

self.init()

def init(self):
if not boto.config.has_section("Credentials"):
boto.config.add_section("Credentials")
boto.config.set("Credentials", "gs_access_key_id", self.access_key)
boto.config.set("Credentials", "gs_secret_access_key", self.secret_key)
if not boto.config.has_section("Boto"):
boto.config.add_section("Boto")
boto.config.setbool('Boto', 'https_validate_certificates', True)

def close(self):
pass

    def get_backup_files(self, base_dir=None, files=None):
        if not base_dir:
            base_dir = self.backup_dir
        if files is None:
            files = []
for child in os.listdir(base_dir):
path = os.path.join(base_dir, child)
if os.path.isfile(path):
files.append(path)
elif os.path.isdir(path):
self.get_backup_files(path, files)
return files

def get_uri(self, path):
return boto.storage_uri(path, 'gs')

def object_exists(self, path):
try:
self.get_object_metadata(path)
return True
except boto.exception.InvalidUriError:
pass
return False

def get_object_metadata(self, path, force=False):
        if force or path not in self._key_meta_cache:
logging.debug("Getting metadata for path: %s" % path)
uri = self.get_uri(path)
self._key_meta_cache[path] = uri.get_key()
if path in self._key_meta_cache:
return self._key_meta_cache[path]

def get_object_md5hash(self, path):
key = self.get_object_metadata(path)
if hasattr(key, 'etag'):
return key.etag.strip('"\'')

def get_file_md5hash(self, filename, blocksize=65536):
md5 = hashlib.md5()
with open(filename, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
md5.update(block)
return md5.hexdigest()

def upload(self, filename, path=None):
if not path:
path = filename
path = "%s/%s" % (self.bucket, path)
if self.object_exists(path):
object_md5hash = self.get_object_md5hash(path)
if object_md5hash and self.get_file_md5hash(filename) == object_md5hash:
logging.debug("Path %s already exists with the same checksum (%s), skipping" % (path, object_md5hash))
return
logging.debug("Path %s checksum and local checksum differ, re-uploading" % path)
            return self.upload_object(filename, path)
logging.debug("Path %s does not exist, uploading" % path)
return self.upload_object(filename, path)

def upload_object(self, filename, path):
f = None
try:
            f = open(filename, 'rb')
uri = self.get_uri(path)
logging.info("Uploading object to Google Cloud Storage: %s" % path)
uri.new_key().set_contents_from_file(f)
self.handle_uploaded(filename)
finally:
if f:
f.close()

def handle_uploaded(self, local_path):
if self.remove_uploaded and not local_path.startswith(os.path.join(self.backup_dir, self.meta_data_dir)):
logging.debug("Removing successfully uploaded file: %s" % local_path)
os.remove(local_path)

def run(self):
if not os.path.isdir(self.backup_dir):
            logging.error("Source directory %s does not exist or is not a directory! Skipping Google Cloud Storage upload!" % self.backup_dir)
return
try:
self.running = True
self.timer.start(self.timer_name)
logging.info("Uploading backup dir %s to Google Cloud Storage bucket: %s" % (self.backup_dir, self.bucket))
for file_path in self.get_backup_files():
gs_path = os.path.relpath(file_path, self.backup_location)
self.upload(file_path, gs_path)
self.exit_code = 0
self.completed = True
except Exception, e:
logging.error("Uploading to Google Cloud Storage failed! Error: %s" % e)
raise OperationError(e)
finally:
self.timer.stop(self.timer_name)
self.stopped = True
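A note on the checksum skip in upload(): for non-composite objects, Google Cloud Storage's etag is the object's MD5 digest, which is what makes comparing boto's Key.etag against a locally computed MD5 a valid freshness check for whole-file uploads like these. A standalone sketch of that decision (function and argument names are illustrative):

    import hashlib

    def needs_upload(local_file, remote_etag, blocksize=65536):
        # remote_etag: boto Key.etag with surrounding quotes stripped, or None if the object is absent
        if remote_etag is None:
            return True
        md5 = hashlib.md5()
        with open(local_file, "rb") as f:
            for block in iter(lambda: f.read(blocksize), b""):
                md5.update(block)
        return md5.hexdigest() != remote_etag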
11 changes: 11 additions & 0 deletions mongodb_consistent_backup/Upload/Gs/__init__.py
@@ -0,0 +1,11 @@
from Gs import Gs


def config(parser):
parser.add_argument("--upload.gs.project_id", dest="upload.gs.project_id", help="Google Cloud Storage Uploader Project ID (required for GS upload)", type=str)
parser.add_argument("--upload.gs.access_key", dest="upload.gs.access_key", help="Google Cloud Storage Uploader Access Key (required for GS upload)", type=str)
parser.add_argument("--upload.gs.secret_key", dest="upload.gs.secret_key", help="Google Cloud Storage Uploader Secret Key (required for GS upload)", type=str)
parser.add_argument("--upload.gs.bucket_name", dest="upload.gs.bucket_name", help="Google Cloud Storage Uploader destination bucket name", type=str)
parser.add_argument("--upload.gs.bucket_prefix", dest="upload.gs.bucket_prefix", help="Google Cloud Storage Uploader destination bucket path prefix", type=str)
parser.add_argument("--upload.gs.threads", dest="upload.gs.threads", help="Google Cloud Storage Uploader worker threads (default: 4)", default=4, type=int)
return parser
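These dotted flags mirror the YAML config keys one-for-one, so the gs section can also be supplied on the command line; assuming the project's usual entrypoint, an invocation might look like this (every value is a placeholder):

    mongodb-consistent-backup ... --upload.method=gs \
        --upload.gs.project_id=my-gcp-project \
        --upload.gs.access_key=GOOGEXAMPLEACCESSKEY \
        --upload.gs.secret_key=exampleSecretKey \
        --upload.gs.bucket_name=my-backup-bucket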
2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Upload/S3/__init__.py
@@ -6,7 +6,7 @@ def config(parser):
parser.add_argument("--upload.s3.secret_key", dest="upload.s3.secret_key", help="S3 Uploader AWS Secret Key (required for S3 upload)", type=str)
parser.add_argument("--upload.s3.bucket_name", dest="upload.s3.bucket_name", help="S3 Uploader destination bucket name", type=str)
parser.add_argument("--upload.s3.bucket_prefix", dest="upload.s3.bucket_prefix", help="S3 Uploader destination bucket path prefix", type=str)
parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader upload worker threads (default: 4)", default=4, type=int)
parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader worker threads (default: 4)", default=4, type=int)
parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", help="S3 Uploader upload chunk size, in megabytes (default: 50)", default=50, type=int)
parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", help="S3 Uploader connect over SSL (default: true)", default=True, action="store_false")
parser.add_argument("--upload.s3.retries", dest="upload.s3.retries", help="S3 Uploader retry times (default: 5)", default=5, type=int)
1 change: 1 addition & 0 deletions mongodb_consistent_backup/Upload/Upload.py
@@ -1,3 +1,4 @@
from mongodb_consistent_backup.Upload.Gs import Gs
from mongodb_consistent_backup.Upload.S3 import S3
from mongodb_consistent_backup.Pipeline import Stage

2 changes: 1 addition & 1 deletion mongodb_consistent_backup/Upload/__init__.py
@@ -2,6 +2,6 @@


def config(parser):
parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['s3', 'none'])
parser.add_argument("--upload.method", dest="upload.method", help="Uploader method (default: none)", default='none', choices=['gs', 's3', 'none'])
parser.add_argument("--upload.remove_uploaded", dest="upload.remove_uploaded",help="Remove source files after successful upload (default: false)", default=False, action="store_true")
return parser