Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/mongodb-consistent-backup.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ production:
method: none
# remove_uploaded: [true|false] (default: false)
# s3:
# region: [AWS S3 Region] (default: us-east-1)
# access_key: [AWS S3 Access Key]
# secret_key: [AWS S3 Secret Key]
# bucket_name: [AWS S3 Bucket Name]
# bucket_prefix: [prefix] (default: /)
# threads: [1+] (default: 1 per CPU)
# secure: [true|false] (default: true)
# retries: [1+] (default: 5)
# acl: [acl_str] (default: none)
31 changes: 19 additions & 12 deletions mongodb_consistent_backup/Upload/S3/S3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,31 @@ def _reduce_method(m):

class S3(Task):
def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
super(Nsca, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
super(S3, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir, **kwargs)
self.remove_uploaded = self.config.upload.remove_uploaded
self.s3_host = self.config.upload.s3.host
self.region = self.config.upload.s3.region
self.bucket_name = self.config.upload.s3.bucket_name
self.bucket_prefix = self.config.upload.s3.bucket_prefix
self.access_key = self.config.upload.s3.access_key
self.secret_key = self.config.upload.s3.secret_key
self.thread_count = self.config.upload.s3.threads
self.chunk_size_mb = self.config.upload.s3.chunk_size_mb
self.chunk_size = self.chunk_size_mb * 1024 * 1024
self.secure = self.config.upload.s3.secure
self.retries = self.config.upload.s3.retries
self.s3_acl = self.config.upload.s3.acl

self.key_prefix = None
self.key_prefix = base_dir
if 'key_prefix' in self.args:
self.key_prefix = key_prefix

self._pool = None
self._multipart = None
self._upload_done = False
if None in (self.access_key, self.secret_key,self.s3_host):
raise "Invalid S3 security key or host detected!"
if None in (self.access_key, self.secret_key, self.region):
raise "Invalid S3 security key or region detected!"
try:
self.s3_conn = S3Session(self.access_key, self.secret_key, self.s3_host)
self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name)
self.bucket = self.s3_conn.get_bucket(self.bucket_name)
except Exception, e:
raise OperationError(e)
Expand All @@ -57,15 +60,18 @@ def run(self):
try:
self.timer.start(self.timer_name)
for file_name in os.listdir(self.backup_dir):
file_path = os.path.join(self.backup_dir, file_name)
# skip mongodb-consistent-backup_META dir
if os.path.isdir(file_path):
continue
file_size = os.stat(file_path).st_size
chunk_count = int(ceil(file_size / float(self.chunk_size)))

if self.bucket_prefix == "/":
key_name = "/%s/%s" % (self.key_prefix, file_name)
else:
key_name = "%s/%s/%s" % (self.bucket_prefix, self.key_prefix, file_name)

file_path = os.path.join(self.backup_dir, file_name)
file_size = os.stat(file_path).st_size
chunk_count = int(ceil(file_size / float(self.chunk_size)))

logging.info("Starting multipart AWS S3 upload to key: %s%s using %i threads, %imb chunks, %i retries" % (
self.bucket_name,
key_name,
Expand All @@ -82,9 +88,9 @@ def run(self):
part_num = i + 1
self._pool.apply_async(S3UploadThread(
self.bucket_name,
self.region,
self.access_key,
self.secret_key,
self.s3_host,
self._multipart.id,
part_num,
file_path,
Expand All @@ -99,7 +105,8 @@ def run(self):
if len(self._multipart.get_all_parts()) == chunk_count:
self._multipart.complete_upload()
key = self.bucket.get_key(key_name)
key.set_acl(self.s3_acl)
if self.s3_acl:
key.set_acl(self.s3_acl)
self._upload_done = True

if self.remove_uploaded:
Expand Down
23 changes: 16 additions & 7 deletions mongodb_consistent_backup/Upload/S3/S3Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@

import boto
import boto.s3
from boto.s3.connection import OrdinaryCallingFormat, SubdomainCallingFormat

from mongodb_consistent_backup.Errors import OperationError

class S3Session:
def __init__(self, access_key, secret_key, s3_host='s3.amazonaws.com', secure=True, num_retries=5, socket_timeout=15):
def __init__(self, region, access_key, secret_key, bucket_name, secure=True, num_retries=5, socket_timeout=15):
self.region = region
self.access_key = access_key
self.secret_key = secret_key
self.s3_host = s3_host
self.secure = secure
self.num_retries = num_retries
self.socket_timeout = socket_timeout

# monkey patch for bucket_name with dots
# https://github.com/boto/boto/issues/2836
if self.secure and '.' in bucket_name:
self.calling_format = OrdinaryCallingFormat()
else:
self.calling_format = SubdomainCallingFormat()

for section in boto.config.sections():
boto.config.remove_section(section)
boto.config.add_section('Boto')
Expand All @@ -33,11 +41,12 @@ def connect(self):
if not self._conn:
try:
logging.debug("Connecting to AWS S3 with Access Key: %s" % self.access_key)
self._conn = boto.s3.S3Connection(
self.access_key,
self.secret_key,
host=self.s3_host,
is_secure=self.secure
self._conn = boto.s3.connect_to_region(
self.region,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
is_secure=self.secure,
calling_format=self.calling_format
)
logging.debug("Successfully connected to AWS S3 with Access Key: %s" % self.access_key)
except Exception, e:
Expand Down
6 changes: 3 additions & 3 deletions mongodb_consistent_backup/Upload/S3/S3UploadThread.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@


class S3UploadThread:
def __init__(self, bucket_name, access_key, secret_key, s3_host, multipart_id, part_num, file_name, offset,
def __init__(self, bucket_name, region, access_key, secret_key, multipart_id, part_num, file_name, offset,
byte_count, retries=5, secure=True):
self.bucket_name = bucket_name
self.region = region
self.access_key = access_key
self.secret_key = secret_key
self.s3_host = s3_host
self.multipart_id = multipart_id
self.part_num = part_num
self.file_name = file_name
Expand All @@ -21,7 +21,7 @@ def __init__(self, bucket_name, access_key, secret_key, s3_host, multipart_id, p
self.secure = secure

try:
self.s3_conn = S3Session(self.access_key, self.secret_key, self.s3_host, self.secure, self.retries)
self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name, self.secure, self.retries)
self.bucket = self.s3_conn.get_bucket(self.bucket_name)
except Exception, e:
logging.fatal("Could not get AWS S3 connection to bucket %s! Error: %s" % (self.bucket_name, e))
Expand Down
4 changes: 4 additions & 0 deletions mongodb_consistent_backup/Upload/S3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from S3 import S3

def config(parser):
parser.add_argument("--upload.s3.region", dest="upload.s3.region", help="S3 Uploader AWS region to connect to (default: us-east-1)", default="us-east-1", type=str)
parser.add_argument("--upload.s3.access_key", dest="upload.s3.access_key", help="S3 Uploader AWS Access Key (required for S3 upload)", type=str)
parser.add_argument("--upload.s3.secret_key", dest="upload.s3.secret_key", help="S3 Uploader AWS Secret Key (required for S3 upload)", type=str)
parser.add_argument("--upload.s3.bucket_name", dest="upload.s3.bucket_name", help="S3 Uploader destination bucket name", type=str)
parser.add_argument("--upload.s3.bucket_prefix", dest="upload.s3.bucket_prefix", help="S3 Uploader destination bucket path prefix", type=str)
parser.add_argument("--upload.s3.threads", dest="upload.s3.threads", help="S3 Uploader upload worker threads (default: 4)", default=4, type=int)
parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", help="S3 Uploader upload chunk size, in megabytes (default: 50)", default=50, type=int)
parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", help="S3 Uploader connect over SSL (default: true)", default=True, action="store_false")
parser.add_argument("--upload.s3.retries", dest="upload.s3.retries", help="S3 Uploader retry times (default: 5)", default=5, type=int)
parser.add_argument("--upload.s3.acl", dest="upload.s3.acl", help="S3 Uploader ACL associated with objects (default: none)", default=None, type=str)
return parser