Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3: Bucket management #1608

Merged
merged 1 commit into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/src/main/paradox/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,50 @@ Scala
Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket-attributes }


## Bucket management

Bucket management API provides functionality for both Sources and Futures / CompletionStages.
In case of the Future API user can specify attributes to the request in the method itself and as for Sources it can be done via method `.withAttributes`.
For more information about attributes see: @scaladoc[S3Attributes](akka.stream.alpakka.s3.S3Attributes$) and @scaladoc[Attributes](akka.stream.Attributes)

### Make bucket
In order to create a bucket in S3 you need to specify it's unique name. This value has to be set accordingly to the [requirements](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html).
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The link seems to be the wrong one (cloudtrail), guess this is the right one ? -> https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html#bucketnamingrules

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you for noticing. My bad on not checking it before merging. I will submit another PR tomorrow referencing that issue

The bucket will be created in the region specified in the settings.

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #make-bucket }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #make-bucket }


### Delete bucket
To delete a bucket you need to specify its name and the bucket needs to be empty.

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #delete-bucket }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #delete-bucket }


### Check if bucket exists
It is possible to check if a bucket exists and the user has rights to perform a `listBucket` operation.

There are 3 possible outcomes:

- The user has access to the existing bucket, then it will return `AccessGranted`
- The user doesn't have access but the bucket exists so `AccessDenied` will be returned
- The bucket doesn't exist, the method will return `NotExists`

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #check-if-bucket-exists }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #check-if-bucket-exists }


## Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ import scala.concurrent.{ExecutionContext, Future}
.withDefaultHeaders(s3Headers)
}

def bucketManagementRequest(
s3Location: S3Location,
method: HttpMethod,
headers: Seq[HttpHeader] = Seq.empty[HttpHeader]
)(implicit conf: S3Settings): HttpRequest =
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def uploadRequest(s3Location: S3Location,
payload: Source[ByteString, _],
contentLength: Long,
Expand Down
94 changes: 94 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import akka.http.scaladsl.model.StatusCodes.{NoContent, NotFound, OK}
import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`, ByteRange, CustomHeader}
import akka.http.scaladsl.model.{headers => http, _}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.alpakka.s3.BucketAccess.{AccessDenied, AccessGranted, NotExists}
import akka.stream.{ActorMaterializer, Attributes, Materializer}
import akka.stream.alpakka.s3.impl.auth.{CredentialScope, Signer, SigningKey}
import akka.stream.alpakka.s3._
Expand Down Expand Up @@ -261,6 +262,99 @@ import akka.util.ByteString
case _ => downloadRequest
}

def makeBucketSource(bucket: String): Source[Done, NotUsed] =
bucketManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
process = processBucketLifecycleResponse
)

def makeBucket(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[Done] =
makeBucketSource(bucket).withAttributes(attr).runWith(Sink.ignore)

def deleteBucketSource(bucket: String): Source[Done, NotUsed] =
bucketManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
process = processBucketLifecycleResponse
)

def deleteBucket(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[Done] =
deleteBucketSource(bucket).withAttributes(attr).runWith(Sink.ignore)

def checkIfBucketExistsSource(bucketName: String): Source[BucketAccess, NotUsed] =
bucketManagementRequest[BucketAccess](
bucket = bucketName,
method = HttpMethods.HEAD,
process = processCheckIfExistsResponse
)

def checkIfBucketExists(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[BucketAccess] =
checkIfBucketExistsSource(bucket).withAttributes(attr).runWith(Sink.head)

private def bucketManagementRequest[T](
bucket: String,
method: HttpMethod,
process: (HttpResponse, Materializer) => Future[T]
): Source[T, NotUsed] =
Setup
.source { mat => implicit attr =>
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings()

val location = S3Location(bucket = bucket, key = "")

signAndRequest(
requestHeaders(
HttpRequests.bucketManagementRequest(location, method),
None
)
).mapAsync(1) { response =>
process(response, mat)
}
}
.mapMaterializedValue(_ => NotUsed)

private def processBucketLifecycleResponse(response: HttpResponse, materializer: Materializer): Future[Done] = {
import materializer.executionContext

implicit val mat: Materializer = materializer

response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
entity.discardBytes().future()
case HttpResponse(_, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
throw new S3Exception(err)
}
}
}

private def processCheckIfExistsResponse(response: HttpResponse, materializer: Materializer): Future[BucketAccess] = {
import materializer.executionContext

implicit val mat: Materializer = materializer

response match {
case code @ HttpResponse(StatusCodes.NotFound | StatusCodes.Forbidden | StatusCodes.OK, _, entity, _) =>
entity
.discardBytes()
.future()
.map(
_ =>
code.status match {
case StatusCodes.NotFound => NotExists
case StatusCodes.Forbidden => AccessDenied
case StatusCodes.OK => AccessGranted
}
)
case HttpResponse(_, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
throw new S3Exception(err)
}
}
}

/**
* Uploads a stream of ByteStrings to a specified location as a multipart upload.
*/
Expand Down
106 changes: 104 additions & 2 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import akka.http.javadsl.model._
import akka.http.javadsl.model.headers.ByteRange
import akka.http.scaladsl.model.headers.{ByteRange => ScalaByteRange}
import akka.http.scaladsl.model.{ContentType => ScalaContentType, HttpMethod => ScalaHttpMethod}
import akka.stream.{Attributes, Materializer}
import akka.stream.alpakka.s3.headers.{CannedAcl, ServerSideEncryption}
import akka.stream.alpakka.s3.impl._
import akka.stream.alpakka.s3._
Expand Down Expand Up @@ -324,7 +325,7 @@ object S3 {
* @param key the s3 object key
* @param contentType an optional [[akka.http.javadsl.model.ContentType ContentType]]
* @param s3Headers any headers you want to add
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[ava.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

*/
def multipartUpload(bucket: String,
key: String,
Expand Down Expand Up @@ -353,7 +354,7 @@ object S3 {
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[ava.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
multipartUpload(bucket, key, ContentTypes.APPLICATION_OCTET_STREAM)
Expand Down Expand Up @@ -466,6 +467,107 @@ object S3 {
targetKey: String): RunnableGraph[CompletionStage[MultipartUploadResult]] =
multipartCopy(sourceBucket, sourceKey, targetBucket, targetKey, ContentTypes.APPLICATION_OCTET_STREAM, S3Headers())

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucket(bucketName: String, materializer: Materializer, attributes: Attributes): CompletionStage[Done] =
S3Stream.makeBucket(bucketName)(materializer, attributes).toJava

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucket(bucketName: String, materializer: Materializer): CompletionStage[Done] =
S3Stream.makeBucket(bucketName)(materializer, Attributes()).toJava

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucketSource(bucketName: String): Source[Done, NotUsed] =
S3Stream.makeBucketSource(bucketName).asJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucket(bucketName: String, materializer: Materializer, attributes: Attributes): CompletionStage[Done] =
S3Stream.deleteBucket(bucketName)(materializer, attributes).toJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucket(bucketName: String, materializer: Materializer): CompletionStage[Done] =
S3Stream.deleteBucket(bucketName)(materializer, Attributes()).toJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucketSource(bucketName: String): Source[Done, NotUsed] =
S3Stream.deleteBucketSource(bucketName).asJava

/**
* Checks whether the bucket exists and the user has rights to perform the `ListBucket` operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketAccess]]
*/
def checkIfBucketExists(bucketName: String,
materializer: Materializer,
attributes: Attributes): CompletionStage[BucketAccess] =
S3Stream.checkIfBucketExists(bucketName)(materializer, attributes).toJava

/**
* Checks whether the bucket exits and user has rights to perform ListBucket operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketAccess]]
*/
def checkIfBucketExists(bucketName: String, materializer: Materializer): CompletionStage[BucketAccess] =
S3Stream.checkIfBucketExists(bucketName)(materializer, Attributes()).toJava

/**
* Checks whether the bucket exits and user has rights to perform ListBucket operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[BucketAccess]]
*/
def checkIfBucketExistsSource(bucketName: String): Source[BucketAccess, NotUsed] =
S3Stream.checkIfBucketExistsSource(bucketName).asJava

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
Expand Down
20 changes: 20 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,23 @@ final class ObjectMetadata private (
object ObjectMetadata {
def apply(metadata: Seq[HttpHeader]) = new ObjectMetadata(metadata)
}

/**
* While checking for bucket access those responses are available
* 1) AccessDenied - User does have permission to perform ListBucket operation, so bucket exits
* 2) AccessGranted - User doesn't have rights to perform ListBucket but bucket exits
* 3) NotExists - Bucket doesn't exit
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
*/
sealed class BucketAccess

object BucketAccess {
case object AccessDenied extends BucketAccess
case object AccessGranted extends BucketAccess
case object NotExists extends BucketAccess

val accessDenied: BucketAccess = AccessDenied
val accessGranted: BucketAccess = AccessGranted
val notExists: BucketAccess = NotExists
}
Loading