Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS S3: discard entity on failed sign request #2001

Merged
merged 2 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ object Dependencies {
"com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion,
// in-memory filesystem for file related tests
"com.google.jimfs" % "jimfs" % "1.1" % Test, // ApacheV2
"com.github.tomakehurst" % "wiremock" % "2.18.0" % Test // ApacheV2
"com.github.tomakehurst" % "wiremock" % "2.25.1" % Test // ApacheV2
)
)

Expand Down
9 changes: 6 additions & 3 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ import akka.util.ByteString
s3Headers: Seq[HttpHeader] = Seq.empty): Source[HttpResponse, NotUsed] =
Source
.setup { (mat, attr) =>
implicit val materializer = mat
implicit val attributes = attr
implicit val sys = mat.system
implicit val conf = resolveSettings(attr, mat.system)
Expand Down Expand Up @@ -325,6 +326,7 @@ import akka.util.ByteString
): Source[T, NotUsed] =
Source
.setup { (mat, attr) =>
implicit val materializer = mat
implicit val attributes = attr
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings(attr, mat.system)
Expand Down Expand Up @@ -614,7 +616,7 @@ import akka.util.ByteString
)
}
} else {
r.entity.dataBytes.runWith(Sink.ignore)
r.entity.discardBytes()
val etag = r.headers.find(_.lowercaseName() == "etag").map(_.value)
etag
.map(t => Future.successful(SuccessfulUploadPart(upload, index, t)))
Expand Down Expand Up @@ -690,14 +692,15 @@ import akka.util.ByteString
private def signAndRequest(
request: HttpRequest,
retries: Int = 3
)(implicit sys: ActorSystem, attr: Attributes): Source[HttpResponse, NotUsed] = {
)(implicit sys: ActorSystem, mat: ActorMaterializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
implicit val conf = resolveSettings(attr, sys)

Signer
.signedRequest(request, signingKey)
.mapAsync(parallelism = 1)(req => singleRequest(req))
.flatMapConcat {
case HttpResponse(status, _, _, _) if (retries > 0) && (500 to 599 contains status.intValue()) =>
case HttpResponse(status, _, entity, _) if (retries > 0) && (500 to 599 contains status.intValue()) =>
entity.discardBytes()
signAndRequest(request, retries - 1)
case res => Source.single(res)
}
Expand Down
42 changes: 41 additions & 1 deletion s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,12 @@ object S3 {
* @param bucket Which bucket that you list object metadata for
* @param prefix Prefix of the keys you want to list under passed bucket
* @return Source of object metadata
*
* @deprecated use version with `Optional` instead, since 2.0.0
*/
@Deprecated
def listBucket(bucket: String, prefix: Option[String]): Source[ListBucketResultContents, NotUsed] =
listBucket(bucket, prefix, S3Headers.empty)
listBucket(bucket, prefix.asJava, S3Headers.empty)

/**
* Will return a source of object metadata for a given bucket with optional prefix using version 2 of the List Bucket API.
Expand All @@ -435,14 +438,51 @@ object S3 {
* @param bucket Which bucket that you list object metadata for
* @param prefix Prefix of the keys you want to list under passed bucket
* @return Source of object metadata
*
* @deprecated use version with `Optional` instead, since 2.0.0
*/
@Deprecated
def listBucket(bucket: String,
prefix: Option[String],
s3Headers: S3Headers): Source[ListBucketResultContents, NotUsed] =
S3Stream
.listBucket(bucket, prefix, s3Headers)
.asJava

/**
* Will return a source of object metadata for a given bucket with optional prefix using version 2 of the List Bucket API.
* This will automatically page through all keys with the given parameters.
*
* The <code>akka.stream.alpakka.s3.list-bucket-api-version</code> can be set to 1 to use the older API version 1
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html (version 1 API)
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html (version 1 API)
* @param bucket Which bucket that you list object metadata for
* @param prefix Prefix of the keys you want to list under passed bucket
* @return Source of object metadata
*/
def listBucket(bucket: String, prefix: Optional[String]): Source[ListBucketResultContents, NotUsed] =
listBucket(bucket, prefix, S3Headers.empty)

/**
* Will return a source of object metadata for a given bucket with optional prefix using version 2 of the List Bucket API.
* This will automatically page through all keys with the given parameters.
*
* The <code>akka.stream.alpakka.s3.list-bucket-api-version</code> can be set to 1 to use the older API version 1
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html (version 1 API)
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html (version 1 API)
* @param bucket Which bucket that you list object metadata for
* @param prefix Prefix of the keys you want to list under passed bucket
* @return Source of object metadata
*/
def listBucket(bucket: String,
prefix: Optional[String],
s3Headers: S3Headers): Source[ListBucketResultContents, NotUsed] =
S3Stream
.listBucket(bucket, prefix.asScala, s3Headers)
.asJava

/**
* Uploads a S3 Object by making multiple requests
*
Expand Down
84 changes: 50 additions & 34 deletions s3/src/test/java/docs/javadsl/S3Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,64 @@

package docs.javadsl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.model.*;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.ByteRange;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.alpakka.s3.*;
import akka.stream.alpakka.s3.headers.CustomerKeys;
import akka.stream.alpakka.s3.headers.ServerSideEncryption;
import akka.stream.alpakka.s3.javadsl.S3;
import akka.stream.javadsl.Sink$;
import org.junit.Test;
import akka.NotUsed;
import akka.http.javadsl.model.headers.ByteRange;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.s3.scaladsl.S3WireMockBase;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import scala.Option;
import scala.collection.immutable.List;
import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class S3Test extends S3WireMockBase {

private static ActorSystem actorSystemForShutdown;
private static WireMockServer wireMockServerForShutdown;

private final Materializer materializer = ActorMaterializer.create(system());

private final S3Settings sampleSettings = S3Ext.get(system()).settings();
private final String prefix = listPrefix();

@Before
public void before() {
wireMockServerForShutdown = _wireMockServer();
actorSystemForShutdown = system();
}

@AfterClass
public static void afterAll() throws Exception {
wireMockServerForShutdown.stop();
Http.get(actorSystemForShutdown)
.shutdownAllConnectionPools()
.thenRun(() -> TestKit.shutdownActorSystem(actorSystemForShutdown));
}

@Test
public void multipartUpload() throws Exception {
Expand Down Expand Up @@ -221,25 +242,20 @@ public void downloadServerSideEncryptionWithVersion() throws Exception {
sourceAndMeta =
S3.download(bucket(), bucketKey(), null, Optional.of(versionId), sseCustomerKeys());

final Source<ByteString, NotUsed> source =
final Pair<Source<ByteString, NotUsed>, ObjectMetadata> p =
sourceAndMeta
.runWith(Sink.head(), materializer)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS)
.get()
.first();
.orElseThrow(() -> new RuntimeException("empty Optional from S3.download"));

final Source<ByteString, NotUsed> source = p.first();
final CompletionStage<String> resultCompletionStage =
source.map(ByteString::utf8String).runWith(Sink.head(), materializer);
final ObjectMetadata metadata =
sourceAndMeta
.runWith(Sink.head(), materializer)
.toCompletableFuture()
.get(5, TimeUnit.SECONDS)
.get()
.second();
final String result = resultCompletionStage.toCompletableFuture().get();

final String result = resultCompletionStage.toCompletableFuture().get(2, TimeUnit.SECONDS);
assertEquals(bodySSE(), result);

final ObjectMetadata metadata = p.second();
assertEquals(Optional.of(versionId), metadata.getVersionId());
}

Expand Down Expand Up @@ -305,7 +321,7 @@ public void listBucket() throws Exception {

// #list-bucket
final Source<ListBucketResultContents, NotUsed> keySource =
S3.listBucket(bucket(), Option.apply(listPrefix()));
S3.listBucket(bucket(), Optional.of(prefix));
// #list-bucket

final CompletionStage<ListBucketResultContents> resultCompletionStage =
Expand All @@ -326,7 +342,7 @@ public void listBucketVersion1() throws Exception {
S3Ext.get(system()).settings().withListBucketApiVersion(ApiVersion.getListBucketVersion1());

final Source<ListBucketResultContents, NotUsed> keySource =
S3.listBucket(bucket(), Option.apply(listPrefix()))
S3.listBucket(bucket(), Optional.of(prefix))
.withAttributes(S3Attributes.settings(useVersion1Api));
// #list-bucket-attributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {
val req =
HttpRequests.listBucket(location.bucket, Some("random/prefix"), Some("randomToken"))

Http().singleRequest(req)
Http().singleRequest(req).futureValue

probe.expectMsgType[HttpRequest]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.github.tomakehurst.wiremock.stubbing.Scenario
import com.typesafe.config.ConfigFactory
import software.amazon.awssdk.regions.Region

abstract class S3WireMockBase(_system: ActorSystem, _wireMockServer: WireMockServer) extends TestKit(_system) {
abstract class S3WireMockBase(_system: ActorSystem, val _wireMockServer: WireMockServer) extends TestKit(_system) {

private def this(mock: WireMockServer) =
this(ActorSystem(getCallerName(getClass), config(mock.port()).withFallback(ConfigFactory.load())), mock)
Expand Down Expand Up @@ -295,7 +295,7 @@ abstract class S3WireMockBase(_system: ActorSystem, _wireMockServer: WireMockSer

mock.register(
put(urlEqualTo(s"/$bucketKey?partNumber=1&uploadId=$uploadId"))
.withRequestBody(matching(expectedBody))
.withRequestBody(if (expectedBody.isEmpty) absent() else matching(expectedBody))
.willReturn(
aResponse()
.withStatus(200)
Expand Down