Skip to content

Commit

Permalink
Retry when S3 bucket creation yells BucketAlreadyOwnedBy (fixes #955)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannes-ucsc committed Jun 16, 2016
1 parent ba6b808 commit e17fe6e
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions src/toil/jobStores/aws/jobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from boto.sdb.item import Item
import boto.s3
import boto.sdb
from boto.exception import SDBResponseError, S3ResponseError
from boto.exception import SDBResponseError, S3ResponseError, S3CreateError
from boto.s3.key import Key

from toil.jobStores.abstractJobStore import (AbstractJobStore,
Expand All @@ -50,7 +50,8 @@
sdb_unavailable,
monkeyPatchSdbConnection,
retry_s3,
bucket_location_to_region)
bucket_location_to_region,
region_to_bucket_location)
from toil.jobWrapper import JobWrapper
import toil.lib.encryption as encryption

Expand Down Expand Up @@ -520,18 +521,36 @@ def _getOrCreateBucket(self, bucket_name, versioning=False):
"""
assert self.minBucketNameLen <= len(bucket_name) <= self.maxBucketNameLen
assert self.bucketNameRe.match(bucket_name)
try:
bucket = self.s3.get_bucket(bucket_name, validate=True)
assert versioning is self.__getBucketVersioning(bucket)
return bucket
except S3ResponseError as e:
if e.error_code == 'NoSuchBucket':
bucket = self.s3.create_bucket(bucket_name, location=self.region)
if versioning:
bucket.configure_versioning(versioning)
log.info("Setting up job store bucket '%s'.", bucket_name)
while True:
log.debug("Looking up job store bucket '%s'.", bucket_name)
try:
bucket = self.s3.get_bucket(bucket_name, validate=True)
assert self.__getBucketRegion(bucket) == self.region
assert versioning is self.__getBucketVersioning(bucket)
log.debug("Using existing job store bucket '%s'.", bucket_name)
return bucket
else:
raise
except S3ResponseError as e:
if e.error_code == 'NoSuchBucket':
log.debug("Bucket '%s' does not exist. Creating it.", bucket_name)
try:
location = region_to_bucket_location(self.region)
bucket = self.s3.create_bucket(bucket_name, location=location)
except S3CreateError as e:
if e.error_code == 'BucketAlreadyOwnedByYou':
# https://github.com/BD2KGenomics/toil/issues/955
log.warn('Got %s, retrying.', e)
# and loop
else:
raise
else:
assert self.__getBucketRegion(bucket) == self.region
if versioning:
bucket.configure_versioning(versioning)
log.debug("Created new job store bucket '%s'.", bucket_name)
return bucket
else:
raise

def _getOrCreateDomain(self, domain_name):
"""
Expand All @@ -542,6 +561,7 @@ def _getOrCreateDomain(self, domain_name):
:rtype : Domain
"""
log.info("Setting up job store SDB domain '%s'.", domain_name)
try:
return self.db.get_domain(domain_name)
except SDBResponseError as e:
Expand Down Expand Up @@ -1075,6 +1095,11 @@ def __getBucketVersioning(self, bucket):
status = bucket.get_versioning_status()
return bool(status) and self.versionings[status['Versioning']]

def __getBucketRegion(self, bucket):
for attempt in retry_s3():
with attempt:
return bucket_location_to_region(bucket.get_location())

def deleteJobStore(self):
self.registry_domain.put_attributes(self.namePrefix, dict(exists=str(False)))
if self.filesBucket is not None:
Expand Down

0 comments on commit e17fe6e

Please sign in to comment.