Creating and deleting bucket. Checking if it exists akka#1546

Marcin Aman authored and MarcinAman committed Apr 26, 2019
1 parent e55332e commit dd90286
Showing 12 changed files with 723 additions and 10 deletions.
44 changes: 44 additions & 0 deletions docs/src/main/paradox/s3.md
@@ -128,6 +128,50 @@ Scala
Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #list-bucket-attributes }


## Bucket management

The bucket management API provides both Source-based and Future / CompletionStage-based variants.
With the Future / CompletionStage variants, attributes can be passed to the request directly in the method call; for the Source variants they are attached via `.withAttributes`.
For more information about attributes see: @scaladoc[S3Attributes](akka.stream.alpakka.s3.S3Attributes$) and @scaladoc[Attributes](akka.stream.Attributes)
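
As a rough orientation before the individual operations below, here is a hedged Scala sketch (not taken from the project's test sources) that contrasts the two flavours by calling the javadsl methods added in this commit from Scala. The bucket name is made up, and the empty `Attributes()` stands in for whatever attributes (for example, settings wrapped via `S3Attributes`) the request should run with.

```scala
import java.util.concurrent.CompletionStage

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.alpakka.s3.javadsl.S3

object BucketManagementFlavours {
  implicit val system: ActorSystem = ActorSystem()
  val materializer: ActorMaterializer = ActorMaterializer()

  // CompletionStage flavour: materializer and attributes travel with the call itself
  val made: CompletionStage[Done] =
    S3.makeBucket("my-sample-bucket", materializer, Attributes())

  // Source flavour of the same operation: attributes are attached to the stream instead
  val madeViaSource: akka.stream.javadsl.Source[Done, NotUsed] =
    S3.makeBucket("my-sample-bucket").withAttributes(Attributes())
}
```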

### Make bucket
In order to create a bucket in S3 you need to specify its unique name. This value has to be chosen according to the [requirements](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html).
The bucket will be created in the region specified in the settings.

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #make-bucket }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #make-bucket }


### Delete bucket
To delete a bucket you need to specify its name and the bucket needs to be empty.

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #delete-bucket }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #delete-bucket }
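
As a hedged illustration (assumed usage, not one of the project's snippets), the Source variants can be sequenced so that a freshly created, still empty bucket is removed again; any non-success response fails the stream with an `S3Exception`.

```scala
import akka.{Done, NotUsed}
import akka.stream.alpakka.s3.javadsl.S3
import akka.stream.javadsl.Source

object BucketLifecycleSketch {
  // makeBucket emits once the bucket has been created; deleteBucket then runs against
  // the still-empty bucket (deletion only succeeds while the bucket holds no objects)
  val createThenDelete: Source[Done, NotUsed] =
    S3.makeBucket("my-temporary-bucket")
      .concat(S3.deleteBucket("my-temporary-bucket"))
}
```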


### Check if bucket exists
It is possible to check whether a bucket exists and whether the user has rights to perform the `ListBucket` operation on it.

There are three possible outcomes (a sketch of handling them follows the snippets below):

- The bucket exists and the user has access to it: `AccessGranted` is returned
- The bucket exists but the user doesn't have access to it: `AccessDenied` is returned
- The bucket doesn't exist: `NotExists` is returned

Scala
: @@snip [snip](/s3/src/test/scala/docs/scaladsl/S3SourceSpec.scala) { #check-if-bucket-exists }

Java
: @@snip [snip](/s3/src/test/java/docs/javadsl/S3Test.java) { #check-if-bucket-exists }
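
As mentioned above, here is a hedged sketch (not part of the project's snippets) of reacting to the three outcomes; the messages are only illustrative.

```scala
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.BucketAccess.{AccessDenied, AccessGranted, NotExists}

object BucketAccessHandling {
  def describe(access: BucketAccess): String = access match {
    case AccessGranted => "bucket exists and ListBucket is permitted"
    case AccessDenied  => "bucket exists, but ListBucket is not permitted"
    case NotExists     => "bucket does not exist"
    case other         => s"unexpected value: $other" // BucketAccess is a sealed class, so keep a fallback
  }
}
```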


## Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
@@ -60,6 +60,14 @@ import scala.concurrent.{ExecutionContext, Future}
.withDefaultHeaders(s3Headers)
}

def bucketManagementRequest(
s3Location: S3Location,
method: HttpMethod,
headers: Seq[HttpHeader] = Seq.empty[HttpHeader]
)(implicit conf: S3Settings): HttpRequest =
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def uploadRequest(s3Location: S3Location,
payload: Source[ByteString, _],
contentLength: Long,
94 changes: 94 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
@@ -19,6 +19,7 @@ import akka.http.scaladsl.model.StatusCodes.{NoContent, NotFound, OK}
import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`, ByteRange, CustomHeader}
import akka.http.scaladsl.model.{headers => http, _}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.alpakka.s3.BucketAccess.{AccessDenied, AccessGranted, NotExists}
import akka.stream.{ActorMaterializer, Attributes, Materializer}
import akka.stream.alpakka.s3.impl.auth.{CredentialScope, Signer, SigningKey}
import akka.stream.alpakka.s3._
@@ -259,6 +260,99 @@ import akka.util.ByteString
case _ => downloadRequest
}

def makeBucket(bucket: String): Source[Done, NotUsed] =
bucketManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
process = processBucketLifecycleResponse
)

def makeBucketF(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[Done] =
makeBucket(bucket).withAttributes(attr).runWith(Sink.ignore)

def deleteBucket(bucket: String): Source[Done, NotUsed] =
bucketManagementRequest[Done](
bucket = bucket,
method = HttpMethods.DELETE,
process = processBucketLifecycleResponse
)

def deleteBucketF(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[Done] =
deleteBucket(bucket).withAttributes(attr).runWith(Sink.ignore)

def checkIfBucketExists(bucketName: String): Source[BucketAccess, NotUsed] =
bucketManagementRequest[BucketAccess](
bucket = bucketName,
method = HttpMethods.HEAD,
process = processCheckIfExistsResponse
)

def checkIfBucketExistsF(bucket: String)(implicit mat: Materializer, attr: Attributes): Future[BucketAccess] =
checkIfBucketExists(bucket).withAttributes(attr).runWith(Sink.head)

private def bucketManagementRequest[T](
bucket: String,
method: HttpMethod,
process: (HttpResponse, Materializer) => Future[T]
): Source[T, NotUsed] =
Setup
.source { mat => implicit attr =>
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings()

val location = S3Location(bucket = bucket, key = "")

signAndRequest(
requestHeaders(
HttpRequests.bucketManagementRequest(location, method),
None
)
).mapAsync(1) { response =>
process(response, mat)
}
}
.mapMaterializedValue(_ => NotUsed)

private def processBucketLifecycleResponse(response: HttpResponse, materializer: Materializer): Future[Done] = {
import materializer.executionContext

implicit val mat: Materializer = materializer

response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
entity.discardBytes().future()
case HttpResponse(_, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
throw new S3Exception(err)
}
}
}

private def processCheckIfExistsResponse(response: HttpResponse, materializer: Materializer): Future[BucketAccess] = {
import materializer.executionContext

implicit val mat: Materializer = materializer

response match {
case code @ HttpResponse(StatusCodes.NotFound | StatusCodes.Forbidden | StatusCodes.OK, _, entity, _) =>
entity
.discardBytes()
.future()
.map(
_ =>
code.status match {
case StatusCodes.NotFound => NotExists
case StatusCodes.Forbidden => AccessDenied
case StatusCodes.OK => AccessGranted
}
)
case HttpResponse(_, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
throw new S3Exception(err)
}
}
}

/**
* Uploads a stream of ByteStrings to a specified location as a multipart upload.
*/
106 changes: 104 additions & 2 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
@@ -12,6 +12,7 @@ import akka.http.javadsl.model._
import akka.http.javadsl.model.headers.ByteRange
import akka.http.scaladsl.model.headers.{ByteRange => ScalaByteRange}
import akka.http.scaladsl.model.{ContentType => ScalaContentType, HttpMethod => ScalaHttpMethod}
import akka.stream.{Attributes, Materializer}
import akka.stream.alpakka.s3.headers.{CannedAcl, ServerSideEncryption}
import akka.stream.alpakka.s3.impl._
import akka.stream.alpakka.s3._
@@ -324,7 +325,7 @@ object S3 {
* @param key the s3 object key
* @param contentType an optional [[akka.http.javadsl.model.ContentType ContentType]]
* @param s3Headers any headers you want to add
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[ava.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def multipartUpload(bucket: String,
key: String,
@@ -353,7 +354,7 @@
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[ava.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
* @return a [[akka.stream.javadsl.Sink Sink]] that accepts [[akka.util.ByteString ByteString]]'s and materializes to a [[java.util.concurrent.CompletionStage CompletionStage]] of [[MultipartUploadResult]]
*/
def multipartUpload(bucket: String, key: String): Sink[ByteString, CompletionStage[MultipartUploadResult]] =
multipartUpload(bucket, key, ContentTypes.APPLICATION_OCTET_STREAM)
@@ -466,6 +467,107 @@
targetKey: String): RunnableGraph[CompletionStage[MultipartUploadResult]] =
multipartCopy(sourceBucket, sourceKey, targetBucket, targetKey, ContentTypes.APPLICATION_OCTET_STREAM, S3Headers())

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucket(bucketName: String, materializer: Materializer, attributes: Attributes): CompletionStage[Done] =
S3Stream.makeBucketF(bucketName)(materializer, attributes).toJava

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucket(bucketName: String, materializer: Materializer): CompletionStage[Done] =
S3Stream.makeBucketF(bucketName)(materializer, Attributes()).toJava

/**
* Create new bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUT.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def makeBucket(bucketName: String): Source[Done, NotUsed] =
S3Stream.makeBucket(bucketName).asJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucket(bucketName: String, materializer: Materializer, attributes: Attributes): CompletionStage[Done] =
S3Stream.deleteBucketF(bucketName)(materializer, attributes).toJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucket(bucketName: String, materializer: Materializer): CompletionStage[Done] =
S3Stream.deleteBucketF(bucketName)(materializer, Attributes()).toJava

/**
* Delete bucket with a given name
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketDELETE.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def deleteBucket(bucketName: String): Source[Done, NotUsed] =
S3Stream.deleteBucket(bucketName).asJava

/**
* Checks whether the bucket exists and the user has rights to perform the `ListBucket` operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketAccess]]
*/
def checkIfBucketExists(bucketName: String,
materializer: Materializer,
attributes: Attributes): CompletionStage[BucketAccess] =
S3Stream.checkIfBucketExistsF(bucketName)(materializer, attributes).toJava

/**
* Checks whether the bucket exists and the user has rights to perform the `ListBucket` operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @param materializer materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketAccess]]
*/
def checkIfBucketExists(bucketName: String, materializer: Materializer): CompletionStage[BucketAccess] =
S3Stream.checkIfBucketExistsF(bucketName)(materializer, Attributes()).toJava

/**
* Checks whether the bucket exists and the user has rights to perform the `ListBucket` operation
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
* @param bucketName bucket name
* @return [[akka.stream.javadsl.Source Source]] of type [[BucketAccess]]
*/
def checkIfBucketExists(bucketName: String): Source[BucketAccess, NotUsed] =
S3Stream.checkIfBucketExists(bucketName).asJava

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
20 changes: 20 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/model.scala
@@ -464,3 +464,23 @@ final class ObjectMetadata private (
object ObjectMetadata {
def apply(metadata: Seq[HttpHeader]) = new ObjectMetadata(metadata)
}

/**
* While checking for bucket access, these responses are possible:
* 1) AccessDenied - the user doesn't have permission to perform the ListBucket operation, but the bucket exists
* 2) AccessGranted - the user has the rights to perform ListBucket, so the bucket exists
* 3) NotExists - the bucket doesn't exist
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
*/
sealed class BucketAccess

object BucketAccess {
case object AccessDenied extends BucketAccess
case object AccessGranted extends BucketAccess
case object NotExists extends BucketAccess

val accessDenied: BucketAccess = AccessDenied
val accessGranted: BucketAccess = AccessGranted
val notExists: BucketAccess = NotExists
}
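
For orientation only, a hedged sketch (not part of this commit): the mapping from the HEAD-bucket status codes to these values, as implemented by `processCheckIfExistsResponse` in `S3Stream.scala` earlier in this commit, could be written as a standalone function like the one below; `toBucketAccess` is a made-up name.

```scala
import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.BucketAccess.{AccessDenied, AccessGranted, NotExists}

object BucketAccessMapping {
  // 404 -> NotExists, 403 -> AccessDenied, 200 -> AccessGranted;
  // any other status is surfaced to the caller as an S3Exception by the real implementation
  def toBucketAccess(status: StatusCode): Option[BucketAccess] =
    status match {
      case StatusCodes.NotFound  => Some(NotExists)
      case StatusCodes.Forbidden => Some(AccessDenied)
      case StatusCodes.OK        => Some(AccessGranted)
      case _                     => None
    }
}
```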