From cb7a9b107212290d0e640f596c75d81f1bb5eeaa Mon Sep 17 00:00:00 2001 From: Hu Hailin Date: Mon, 15 May 2017 19:16:07 +0900 Subject: [PATCH] make s3 upload work --- conf/mongodb-consistent-backup.example.conf | 4 +++ mongodb_consistent_backup/Upload/S3/S3.py | 31 ++++++++++++------- .../Upload/S3/S3Session.py | 23 +++++++++----- .../Upload/S3/S3UploadThread.py | 6 ++-- .../Upload/S3/__init__.py | 4 +++ 5 files changed, 46 insertions(+), 22 deletions(-) diff --git a/conf/mongodb-consistent-backup.example.conf b/conf/mongodb-consistent-backup.example.conf index 24959f2c..6c13860a 100644 --- a/conf/mongodb-consistent-backup.example.conf +++ b/conf/mongodb-consistent-backup.example.conf @@ -49,8 +49,12 @@ production: method: none # remove_uploaded: [true|false] (default: false) # s3: + # region: [AWS S3 Region] (default: us-east-1) # access_key: [AWS S3 Access Key] # secret_key: [AWS S3 Secret Key] # bucket_name: [AWS S3 Bucket Name] # bucket_prefix: [prefix] (default: /) # threads: [1+] (default: 1 per CPU) + # secure: [true|false] (default: true) + # retries: [1+] (default: 5) + # acl: [acl_str] (default: none) diff --git a/mongodb_consistent_backup/Upload/S3/S3.py b/mongodb_consistent_backup/Upload/S3/S3.py index 7bf9d8ad..fe3d79dc 100644 --- a/mongodb_consistent_backup/Upload/S3/S3.py +++ b/mongodb_consistent_backup/Upload/S3/S3.py @@ -24,9 +24,9 @@ def _reduce_method(m): class S3(Task): def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): - super(Nsca, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) + super(S3, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs) self.remove_uploaded = self.config.upload.remove_uploaded - self.s3_host = self.config.upload.s3.host + self.region = self.config.upload.s3.region self.bucket_name = self.config.upload.s3.bucket_name self.bucket_prefix = self.config.upload.s3.bucket_prefix self.access_key = 
self.config.upload.s3.access_key @@ -34,18 +34,21 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs): self.thread_count = self.config.upload.s3.threads self.chunk_size_mb = self.config.upload.s3.chunk_size_mb self.chunk_size = self.chunk_size_mb * 1024 * 1024 + self.secure = self.config.upload.s3.secure + self.retries = self.config.upload.s3.retries + self.s3_acl = self.config.upload.s3.acl - self.key_prefix = None + self.key_prefix = base_dir if 'key_prefix' in self.args: - self.key_prefix = key_prefix + self.key_prefix = self.args['key_prefix'] self._pool = None self._multipart = None self._upload_done = False - if None in (self.access_key, self.secret_key,self.s3_host): - raise "Invalid S3 security key or host detected!" + if None in (self.access_key, self.secret_key, self.region): + raise OperationError("Invalid S3 security key or region detected!") try: - self.s3_conn = S3Session(self.access_key, self.secret_key, self.s3_host) + self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name, self.secure, self.retries) self.bucket = self.s3_conn.get_bucket(self.bucket_name) except Exception, e: raise OperationError(e) @@ -57,15 +60,18 @@ def run(self): try: self.timer.start(self.timer_name) for file_name in os.listdir(self.backup_dir): + file_path = os.path.join(self.backup_dir, file_name) + # skip mongodb-consistent-backup_META dir + if os.path.isdir(file_path): + continue + file_size = os.stat(file_path).st_size + chunk_count = int(ceil(file_size / float(self.chunk_size))) + if self.bucket_prefix == "/": key_name = "/%s/%s" % (self.key_prefix, file_name) else: key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, file_name) - file_path = os.path.join(self.backup_dir, file_name) - file_size = os.stat(file_path).st_size - chunk_count = int(ceil(file_size / float(self.chunk_size))) - logging.info("Starting multipart AWS S3 upload to key: %s%s using %i threads, %imb chunks, %i retries" % ( self.bucket_name, key_name, @@ -82,9 +88,9 @@ def run(self): part_num = i + 1 
self._pool.apply_async(S3UploadThread( self.bucket_name, + self.region, self.access_key, self.secret_key, - self.s3_host, self._multipart.id, part_num, file_path, @@ -99,7 +105,8 @@ def run(self): if len(self._multipart.get_all_parts()) == chunk_count: self._multipart.complete_upload() key = self.bucket.get_key(key_name) - key.set_acl(self.s3_acl) + if self.s3_acl: + key.set_acl(self.s3_acl) self._upload_done = True if self.remove_uploaded: diff --git a/mongodb_consistent_backup/Upload/S3/S3Session.py b/mongodb_consistent_backup/Upload/S3/S3Session.py index a2027a58..aa24d0c9 100644 --- a/mongodb_consistent_backup/Upload/S3/S3Session.py +++ b/mongodb_consistent_backup/Upload/S3/S3Session.py @@ -2,18 +2,26 @@ import boto import boto.s3 +from boto.s3.connection import OrdinaryCallingFormat, SubdomainCallingFormat from mongodb_consistent_backup.Errors import OperationError class S3Session: - def __init__(self, access_key, secret_key, s3_host='s3.amazonaws.com', secure=True, num_retries=5, socket_timeout=15): + def __init__(self, region, access_key, secret_key, bucket_name, secure=True, num_retries=5, socket_timeout=15): + self.region = region self.access_key = access_key self.secret_key = secret_key - self.s3_host = s3_host self.secure = secure self.num_retries = num_retries self.socket_timeout = socket_timeout + # monkey patch for bucket_name with dots + # https://github.com/boto/boto/issues/2836 + if self.secure and '.' 
in bucket_name: + self.calling_format = OrdinaryCallingFormat() + else: + self.calling_format = SubdomainCallingFormat() + for section in boto.config.sections(): boto.config.remove_section(section) boto.config.add_section('Boto') @@ -33,11 +41,12 @@ def connect(self): if not self._conn: try: logging.debug("Connecting to AWS S3 with Access Key: %s" % self.access_key) - self._conn = boto.s3.S3Connection( - self.access_key, - self.secret_key, - host=self.s3_host, - is_secure=self.secure + self._conn = boto.s3.connect_to_region( + self.region, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + is_secure=self.secure, + calling_format=self.calling_format ) logging.debug("Successfully connected to AWS S3 with Access Key: %s" % self.access_key) except Exception, e: diff --git a/mongodb_consistent_backup/Upload/S3/S3UploadThread.py b/mongodb_consistent_backup/Upload/S3/S3UploadThread.py index 066db146..727eaa7c 100644 --- a/mongodb_consistent_backup/Upload/S3/S3UploadThread.py +++ b/mongodb_consistent_backup/Upload/S3/S3UploadThread.py @@ -6,12 +6,12 @@ class S3UploadThread: - def __init__(self, bucket_name, access_key, secret_key, s3_host, multipart_id, part_num, file_name, offset, + def __init__(self, bucket_name, region, access_key, secret_key, multipart_id, part_num, file_name, offset, byte_count, retries=5, secure=True): self.bucket_name = bucket_name + self.region = region self.access_key = access_key self.secret_key = secret_key - self.s3_host = s3_host self.multipart_id = multipart_id self.part_num = part_num self.file_name = file_name @@ -21,7 +21,7 @@ def __init__(self, bucket_name, access_key, secret_key, s3_host, multipart_id, p self.secure = secure try: - self.s3_conn = S3Session(self.access_key, self.secret_key, self.s3_host, self.secure, self.retries) + self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name, self.secure, self.retries) self.bucket = self.s3_conn.get_bucket(self.bucket_name) except 
Exception, e: logging.fatal("Could not get AWS S3 connection to bucket %s! Error: %s" % (self.bucket_name, e)) diff --git a/mongodb_consistent_backup/Upload/S3/__init__.py b/mongodb_consistent_backup/Upload/S3/__init__.py index b3c77b6f..f05e610e 100644 --- a/mongodb_consistent_backup/Upload/S3/__init__.py +++ b/mongodb_consistent_backup/Upload/S3/__init__.py @@ -1,10 +1,14 @@ from S3 import S3 def config(parser): + parser.add_argument("--upload.s3.region", dest="upload.s3.region", help="S3 Uploader AWS region to connect to (default: us-east-1)", default="us-east-1", type=str) parser.add_argument("--upload.s3.access_key", dest="upload.s3.access_key", help="S3 Uploader AWS Access Key (required for S3 upload)", type=str) parser.add_argument("--upload.s3.secret_key", dest="upload.s3.secret_key", help="S3 Uploader AWS Secret Key (required for S3 upload)", type=str) parser.add_argument("--upload.s3.bucket_name", dest="upload.s3.bucket_name", help="S3 Uploader destination bucket name", type=str) parser.add_argument("--upload.s3.bucket_prefix", dest="upload.s3.bucket_prefix", help="S3 Uploader destination bucket path prefix", type=str) parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader upload worker threads (default: 4)", default=4, type=int) parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", help="S3 Uploader upload chunk size, in megabytes (default: 50)", default=50, type=int) + parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", help="S3 Uploader connect over SSL; pass this flag to disable SSL (default: true)", default=True, action="store_false") + parser.add_argument("--upload.s3.retries", dest="upload.s3.retries", help="S3 Uploader retry times (default: 5)", default=5, type=int) + parser.add_argument("--upload.s3.acl", dest="upload.s3.acl", help="S3 Uploader ACL associated with objects (default: none)", default=None, type=str) return parser