AWS S3 added listObjects endpoint including common prefixes for a delimiter #2023

Merged
merged 19 commits into from
Nov 27, 2019
Commits
7c78fed
added listObjects endpoint including common prefixes for a delimiter
an-tex Nov 20, 2019
e43c5be
ListBucketResultBase should be sealed
an-tex Nov 20, 2019
397dcef
updated s3 api links and version typo fix
an-tex Nov 22, 2019
e7fd31c
Return tuple of contents and common prefixes
seglo Nov 25, 2019
1faa369
Merge pull request #1 from seglo/seglo/an-tex-master
an-tex Nov 26, 2019
5719fe8
delimiter should be mandatory, otherwise commonPrefixes will always b…
an-tex Nov 26, 2019
75cbd55
added S3SourceSpec for listBucketAndCommonPrefixes
an-tex Nov 26, 2019
e72e765
refactoring removing duplicated code in listBucket* functions
an-tex Nov 26, 2019
a93fefe
added listBucketAndCommonPrefixes documentation
an-tex Nov 26, 2019
1ce94bd
Merge remote-tracking branch 'upstream/master'
an-tex Nov 26, 2019
e481d1f
Update docs/src/main/paradox/s3.md
an-tex Nov 26, 2019
3d5ee4a
Update s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3.scala
an-tex Nov 26, 2019
fda7776
removed base trait ListBucketResultBase
an-tex Nov 26, 2019
ff4b64d
Merge branch 'master' of github.com:an-tex/alpakka
an-tex Nov 26, 2019
f194074
removed unnecessary paradox comments
an-tex Nov 26, 2019
cde64ff
updated scaladoc return types
an-tex Nov 26, 2019
3ea4954
added listBucket overload taking a delimiter
an-tex Nov 26, 2019
f668010
fixed scala 2.13 compilation as it enforces a stricter collection cas…
an-tex Nov 26, 2019
97265ab
cross compilation fix as scala 2.13 changed behaviour of immutable co…
an-tex Nov 27, 2019
11 changes: 11 additions & 0 deletions docs/src/main/paradox/s3.md
@@ -85,6 +85,17 @@ Scala
Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket }

## List bucket contents and common prefixes

To list the contents and common prefixes of a single hierarchy level, as determined by a delimiter, use @scala[@scaladoc[S3.listBucketAndCommonPrefixes](akka.stream.alpakka.s3.scaladsl.S3$)]@java[@scaladoc[S3.listBucketAndCommonPrefixes](akka.stream.alpakka.s3.javadsl.S3$)].
When run, the source emits tuples of (Seq[@scaladoc[ListBucketResultContents](akka.stream.alpakka.s3.ListBucketResultContents)], Seq[@scaladoc[ListBucketResultCommonPrefixes](akka.stream.alpakka.s3.ListBucketResultCommonPrefixes)]), one tuple per response page returned by S3.

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #list-bucket-and-common-prefixes }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket-and-common-prefixes }
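
To illustrate what "contents and common prefixes for one hierarchy level" means, here is a minimal local sketch that groups a list of keys the way an S3 listing with a delimiter is generally understood to behave. It makes no network calls and uses no Alpakka types. The names `CommonPrefixDemo`, `Listing` and `listOneLevel` are hypothetical and exist only for this illustration, and a non-empty delimiter is assumed.

```scala
object CommonPrefixDemo {
  // Hypothetical result type for this sketch (not an Alpakka class).
  final case class Listing(contents: Seq[String], commonPrefixes: Seq[String])

  // Simulates a single-level listing: keys that contain the delimiter after
  // the prefix are rolled up into one common prefix, all others are contents.
  // Assumes a non-empty delimiter.
  def listOneLevel(keys: Seq[String], prefix: String, delimiter: String): Listing = {
    val matching = keys.filter(_.startsWith(prefix))
    val (rolledUp, direct) =
      matching.partition(_.indexOf(delimiter, prefix.length) >= 0)
    val prefixes = rolledUp.map { key =>
      // Keep everything up to and including the first delimiter after the prefix.
      val end = key.indexOf(delimiter, prefix.length) + delimiter.length
      key.substring(0, end)
    }.distinct
    Listing(direct, prefixes)
  }

  val keys = Seq("a.txt", "docs/readme.md", "docs/api/index.md", "img/logo.png")

  // Top level: one object plus two "directories".
  val top = listOneLevel(keys, prefix = "", delimiter = "/")

  // One level down: one object plus one nested "directory".
  val docs = listOneLevel(keys, prefix = "docs/", delimiter = "/")
}
```

With these sample keys, the top-level listing has `a.txt` as its only content entry and `docs/` and `img/` as common prefixes. Listing with the prefix `docs/` yields `docs/readme.md` and the common prefix `docs/api/`.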

## Copy upload (multi part)

Copy an S3 object from source bucket to target bucket using @scala[@scaladoc[S3.multipartCopy](akka.stream.alpakka.s3.scaladsl.S3$)]@java[@scaladoc[S3.multipartCopy](akka.stream.alpakka.s3.javadsl.S3$)].
@@ -30,6 +30,7 @@ import scala.concurrent.{ExecutionContext, Future}
bucket: String,
prefix: Option[String] = None,
continuationToken: Option[String] = None,
delimiter: Option[String] = None,
headers: Seq[HttpHeader] = Nil
)(implicit conf: S3Settings): HttpRequest = {

@@ -42,6 +43,7 @@ import scala.concurrent.{ExecutionContext, Future}
Seq(
"list-type" -> listType,
"prefix" -> prefix,
"delimiter" -> delimiter,
continuationTokenName -> continuationToken
).collect { case (k, Some(v)) => k -> v }.toMap
)
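
The hunk above relies on `collect` to drop every optional query parameter that is `None`, so an unset `delimiter` never reaches the request. A minimal standalone sketch of that idiom follows. `QueryParamsDemo` and `queryParams` are hypothetical names, and the fixed `list-type` value and the `continuation-token` parameter name are assumptions made for this illustration rather than values taken from the PR.

```scala
object QueryParamsDemo {
  // Builds a query map, keeping only the parameters that are defined.
  def queryParams(prefix: Option[String],
                  delimiter: Option[String],
                  continuationToken: Option[String]): Map[String, String] =
    Seq[(String, Option[String])](
      "list-type" -> Some("2"), // assumed fixed value for this sketch
      "prefix" -> prefix,
      "delimiter" -> delimiter,
      "continuation-token" -> continuationToken // assumed parameter name
    ).collect { case (k, Some(v)) => k -> v }.toMap
}
```

Calling `QueryParamsDemo.queryParams(None, Some("/"), None)` yields a map containing only `list-type` and `delimiter`.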
@@ -11,7 +11,7 @@ import akka.annotation.InternalApi
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes, Uri}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.{ListBucketResultCommonPrefixes, ListBucketResultContents}

import scala.util.Try
import scala.xml.NodeSeq
@@ -70,6 +70,12 @@ import scala.xml.NodeSeq
Instant.parse((c \ "LastModified").text),
(c \ "StorageClass").text
)
},
(x \\ "CommonPrefixes").map { c =>
ListBucketResultCommonPrefixes(
(x \ "Name").text,
(c \ "Prefix").text
)
}
)
}
92 changes: 67 additions & 25 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -35,6 +35,8 @@ import akka.stream.alpakka.s3.headers.ServerSideEncryption
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable

/** Internal Api */
@InternalApi private[s3] final case class S3Location(bucket: String, key: String)

@@ -70,7 +72,8 @@ import akka.util.ByteString
/** Internal Api */
@InternalApi private[impl] final case class ListBucketResult(isTruncated: Boolean,
continuationToken: Option[String],
contents: Seq[ListBucketResultContents])
contents: Seq[ListBucketResultContents],
commonPrefixes: Seq[ListBucketResultCommonPrefixes])

/** Internal Api */
@InternalApi private[impl] final case class CopyPartResult(lastModified: Instant, eTag: String)
@@ -126,31 +129,40 @@ import akka.util.ByteString
}
}.mapMaterializedValue(_ => NotUsed)

sealed trait ListBucketState
case object Starting extends ListBucketState
case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

def listBucketCall[T](
bucket: String,
prefix: Option[String],
delimiter: Option[String],
s3Headers: S3Headers,
token: Option[String],
resultTransformer: ListBucketResult => T
)(implicit mat: ActorMaterializer, attr: Attributes): Future[Option[(ListBucketState, T)]] = {
import mat.executionContext
implicit val conf = resolveSettings(attr, mat.system)

signAndGetAs[ListBucketResult](
HttpRequests.listBucket(bucket, prefix, token, delimiter, s3Headers.headersFor(ListBucket))
).map { (res: ListBucketResult) =>
Some(
res.continuationToken
.fold[(ListBucketState, T)]((Finished, resultTransformer(res)))(
t => (Running(t), resultTransformer(res))
)
)
}
}

def listBucket(bucket: String,
prefix: Option[String] = None,
s3Headers: S3Headers): Source[ListBucketResultContents, NotUsed] = {
sealed trait ListBucketState
case object Starting extends ListBucketState
case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

def listBucketCall(
token: Option[String]
)(implicit mat: ActorMaterializer,
attr: Attributes): Future[Option[(ListBucketState, Seq[ListBucketResultContents])]] = {
import mat.executionContext
implicit val conf = resolveSettings(attr, mat.system)

signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, prefix, token, s3Headers.headersFor(ListBucket)))
.map { (res: ListBucketResult) =>
Some(
res.continuationToken
.fold[(ListBucketState, Seq[ListBucketResultContents])]((Finished, res.contents))(
t => (Running(t), res.contents)
)
)
}
}

def listBucketCallOnlyContents(token: Option[String])(implicit mat: ActorMaterializer, attr: Attributes) =
listBucketCall(bucket, prefix, None, s3Headers, token, _.contents)

Source
.setup { (mat, attr) =>
@@ -159,14 +171,44 @@ import akka.util.ByteString
Source
.unfoldAsync[ListBucketState, Seq[ListBucketResultContents]](Starting) {
case Finished => Future.successful(None)
case Starting => listBucketCall(None)
case Running(token) => listBucketCall(Some(token))
case Starting => listBucketCallOnlyContents(None)
case Running(token) => listBucketCallOnlyContents(Some(token))
}
.mapConcat(identity)
}
.mapMaterializedValue(_ => NotUsed)
}

def listBucketAndCommonPrefixes(
bucket: String,
delimiter: String,
prefix: Option[String] = None,
s3Headers: S3Headers
): Source[(immutable.Seq[ListBucketResultContents], immutable.Seq[ListBucketResultCommonPrefixes]), NotUsed] = {

def listBucketCallContentsAndCommonPrefixes(token: Option[String])(implicit mat: ActorMaterializer,
attr: Attributes) =
listBucketCall(bucket,
prefix,
Some(delimiter),
s3Headers,
token,
listBucketResult => (listBucketResult.contents, listBucketResult.commonPrefixes))

Source
.setup { (mat, attr) =>
implicit val materializer = mat
implicit val attributes = attr
Source
.unfoldAsync[ListBucketState, (Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes])](Starting) {
case Finished => Future.successful(None)
case Starting => listBucketCallContentsAndCommonPrefixes(None)
case Running(token) => listBucketCallContentsAndCommonPrefixes(Some(token))
}
}
.mapMaterializedValue(_ => NotUsed)
}
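
The `Starting` / `Running` / `Finished` state machine that both listing functions share can be tried out without Akka or S3. The sketch below is a simplified, synchronous analogue built on `Iterator.unfold` from the Scala 2.13 standard library. `PagedListingDemo`, `Page`, `fetch` and the in-memory pages are hypothetical stand-ins for the signed HTTP call. Note that `Iterator.unfold` expects `(element, nextState)`, whereas the `Source.unfoldAsync` call in the diff returns `(nextState, element)`.

```scala
object PagedListingDemo {
  sealed trait ListState
  case object Starting extends ListState
  final case class Running(continuationToken: String) extends ListState
  case object Finished extends ListState

  // Hypothetical stand-in for one response page.
  final case class Page(items: Seq[String], continuationToken: Option[String])

  // Hypothetical in-memory "service": three pages linked by tokens.
  private val pages: Map[Option[String], Page] = Map(
    None -> Page(Seq("a", "b"), Some("t1")),
    Some("t1") -> Page(Seq("c"), Some("t2")),
    Some("t2") -> Page(Seq("d", "e"), None)
  )

  def fetch(token: Option[String]): Page = pages(token)

  // Emits the page and derives the next state from its continuation token:
  // a token means another page follows, no token means this was the last one.
  private def step(token: Option[String]): Option[(Seq[String], ListState)] = {
    val page = fetch(token)
    val next = page.continuationToken.fold[ListState](Finished)(Running(_))
    Some((page.items, next))
  }

  // One element per page, like listBucketAndCommonPrefixes (no flattening).
  def allPages: List[Seq[String]] =
    Iterator
      .unfold[Seq[String], ListState](Starting) {
        case Finished       => None
        case Starting       => step(None)
        case Running(token) => step(Some(token))
      }
      .toList
}
```

`PagedListingDemo.allPages` produces one element per page, which mirrors `listBucketAndCommonPrefixes`. Flattening the result corresponds to the `mapConcat(identity)` step that `listBucket` applies.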

def getObjectMetadata(bucket: String,
key: String,
versionId: Option[String],