New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3: support custom endpoint - WIP #619

Closed
wants to merge 1 commit into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+79 −30
Diff settings

Always

Just for now

@@ -17,15 +17,17 @@ final case class S3Settings(bufferType: BufferType,
proxy: Option[Proxy],
credentialsProvider: AWSCredentialsProvider,
s3Region: String,
pathStyleAccess: Boolean) {
pathStyleAccess: Boolean,
baseUrl: Option[String]) {

This comment has been minimized.

@ktoso

ktoso Dec 5, 2017

Member

In this impl this acts more like an "override s3 base url", which could be fine.
An alternative is a bit more complex, and would be alternative mappings for the s3Region strings -- and we'd by default put the global known s3 urls in there.

@ktoso

ktoso Dec 5, 2017

Member

In this impl this acts more like an "override s3 base url", which could be fine.
An alternative is a bit more complex, and would be alternative mappings for the s3Region strings -- and we'd by default put the global known s3 urls in there.

This comment has been minimized.

@easel

easel Dec 30, 2017

Contributor

I think it's probably best to go with the approach in place now. Minio, for instance, expects the region to be set to us-east-1 while using their custom endpoint.

@easel

easel Dec 30, 2017

Contributor

I think it's probably best to go with the approach in place now. Minio, for instance, expects the region to be set to us-east-1 while using their custom endpoint.

override def toString: String =
s"""S3Settings(
|$bufferType,
|$proxy,
|${credentialsProvider.getClass.getSimpleName},
|$s3Region,
|$pathStyleAccess)""".stripMargin
|$pathStyleAccess,
|$baseUrl)""".stripMargin
}
sealed trait BufferType {
@@ -129,7 +131,8 @@ object S3Settings {
proxy = maybeProxy,
credentialsProvider = credentialsProvider,
s3Region = s3region,
pathStyleAccess = pathStyleAccess
pathStyleAccess = pathStyleAccess,
baseUrl = None
)
}
@@ -33,7 +33,7 @@ private[alpakka] object HttpRequests {
)
HttpRequest(HttpMethods.GET)
.withHeaders(Host(requestHost(bucket, conf.s3Region)))
.withHeaders(Host(requestAuthority(bucket, conf.s3Region)))
.withUri(requestUri(bucket, None).withQuery(query))
}
@@ -85,11 +85,11 @@ private[alpakka] object HttpRequests {
method: HttpMethod = HttpMethods.GET,
uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest =
HttpRequest(method)
.withHeaders(Host(requestHost(s3Location.bucket, conf.s3Region)))
.withHeaders(Host(requestAuthority(s3Location.bucket, conf.s3Region)))
.withUri(uriFn(requestUri(s3Location.bucket, Some(s3Location.key))))
@throws(classOf[IllegalUriException])
private[this] def requestHost(bucket: String, region: String)(implicit conf: S3Settings): Uri.Host =
private[this] def requestAuthority(bucket: String, region: String)(implicit conf: S3Settings): Authority =
conf.proxy match {
case None =>
if (!conf.pathStyleAccess) {
@@ -105,21 +105,23 @@ private[alpakka] object HttpRequests {
case None => ()
}
}
region match {
case "us-east-1" =>
(region, conf.baseUrl) match {
case (_, Some(baseUrl)) =>
Uri(baseUrl).authority
case ("us-east-1", _) =>
if (conf.pathStyleAccess) {
Uri.Host("s3.amazonaws.com")
Authority(Uri.Host("s3.amazonaws.com"))
} else {
Uri.Host(s"$bucket.s3.amazonaws.com")
Authority(Uri.Host(s"$bucket.s3.amazonaws.com"))
}
case _ =>
if (conf.pathStyleAccess) {
Uri.Host(s"s3-$region.amazonaws.com")
Authority(Uri.Host(s"s3-$region.amazonaws.com"))
} else {
Uri.Host(s"$bucket.s3-$region.amazonaws.com")
Authority(Uri.Host(s"$bucket.s3-$region.amazonaws.com"))
}
}
case Some(proxy) => Uri.Host(proxy.host)
case Some(proxy) => Authority(Uri.Host(proxy.host))
}
private[this] def requestUri(bucket: String, key: Option[String])(implicit conf: S3Settings): Uri = {
@@ -131,10 +133,13 @@ private[alpakka] object HttpRequests {
val path = key.fold(basePath) { someKey =>
someKey.split("/").foldLeft(basePath)((acc, p) => acc / p)
}
val uri = Uri(path = path, authority = Authority(requestHost(bucket, conf.s3Region)))
conf.proxy match {
case None => uri.withScheme("https").withHost(requestHost(bucket, conf.s3Region))
case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme).withHost(proxy.host)
val uri = Uri(path = path, authority = requestAuthority(bucket, conf.s3Region))
(conf.proxy, conf.baseUrl) match {
case (_, Some(baseUrl)) =>
uri.withScheme(Uri(baseUrl).scheme).withHost(requestAuthority(bucket, conf.s3Region).host)
case (None, _) => uri.withScheme("https").withHost(requestAuthority(bucket, conf.s3Region).host)
case (Some(proxy), _) => uri.withPort(proxy.port).withScheme(proxy.scheme).withHost(proxy.host)
}
}
}
@@ -4,6 +4,7 @@
package akka.stream.alpakka.s3.javadsl;
import akka.japi.Option;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.s3.MemoryBufferType;
@@ -40,7 +41,8 @@ public void connectBluemix() {
Some.apply(proxy),
credentials,
"",
true
true,
Option.<String>none().asScala() //FIXME
);
final S3Client s3Client = new S3Client(settings,system(), mat);
// #java-bluemix-example
@@ -4,9 +4,6 @@
package akka.stream.alpakka.s3.javadsl;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.ByteRange;
@@ -25,6 +22,11 @@
import org.junit.Test;
import scala.Option;
import scala.Some;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -44,7 +46,8 @@
Some.apply(proxy),
credentials,
"us-east-1",
false
false,
akka.japi.Option.<String>none().asScala() //FIXME
);
private final S3Client client = new S3Client(settings, system(), materializer);
//#client
@@ -4,12 +4,18 @@
package akka.stream.alpakka.s3.impl
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpEntity, IllegalUriException, MediaTypes}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, IllegalUriException, MediaTypes}
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.acl.CannedAcl
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.alpakka.s3.{BufferType, MemoryBufferType, Proxy, S3Settings}
import akka.stream.scaladsl.Source
import akka.testkit.{SocketUtil, TestProbe}
import akka.util.ByteString
import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, AnonymousAWSCredentials}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}
@@ -22,9 +28,10 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {
proxy: Option[Proxy] = None,
awsCredentials: AWSCredentialsProvider = new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()),
s3Region: String = "us-east-1",
pathStyleAccess: Boolean = false
pathStyleAccess: Boolean = false,
baseUrl: Option[String] = None
) =
new S3Settings(bufferType, proxy, awsCredentials, s3Region, pathStyleAccess)
new S3Settings(bufferType, proxy, awsCredentials, s3Region, pathStyleAccess, baseUrl)
val location = S3Location("bucket", "image-1024@2x")
val contentType = MediaTypes.`image/jpeg`
@@ -236,4 +243,33 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {
"prefix" -> "random/prefix",
"continuation-token" -> "randomToken")
}
it should "support custom endpoint configured by `baseUrl`" in {
implicit val system: ActorSystem = ActorSystem("HttpRequestsSpec")
import system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
val probe = TestProbe()
val address = SocketUtil.temporaryServerAddress()
import akka.http.scaladsl.server.Directives._
Http().bindAndHandle(extractRequestContext { ctx =>
probe.ref ! ctx.request
complete("MOCK")
}, address.getHostName, address.getPort)
implicit val setting: S3Settings =
getSettings(baseUrl = Some(s"http://${address.getHostName}:${address.getPort}/"))
val req =
HttpRequests.listBucket(location.bucket, Some("random/prefix"), Some("randomToken"))
Http().singleRequest(req)
probe.expectMsgType[HttpRequest]
materializer.shutdown()
system.terminate()
}
}
@@ -35,7 +35,7 @@ class S3StreamSpec(_system: ActorSystem)
)
val location = S3Location("test-bucket", "test-key")
implicit val settings = new S3Settings(MemoryBufferType, None, credentials, "us-east-1", false)
implicit val settings = new S3Settings(MemoryBufferType, None, credentials, "us-east-1", false, None)
val s3stream = new S3Stream(settings)
val result: HttpRequest = s3stream invokePrivate requestHeaders(getDownloadRequest(location), None)
@@ -56,7 +56,7 @@ class S3StreamSpec(_system: ActorSystem)
val location = S3Location("test-bucket", "test-key")
val range = ByteRange(1, 4)
implicit val settings = new S3Settings(MemoryBufferType, None, credentials, "us-east-1", false)
implicit val settings = new S3Settings(MemoryBufferType, None, credentials, "us-east-1", false, None)
val s3stream = new S3Stream(settings)
val result: HttpRequest = s3stream invokePrivate requestHeaders(getDownloadRequest(location), Some(range))
@@ -28,7 +28,7 @@ object DoucmentationSnippets {
val proxy = Some(Proxy(host, port, "https"))
// Set pathStyleAccess to true and specify proxy, leave region blank
val settings = new S3Settings(MemoryBufferType, proxy, credentials, "", true)
val settings = new S3Settings(MemoryBufferType, proxy, credentials, "", true, None)
val s3Client = new S3Client(settings)(system, materializer)
// #scala-bluemix-example
}
@@ -17,7 +17,7 @@ class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec {
new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-password")
)
val proxy = Option(Proxy("localhost", port, "http"))
val settings = new S3Settings(MemoryBufferType, proxy, awsCredentials, "us-east-1", false)
val settings = new S3Settings(MemoryBufferType, proxy, awsCredentials, "us-east-1", false, None)
val s3Client = new S3Client(settings)(system, materializer)
"S3Sink" should "upload a stream of bytes to S3" in {
@@ -19,7 +19,7 @@ class S3SourceSpec extends S3WireMockBase with S3ClientIntegrationSpec {
new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-password")
)
val proxy = Option(Proxy("localhost", port, "http"))
val settings = new S3Settings(MemoryBufferType, proxy, awsCredentials, "us-east-1", false)
val settings = new S3Settings(MemoryBufferType, proxy, awsCredentials, "us-east-1", false, None)
val s3Client = new S3Client(settings)(system, materializer)
//#client
ProTip! Use n and p to navigate between commits in a pull request.