Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3 put plus get bucketWithVersioning API #84

Merged
merged 1 commit into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ import pekko.http.scaladsl.model.Uri.{ Authority, Query }
import pekko.http.scaladsl.model.headers.{ `Raw-Request-URI`, Host, RawHeader }
import pekko.http.scaladsl.model.{ RequestEntity, _ }
import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle, VirtualHostAccessStyle }
import pekko.stream.connectors.s3.{ ApiVersion, MultipartUpload, S3Settings }
import pekko.stream.connectors.s3.{
ApiVersion,
BucketVersioning,
BucketVersioningStatus,
MFAStatus,
MultipartUpload,
S3Settings
}
import pekko.stream.scaladsl.Source
import pekko.util.ByteString
import software.amazon.awssdk.regions.Region

import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.xml.NodeSeq
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one minor quibble - scala.xml is a bit unloved, the use here is small but it might be better just to use Java's in-built XML support (JAXP API)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I will keep using Scala xml because that is what's used throughout the codebase, can always file an issue about it


/**
* Internal Api
Expand Down Expand Up @@ -178,6 +186,47 @@ import scala.concurrent.{ ExecutionContext, Future }
s3Request(s3Location = s3Location, method = method)
.withDefaultHeaders(headers)

def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], method: HttpMethod,
headers: Seq[HttpHeader] = Nil)(
implicit conf: S3Settings): HttpRequest = {

val confWithVirtualHost = conf.withAccessStyle(VirtualHostAccessStyle)
val authority = requestAuthority(bucket, conf.s3RegionProvider.getRegion)(confWithVirtualHost)

val finalHeaders = (mfaStatus, method) match {
case (Some(mfaEnabled: MFAStatus.Enabled), HttpMethods.PUT) =>
RawHeader("x-amz-mfa", s"${mfaEnabled.mfa.serialNumber} ${mfaEnabled.mfa.tokenCode}") +: headers
case _ => headers
}

HttpRequest(method)
.withHeaders(Host(authority) +: finalHeaders)
.withUri(requestUri(bucket, None)(confWithVirtualHost).withAuthority(authority).withQuery(Query("versioning")))
}

def putBucketVersioningPayload(bucketVersioning: BucketVersioning)(
implicit ec: ExecutionContext): Future[RequestEntity] = {

val status = bucketVersioning.status.map {
case BucketVersioningStatus.Enabled => <Status>Enabled</Status>
case BucketVersioningStatus.Suspended => <Status>Suspended</Status>
}.getOrElse(NodeSeq.Empty)

val mfaDelete = bucketVersioning.mfaDelete.map {
case _: MFAStatus.Enabled => <MfaDelete>Enabled</MfaDelete>
case MFAStatus.Disabled => <MfaDelete>Disabled</MfaDelete>
}.getOrElse(NodeSeq.Empty)

// @formatter:off
val payload = <VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{status}
{mfaDelete}
</VersioningConfiguration>
// @formatter:on

Marshal(payload).to[RequestEntity]
}

def uploadManagementRequest(
s3Location: S3Location,
uploadId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ import scala.xml.NodeSeq
@InternalApi private[impl] object Marshalling {
import ScalaXmlSupport._

implicit val bucketVersioningUnmarshaller: FromEntityUnmarshaller[BucketVersioningResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml`, ContentTypes.`application/octet-stream`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
val status = (x \ "Status").headOption.map(_.text match {
case "Enabled" => BucketVersioningStatus.Enabled
case "Suspended" => BucketVersioningStatus.Suspended
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I am slightly torn on what to do here. As explained previously I have an impression that there could be a chance that a new value gets added to this field (since its Suspended rather than just being Disabled). If this happens then you would get a match runtime error here, there are alternatives however any of these alternatives would be a completely new case for Marshalling.scala, i.e.

  • If we get a value other than "Enabled"/"Suspended" we can just return "Suspended" but then log an error saying that we discovered a new field. This means we won't break end users but it might also cause unintended behaviour
  • Throw a proper exception rather than just a match error, no such exception actually exists currently since this would be a first

})
val MFADelete = (x \ "MfaDelete").headOption.map(_.exists(_.text == "Enabled"))
BucketVersioningResult(status, MFADelete)
}
}

implicit val multipartUploadUnmarshaller: FromEntityUnmarshaller[MultipartUpload] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml`, ContentTypes.`application/octet-stream`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,13 @@ import org.apache.pekko.annotation.InternalApi
* Internal Api
*/
@InternalApi private[s3] case object CheckBucket extends S3Request

/**
* Internal Api
*/
@InternalApi private[s3] case object PutBucketVersioning extends S3Request

/**
* Internal Api
*/
@InternalApi private[s3] case object GetBucketVersioning extends S3Request
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,47 @@ import scala.util.{ Failure, Success, Try }
attr: Attributes): Future[Done] =
deleteUploadSource(bucket, key, uploadId, headers).withAttributes(attr).runWith(Sink.ignore)

private def bucketVersioningRequest(bucket: String, mfaStatus: Option[MFAStatus], headers: S3Headers)(
method: HttpMethod,
conf: S3Settings): HttpRequest =
HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, headers.headers)(conf)

def putBucketVersioningSource(
bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): Source[Done, NotUsed] =
s3ManagementRequest[Done](
bucket = bucket,
method = HttpMethods.PUT,
httpRequest = bucketVersioningRequest(bucket, bucketVersioning.mfaDelete, headers),
headers.headersFor(PutBucketVersioning),
process = processS3LifecycleResponse,
httpEntity = Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))

def putBucketVersioning(bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[Done] =
putBucketVersioningSource(bucket, bucketVersioning, headers).withAttributes(attr).runWith(Sink.ignore)

def getBucketVersioningSource(
bucket: String, headers: S3Headers): Source[BucketVersioningResult, NotUsed] =
s3ManagementRequest[BucketVersioningResult](
bucket = bucket,
method = HttpMethods.GET,
httpRequest = bucketVersioningRequest(bucket, None, headers),
headers.headersFor(GetBucketVersioning),
process = { (response: HttpResponse, mat: Materializer) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() =>
Unmarshal(entity).to[BucketVersioningResult](implicitly, ExecutionContexts.parasitic, mat)
case response: HttpResponse =>
unmarshalError(response.status, response.entity)(mat)
}
})

def getBucketVersioning(bucket: String, headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[BucketVersioningResult] =
getBucketVersioningSource(bucket, headers).withAttributes(attr).runWith(Sink.head)

private def s3ManagementRequest[T](
bucket: String,
method: HttpMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1633,4 +1633,124 @@ object S3 {
private def func[T, R](f: T => R) = new pekko.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioning(bucketName: String, bucketVersioning: BucketVersioning)(
implicit system: ClassicActorSystemProvider,
attributes: Attributes = Attributes()): CompletionStage[Done] =
putBucketVersioning(bucketName, bucketVersioning, S3Headers.empty)

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @param s3Headers any headers you want to add
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioning(
bucketName: String,
bucketVersioning: BucketVersioning,
s3Headers: S3Headers)(
implicit system: ClassicActorSystemProvider, attributes: Attributes): CompletionStage[Done] =
S3Stream
.putBucketVersioning(bucketName, bucketVersioning, s3Headers)(SystemMaterializer(system).materializer, attributes)
.toJava

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioningSource(bucketName: String, bucketVersioning: BucketVersioning): Source[Done, NotUsed] =
putBucketVersioningSource(bucketName, bucketVersioning, S3Headers.empty)

/**
* Sets the versioning state of an existing bucket.
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
* @param bucketName Bucket name
* @param bucketVersioning The state that you want to update
* @param s3Headers any headers you want to add
* @return [[pekko.stream.javadsl.Source Source]] of type [[Done]] as API doesn't return any additional information
*/
def putBucketVersioningSource(bucketName: String,
bucketVersioning: BucketVersioning,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream.putBucketVersioningSource(bucketName, bucketVersioning, s3Headers).asJava

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @param attributes attributes to run request with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(bucketName: String,
system: ClassicActorSystemProvider,
attributes: Attributes): CompletionStage[BucketVersioningResult] =
getBucketVersioning(bucketName, system, attributes, S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @param attributes attributes to run request with
* @param s3Headers any headers you want to add
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(bucketName: String,
system: ClassicActorSystemProvider,
attributes: Attributes,
s3Headers: S3Headers): CompletionStage[BucketVersioningResult] =
S3Stream.getBucketVersioning(bucketName, s3Headers)(SystemMaterializer(system).materializer, attributes).toJava

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param system the actor system which provides the materializer to run with
* @return [[java.util.concurrent.CompletionStage CompletionStage]] of type [[BucketVersioningResult]]
*/
def getBucketVersioning(
bucketName: String, system: ClassicActorSystemProvider): CompletionStage[BucketVersioningResult] =
getBucketVersioning(bucketName, system, Attributes(), S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @return [[pekko.stream.javadsl.Source Source]] of type [[BucketVersioningResult]]
*/
def getBucketVersioningSource(bucketName: String): Source[BucketVersioningResult, NotUsed] =
getBucketVersioningSource(bucketName, S3Headers.empty)

/**
* Gets the versioning of an existing bucket
*
* @see https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
* @param bucketName Bucket name
* @param s3Headers any headers you want to add
* @return [[pekko.stream.javadsl.Source Source]] of type [[BucketVersioningResult]]
*/
def getBucketVersioningSource(bucketName: String, s3Headers: S3Headers): Source[BucketVersioningResult, NotUsed] =
S3Stream.getBucketVersioningSource(bucketName, s3Headers).asJava

}