Skip to content

Commit

Permalink
Add S3 put plus get bucketWithVersioning API
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Apr 25, 2023
1 parent b9264dc commit 38b5d02
Show file tree
Hide file tree
Showing 9 changed files with 606 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ import pekko.http.scaladsl.model.Uri.{ Authority, Query }
import pekko.http.scaladsl.model.headers.{ `Raw-Request-URI`, Host, RawHeader }
import pekko.http.scaladsl.model.{ RequestEntity, _ }
import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle, VirtualHostAccessStyle }
import pekko.stream.connectors.s3.{ ApiVersion, MultipartUpload, S3Settings }
import pekko.stream.connectors.s3.{
ApiVersion,
BucketVersioning,
BucketVersioningStatus,
MFAStatus,
MultipartUpload,
S3Settings
}
import pekko.stream.scaladsl.Source
import pekko.util.ByteString
import software.amazon.awssdk.regions.Region

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.xml.NodeSeq

/**
* Internal Api
Expand Down Expand Up @@ -178,6 +186,47 @@ import scala.concurrent.{ ExecutionContext, Future }
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], method: HttpMethod,
headers: Seq[HttpHeader] = Nil)(
implicit conf: S3Settings): HttpRequest = {

val confWithVirtualHost = conf.withAccessStyle(VirtualHostAccessStyle)
val authority = requestAuthority(bucket, conf.s3RegionProvider.getRegion)(confWithVirtualHost)

val finalHeaders = (mfaStatus, method) match {
case (Some(mfaEnabled: MFAStatus.Enabled), HttpMethods.PUT) =>
RawHeader("x-amz-mfa", s"${mfaEnabled.mfa.serialNumber} ${mfaEnabled.mfa.tokenCode}") +: headers
case _ => headers
}

HttpRequest(method)
.withHeaders(Host(authority) +: finalHeaders)
.withUri(requestUri(bucket, None)(confWithVirtualHost).withAuthority(authority).withQuery(Query("versioning")))
}

def putBucketVersioningPayload(bucketVersioning: BucketVersioning)(
implicit ec: ExecutionContext): Future[RequestEntity] = {

val status = bucketVersioning.status.map {
case BucketVersioningStatus.Enabled => <Status>Enabled</Status>
case BucketVersioningStatus.Suspended => <Status>Suspended</Status>
}.getOrElse(NodeSeq.Empty)

val mfaDelete = bucketVersioning.mfaDelete.map {
case _: MFAStatus.Enabled => <MfaDelete>Enabled</MfaDelete>
case MFAStatus.Disabled => <MfaDelete>Disabled</MfaDelete>
}.getOrElse(NodeSeq.Empty)

// @formatter:off
val payload = <VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{status}
{mfaDelete}
</VersioningConfiguration>
// @formatter:on

Marshal(payload).to[RequestEntity]
}

def uploadManagementRequest(
s3Location: S3Location,
uploadId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ import scala.xml.NodeSeq
@InternalApi private[impl] object Marshalling {
import ScalaXmlSupport._

implicit val bucketVersioningUnmarshaller: FromEntityUnmarshaller[BucketVersioningResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml`, ContentTypes.`application/octet-stream`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
val status = (x \ "Status").headOption.map(_.text match {
case "Enabled" => BucketVersioningStatus.Enabled
case "Suspended" => BucketVersioningStatus.Suspended
})
val MFADelete = (x \ "MfaDelete").headOption.map(_.exists(_.text == "Enabled"))
BucketVersioningResult(status, MFADelete)
}
}

implicit val multipartUploadUnmarshaller: FromEntityUnmarshaller[MultipartUpload] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml`, ContentTypes.`application/octet-stream`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,13 @@ import org.apache.pekko.annotation.InternalApi
* Internal Api
*/
@InternalApi private[s3] case object CheckBucket extends S3Request

/**
* Internal Api
*/
@InternalApi private[s3] case object PutBucketVersioning extends S3Request

/**
* Internal Api
*/
@InternalApi private[s3] case object GetBucketVersioning extends S3Request
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,47 @@ import scala.util.{ Failure, Success, Try }
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId, headers).withAttributes(attr).runWith(Sink.ignore)

private def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], headers: S3Headers)(
method: HttpMethod,
conf: S3Settings): HttpRequest =
HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, headers.headers)(conf)

def putBucketVersioningSource(
bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketVersioningRequest(bucket, bucketVersioning.mfaDelete, headers),
headers.headersFor(PutBucketVersioning),
process = processS3LifecycleResponse,
httpEntity = Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))

def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[Done] =
putBucketVersioningSource(bucket, bucketVersioning, headers).withAttributes(attr).runWith(Sink.ignore)

def getBucketVersioningSource(
bucket: String, headers: S3Headers): Source[BucketVersioningResult, NotUsed] =
s3ManagementRequest[BucketVersioningResult](
bucket = bucket,
method = HttpMethods.GET,
httpRequest = bucketVersioningRequest(bucket, None, headers),
headers.headersFor(GetBucketVersioning),
process = { (response: HttpResponse, mat: Materializer) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
Unmarshal(entity).to[BucketVersioningResult](implicitly, ExecutionContexts.parasitic, mat)
case response: HttpResponse =>
unmarshalError(response.status, response.entity)(mat)
}
})

def getBucketVersioning(bucket: String, headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[BucketVersioningResult] =
getBucketVersioningSource(bucket, headers).withAttributes(attr).runWith(Sink.head)

private def s3ManagementRequest[T](
bucket: String,
method: HttpMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1633,4 +1633,124 @@ object S3 {
private def func[T, R](f: T => R) = new pekko.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioning(bucketName: String, bucketVersioning: BucketVersioning)(
implicit system: ClassicActorSystemProvider,
attributes: Attributes = Attributes()): CompletionStage[Done] =
putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @param s3Headers any headers you want to add
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioning(
bucketName: String,
bucketVersioning: BucketVersioning,
s3Headers: S3Headers)(
implicit system: ClassicActorSystemProvider, attributes: Attributes): CompletionStage[Done] =
S3Stream
.putBucketVersioning(bucketName, bucketVersioning, s3Headers)(SystemMaterializer(system).materializer, attributes)
.toJava

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioningSource(bucketName: String, bucketVersioning: BucketVersioning): Source[Done, NotUsed] =
putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @param s3Headers any headers you want to add
* @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioningSource(bucketName: String,
bucketVersioning: BucketVersioning,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream.putBucketVersioningSource(bucketName, bucketVersioning, s3Headers).asJava

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(bucketName: String,
system: ClassicActorSystemProvider,
attributes: Attributes): CompletionStage[BucketVersioningResult] =
getBucketVersioning(bucketName, system, attributes, S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @param attributes attributes to run request with
* @param s3Headers any headers you want to add
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(bucketName: String,
system: ClassicActorSystemProvider,
attributes: Attributes,
s3Headers: S3Headers): CompletionStage[BucketVersioningResult] =
S3Stream.getBucketVersioning(bucketName, s3Headers)(SystemMaterializer(system).materializer, attributes).toJava

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(
bucketName: String, system: ClassicActorSystemProvider): CompletionStage[BucketVersioningResult] =
getBucketVersioning(bucketName, system, Attributes(), S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @return [[pekko.stream.javadsl.Source Source]] of type [[BucketVersioningResult]]
*/
def getBucketVersioningSource(bucketName: String): Source[BucketVersioningResult, NotUsed] =
getBucketVersioningSource(bucketName, S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param s3Headers any headers you want to add
* @return [[pekko.stream.javadsl.Source Source]] of type [[BucketVersioningResult]]
*/
def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers): Source[BucketVersioningResult, NotUsed] =
S3Stream.getBucketVersioningSource(bucketName, s3Headers).asJava

}

0 comments on commit 38b5d02

Please sign in to comment.