Skip to content

Commit

Permalink
AWS S3 added listBucket variants supporting common prefixes and delim…
Browse files Browse the repository at this point in the history
…iter (#2023)
  • Loading branch information
an-tex authored and seglo committed Nov 27, 2019
1 parent 6fe999c commit eab5486
Show file tree
Hide file tree
Showing 12 changed files with 531 additions and 69 deletions.
11 changes: 11 additions & 0 deletions docs/src/main/paradox/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ Scala
Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket }

## List bucket contents and common prefixes

To get a list of the contents and common prefixes for one hierarchy level using a delimiter, use @scala[@scaladoc[S3.listBucketAndCommonPrefixes](akka.stream.alpakka.s3.scaladsl.S3$)]@java[@scaladoc[S3.listBucketAndCommonPrefixes](akka.stream.alpakka.s3.javadsl.S3$)].
When run, this will give a tuple stream of (Seq[@scaladoc[ListBucketResultContents](akka.stream.alpakka.s3.ListBucketResultContents)], Seq[@scaladoc[ListBucketResultCommonPrefixes](akka.stream.alpakka.s3.ListBucketResultCommonPrefixes)]).

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #list-bucket-and-common-prefixes }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket-and-common-prefixes }

## Copy upload (multi part)

Copy an S3 object from source bucket to target bucket using @scala[@scaladoc[S3.multipartCopy](akka.stream.alpakka.s3.scaladsl.S3$)]@java[@scaladoc[S3.multipartCopy](akka.stream.alpakka.s3.javadsl.S3$)].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.concurrent.{ExecutionContext, Future}
bucket: String,
prefix: Option[String] = None,
continuationToken: Option[String] = None,
delimiter: Option[String] = None,
headers: Seq[HttpHeader] = Nil
)(implicit conf: S3Settings): HttpRequest = {

Expand All @@ -42,6 +43,7 @@ import scala.concurrent.{ExecutionContext, Future}
Seq(
"list-type" -> listType,
"prefix" -> prefix,
"delimiter" -> delimiter,
continuationTokenName -> continuationToken
).collect { case (k, Some(v)) => k -> v }.toMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.annotation.InternalApi
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes, Uri}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.{ListBucketResultCommonPrefixes, ListBucketResultContents}

import scala.util.Try
import scala.xml.NodeSeq
Expand Down Expand Up @@ -70,6 +70,12 @@ import scala.xml.NodeSeq
Instant.parse((c \ "LastModified").text),
(c \ "StorageClass").text
)
},
(x \\ "CommonPrefixes").map { c =>
ListBucketResultCommonPrefixes(
(x \ "Name").text,
(c \ "Prefix").text
)
}
)
}
Expand Down
92 changes: 67 additions & 25 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import akka.stream.alpakka.s3.headers.ServerSideEncryption
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable

/** Internal Api */
@InternalApi private[s3] final case class S3Location(bucket: String, key: String)

Expand Down Expand Up @@ -70,7 +72,8 @@ import akka.util.ByteString
/** Internal Api */
@InternalApi private[impl] final case class ListBucketResult(isTruncated: Boolean,
continuationToken: Option[String],
contents: Seq[ListBucketResultContents])
contents: Seq[ListBucketResultContents],
commonPrefixes: Seq[ListBucketResultCommonPrefixes])

/** Internal Api */
@InternalApi private[impl] final case class CopyPartResult(lastModified: Instant, eTag: String)
Expand Down Expand Up @@ -126,31 +129,40 @@ import akka.util.ByteString
}
}.mapMaterializedValue(_ => NotUsed)

sealed trait ListBucketState
case object Starting extends ListBucketState
case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

def listBucketCall[T](
bucket: String,
prefix: Option[String],
delimiter: Option[String],
s3Headers: S3Headers,
token: Option[String],
resultTransformer: ListBucketResult => T
)(implicit mat: ActorMaterializer, attr: Attributes): Future[Option[(ListBucketState, T)]] = {
import mat.executionContext
implicit val conf = resolveSettings(attr, mat.system)

signAndGetAs[ListBucketResult](
HttpRequests.listBucket(bucket, prefix, token, delimiter, s3Headers.headersFor(ListBucket))
).map { (res: ListBucketResult) =>
Some(
res.continuationToken
.fold[(ListBucketState, T)]((Finished, resultTransformer(res)))(
t => (Running(t), resultTransformer(res))
)
)
}
}

def listBucket(bucket: String,
prefix: Option[String] = None,
s3Headers: S3Headers): Source[ListBucketResultContents, NotUsed] = {
sealed trait ListBucketState
case object Starting extends ListBucketState
case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

def listBucketCall(
token: Option[String]
)(implicit mat: ActorMaterializer,
attr: Attributes): Future[Option[(ListBucketState, Seq[ListBucketResultContents])]] = {
import mat.executionContext
implicit val conf = resolveSettings(attr, mat.system)

signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, prefix, token, s3Headers.headersFor(ListBucket)))
.map { (res: ListBucketResult) =>
Some(
res.continuationToken
.fold[(ListBucketState, Seq[ListBucketResultContents])]((Finished, res.contents))(
t => (Running(t), res.contents)
)
)
}
}

def listBucketCallOnlyContents(token: Option[String])(implicit mat: ActorMaterializer, attr: Attributes) =
listBucketCall(bucket, prefix, None, s3Headers, token, _.contents)

Source
.setup { (mat, attr) =>
Expand All @@ -159,14 +171,44 @@ import akka.util.ByteString
Source
.unfoldAsync[ListBucketState, Seq[ListBucketResultContents]](Starting) {
case Finished => Future.successful(None)
case Starting => listBucketCall(None)
case Running(token) => listBucketCall(Some(token))
case Starting => listBucketCallOnlyContents(None)
case Running(token) => listBucketCallOnlyContents(Some(token))
}
.mapConcat(identity)
}
.mapMaterializedValue(_ => NotUsed)
}

def listBucketAndCommonPrefixes(
bucket: String,
delimiter: String,
prefix: Option[String] = None,
s3Headers: S3Headers
): Source[(immutable.Seq[ListBucketResultContents], immutable.Seq[ListBucketResultCommonPrefixes]), NotUsed] = {

def listBucketCallContentsAndCommonPrefixes(token: Option[String])(implicit mat: ActorMaterializer,
attr: Attributes) =
listBucketCall(bucket,
prefix,
Some(delimiter),
s3Headers,
token,
listBucketResult => (listBucketResult.contents, listBucketResult.commonPrefixes))

Source
.setup { (mat, attr) =>
implicit val materializer = mat
implicit val attributes = attr
Source
.unfoldAsync[ListBucketState, (Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes])](Starting) {
case Finished => Future.successful(None)
case Starting => listBucketCallContentsAndCommonPrefixes(None)
case Running(token) => listBucketCallContentsAndCommonPrefixes(Some(token))
}
}
.mapMaterializedValue(_ => NotUsed)
}

def getObjectMetadata(bucket: String,
key: String,
versionId: Option[String],
Expand Down
Loading

0 comments on commit eab5486

Please sign in to comment.