Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force US_EAST_1 for S3 listBuckets call #66

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ import scala.util.{ Failure, Success, Try }
Flow[ByteString].orElse(Source.single(ByteString.empty))

// def because tokens can expire
private def signingKey(implicit settings: S3Settings) = {
private def signingKey(overrideRegion: Option[Region] = None)(implicit settings: S3Settings) = {
val requestDate = ZonedDateTime.now(ZoneOffset.UTC)
SigningKey(
requestDate,
settings.credentialsProvider,
CredentialScope(requestDate.toLocalDate, settings.s3RegionProvider.getRegion, "s3"))
CredentialScope(requestDate.toLocalDate, overrideRegion.getOrElse(settings.s3RegionProvider.getRegion), "s3"))
}

def download(
Expand Down Expand Up @@ -336,7 +336,9 @@ import scala.util.{ Failure, Success, Try }
Source
.future {
signAndGetAs[ListBucketsResult](
HttpRequests.listBuckets(s3Headers.headers)).map { (res: ListBucketsResult) =>
// This request only works when its called from US_EAST_1. Note that buckets are region
// agnostic
HttpRequests.listBuckets(s3Headers.headers), Some(Region.US_EAST_1)).map { (res: ListBucketsResult) =>
res.buckets
}(ExecutionContexts.parasitic)
}
Expand Down Expand Up @@ -1121,7 +1123,7 @@ import scala.util.{ Failure, Success, Try }
}
.flatMapConcat {
case (req, info) =>
Signer.signedRequest(req, signingKey, conf.signAnonymousRequests).zip(Source.single(info))
Signer.signedRequest(req, signingKey(), conf.signAnonymousRequests).zip(Source.single(info))
}
.via(superPool[(MultipartUpload, Int)])

Expand Down Expand Up @@ -1213,7 +1215,7 @@ import scala.util.{ Failure, Success, Try }
}
.flatMapConcat {
case ((req, info), allContext) =>
Signer.signedRequest(req, signingKey, conf.signAnonymousRequests).zip(Source.single(info)).map {
Signer.signedRequest(req, signingKey(), conf.signAnonymousRequests).zip(Source.single(info)).map {
case (httpRequest, data) => (httpRequest, (data, allContext))
}
}
Expand Down Expand Up @@ -1352,12 +1354,13 @@ import scala.util.{ Failure, Success, Try }
.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic))

private def signAndGetAs[T](
request: HttpRequest)(
request: HttpRequest,
overrideRegion: Option[Region] = None)(
implicit um: Unmarshaller[ResponseEntity, T], mat: Materializer, attr: Attributes): Future[T] = {
import mat.executionContext
implicit val sys: ActorSystem = mat.system
for {
response <- signAndRequest(request).runWith(Sink.head)
response <- signAndRequest(request, overrideRegion).runWith(Sink.head)
(entity, _) <- entityForSuccess(response)
t <- Unmarshal(entity).to[T]
} yield t
Expand All @@ -1378,14 +1381,15 @@ import scala.util.{ Failure, Success, Try }
}

private def signAndRequest(
request: HttpRequest)(
request: HttpRequest,
overrideRegion: Option[Region] = None)(
implicit sys: ActorSystem, mat: Materializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
implicit val conf: S3Settings = resolveSettings(attr, sys)
import conf.retrySettings._
import mat.executionContext

val retriableFlow = Flow[HttpRequest]
.flatMapConcat(req => Signer.signedRequest(req, signingKey, conf.signAnonymousRequests))
.flatMapConcat(req => Signer.signedRequest(req, signingKey(overrideRegion), conf.signAnonymousRequests))
.mapAsync(parallelism = 1)(req =>
singleRequest(req)
.map(Success.apply)
Expand Down Expand Up @@ -1457,7 +1461,7 @@ import scala.util.{ Failure, Success, Try }
.mapConcat(identity)
.flatMapConcat {
case (req, info) =>
Signer.signedRequest(req, signingKey, conf.signAnonymousRequests).zip(Source.single(info))
Signer.signedRequest(req, signingKey(), conf.signAnonymousRequests).zip(Source.single(info))
}
}
.mapMaterializedValue(_ => NotUsed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ trait S3IntegrationSpec
buckets.map(_.name) should contain(defaultBucket)
}

it should "list buckets in current AWS account using non US_EAST_1 region" in {
// Its only AWS that complains if listBuckets is called from a non US_EAST_1 region
assume(this.isInstanceOf[AWSS3IntegrationSpec])
val result = for {
buckets <- S3.listBuckets().withAttributes(
S3Attributes.settings(defaultS3Settings.withS3RegionProvider(new AwsRegionProvider {
override def getRegion: Region = Region.EU_CENTRAL_1
}))).runWith(Sink.seq)
} yield buckets

val buckets = result.futureValue

buckets.map(_.name) should contain(defaultBucket)
}

it should "list with real credentials" in {
val result = S3
.listBucket(defaultBucket, None)
Expand Down