Scala 2.12 support #65

Merged — 7 commits, merged Mar 17, 2023. The diff below shows changes from 2 of the 7 commits.
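The recurring theme of this change is pinning the S3 connector's internal signatures to `scala.collection.immutable.Seq`: on Scala 2.13 the default `Seq` alias is already immutable, but on 2.12 it is `scala.collection.Seq`, so unqualified types resolve differently across the cross-build. A minimal sketch of the mismatch (illustrative only, not part of the diff):

```scala
import scala.collection.immutable
import scala.collection.mutable

val buffer = mutable.ArrayBuffer(1, 2, 3)

// On 2.12 the default `Seq` alias is scala.collection.Seq, so a mutable
// buffer satisfies an unqualified `Seq[Int]`; on 2.13 the alias is
// immutable.Seq and the same assignment is a compile error.
val loose: scala.collection.Seq[Int] = buffer // what 2.12 infers for `Seq`

// Pinning the type behaves identically on both versions.
val pinned: immutable.Seq[Int] = buffer.toIndexedSeq
```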
@@ -29,6 +29,7 @@ import pekko.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller }
 import pekko.http.scaladsl.{ ClientTransport, Http }
 import pekko.stream.connectors.s3.BucketAccess.{ AccessDenied, AccessGranted, NotExists }
 import pekko.stream.connectors.s3._
+import pekko.stream.connectors.s3.headers.ServerSideEncryption
 import pekko.stream.connectors.s3.impl.auth.{ CredentialScope, Signer, SigningKey }
 import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, Source, Tcp }
 import pekko.stream.{ Attributes, Materializer }
@@ -57,13 +58,13 @@ import scala.util.{ Failure, Success, Try }
     versionId: Option[String] = None)

 /** Internal Api */
-@InternalApi private[impl] final case class ListBucketsResult(buckets: Seq[ListBucketsResultContents])
+@InternalApi private[impl] final case class ListBucketsResult(buckets: immutable.Seq[ListBucketsResultContents])

 /** Internal Api */
 @InternalApi private[impl] final case class ListBucketResult(isTruncated: Boolean,
     continuationToken: Option[String],
-    contents: Seq[ListBucketResultContents],
-    commonPrefixes: Seq[ListBucketResultCommonPrefixes])
+    contents: immutable.Seq[ListBucketResultContents],
+    commonPrefixes: immutable.Seq[ListBucketResultCommonPrefixes])

 /** Internal Api */
 @InternalApi private[impl] final case class ListMultipartUploadContinuationToken(nextKeyMarker: Option[String],
@@ -79,8 +80,8 @@ import scala.util.{ Failure, Success, Try }
     delimiter: Option[String],
     maxUploads: Int,
     isTruncated: Boolean,
-    uploads: Seq[ListMultipartUploadResultUploads],
-    commonPrefixes: Seq[CommonPrefixes]) {
+    uploads: immutable.Seq[ListMultipartUploadResultUploads],
+    commonPrefixes: immutable.Seq[CommonPrefixes]) {

   /**
    * The continuation token for listing MultipartUpload is a union of both the nextKeyMarker
@@ -111,9 +112,9 @@ import scala.util.{ Failure, Success, Try }
     delimiter: Option[String],
     maxKeys: Int,
     isTruncated: Boolean,
-    versions: Seq[ListObjectVersionsResultVersions],
-    commonPrefixes: Seq[CommonPrefixes],
-    deleteMarkers: Seq[DeleteMarkers]) {
+    versions: immutable.Seq[ListObjectVersionsResultVersions],
+    commonPrefixes: immutable.Seq[CommonPrefixes],
+    deleteMarkers: immutable.Seq[DeleteMarkers]) {

   /**
    * The continuation token for listing ObjectVersions is a union of both the nextKeyMarker
@@ -135,7 +136,7 @@ import scala.util.{ Failure, Success, Try }
     nextPartNumberMarker: Option[Int],
     maxParts: Int,
     isTruncated: Boolean,
-    parts: Seq[ListPartsResultParts],
+    parts: immutable.Seq[ListPartsResultParts],
     initiator: Option[AWSIdentity],
     owner: Option[AWSIdentity],
     storageClass: String) {
@@ -198,7 +199,7 @@ import scala.util.{ Failure, Success, Try }
       .mapAsync(parallelism = 1)(entityForSuccess)
       .map {
         case (entity, headers) =>
-          Some((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers, entity)))
+          Some((entity.dataBytes.mapMaterializedValue(_ => NotUsed), computeMetaData(headers.toIndexedSeq, entity)))
       }
       .recover[Option[(Source[ByteString, NotUsed], ObjectMetadata)]] {
         case e: S3Exception if e.code == "NoSuchKey" => None
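`computeMetaData` (pinned to `immutable.Seq[HttpHeader]` later in this diff) gets a `.toIndexedSeq` at its call sites, presumably because the headers arrive through `entityForSuccess`, whose unqualified `Seq` means `collection.Seq` on 2.12. A stand-alone sketch of the conversion:

```scala
import scala.collection.immutable

// Stand-in for the call-site fix: accept whatever flavour of Seq the
// caller has, hand the pinned API an immutable IndexedSeq.
def normalize(headers: scala.collection.Seq[String]): immutable.Seq[String] =
  headers.toIndexedSeq // copies on 2.12 when needed; cheap on 2.13
```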
@@ -222,7 +223,7 @@ import scala.util.{ Failure, Success, Try }
       .mapAsync(parallelism = 1)(entityForSuccess)
       .flatMapConcat {
         case (entity, headers) =>
-          objectMetadataMat.success(computeMetaData(headers, entity))
+          objectMetadataMat.success(computeMetaData(headers.toIndexedSeq, entity))
           entity.dataBytes
       }
       .mapError {
@@ -316,7 +317,8 @@ import scala.util.{ Failure, Success, Try }
       implicit val materializer: Materializer = mat
       implicit val attributes: Attributes = attr
       Source
-        .unfoldAsync[ListBucketState, (Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes])](
+        .unfoldAsync[ListBucketState, (immutable.Seq[ListBucketResultContents], immutable.Seq[
+            ListBucketResultCommonPrefixes])](
           Starting()) {
           case Finished() => Future.successful(None)
           case Starting() => listBucketCallContentsAndCommonPrefixes(None)
@@ -388,7 +390,8 @@ import scala.util.{ Failure, Success, Try }
       implicit val materializer: Materializer = mat
       implicit val attributes: Attributes = attr
       Source
-        .unfoldAsync[ListMultipartUploadState, (Seq[ListMultipartUploadResultUploads], Seq[CommonPrefixes])](
+        .unfoldAsync[ListMultipartUploadState, (immutable.Seq[ListMultipartUploadResultUploads], immutable.Seq[
+            CommonPrefixes])](
           Starting()) {
           case Finished() => Future.successful(None)
           case Starting() => listMultipartUploadCallContentsAndCommonPrefixes(None)
@@ -516,7 +519,8 @@ import scala.util.{ Failure, Success, Try }
       implicit val materializer: Materializer = mat
       implicit val attributes: Attributes = attr
       Source
-        .unfoldAsync[ListObjectVersionsState, (Seq[ListObjectVersionsResultVersions], Seq[DeleteMarkers], Seq[
+        .unfoldAsync[ListObjectVersionsState, (immutable.Seq[ListObjectVersionsResultVersions], immutable.Seq[
+            DeleteMarkers], immutable.Seq[
             CommonPrefixes])](
           Starting()) {
           case Finished() => Future.successful(None)
@@ -548,7 +552,8 @@ import scala.util.{ Failure, Success, Try }
       implicit val materializer: Materializer = mat
       implicit val attributes: Attributes = attr
       Source
-        .unfoldAsync[ListObjectVersionsState, (Seq[ListObjectVersionsResultVersions], Seq[DeleteMarkers])](
+        .unfoldAsync[ListObjectVersionsState, (immutable.Seq[ListObjectVersionsResultVersions], immutable.Seq[
+            DeleteMarkers])](
           Starting()) {
           case Finished() => Future.successful(None)
           case Starting() => listObjectVersionsCallOnlyVersions(None)
@@ -577,9 +582,9 @@ import scala.util.{ Failure, Success, Try }
           }
         case HttpResponse(NotFound, _, entity, _) =>
           Source.future(entity.discardBytes().future().map(_ => None)(ExecutionContexts.parasitic))
-        case response: HttpResponse =>
+        case HttpResponse(code, _, entity, _) =>
           Source.future {
-            unmarshalError(response.status, response.entity)
+            unmarshalError(code, entity)
           }
       }
     }
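The error branches are also rewritten to destructure the response rather than binding it whole, matching the style of the success branches and pulling out only the status and entity that `unmarshalError` needs. A sketch of the pattern (only the Pekko HTTP model is assumed; `describe` is hypothetical):

```scala
import org.apache.pekko.http.scaladsl.model.HttpResponse

// Destructure status code and entity directly instead of binding the
// whole response and reaching through `response.status` / `response.entity`.
def describe(resp: HttpResponse): String = resp match {
  case HttpResponse(code, _, entity, _) =>
    s"status=$code, contentType=${entity.contentType}"
}
```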
@@ -602,9 +607,9 @@ import scala.util.{ Failure, Success, Try }
       .flatMapConcat {
         case HttpResponse(NoContent, _, entity, _) =>
           Source.future(entity.discardBytes().future().map(_ => Done)(ExecutionContexts.parasitic))
-        case response: HttpResponse =>
+        case HttpResponse(code, _, entity, _) =>
           Source.future {
-            unmarshalError(response.status, response.entity)
+            unmarshalError(code, entity)
           }
       }
     }
@@ -665,9 +670,9 @@ import scala.util.{ Failure, Success, Try }
             ObjectMetadata(h :+ `Content-Length`(entity.contentLengthOption.getOrElse(0)))
           }
         }
-        case response: HttpResponse =>
+        case HttpResponse(code, _, entity, _) =>
           Source.future {
-            unmarshalError(response.status, response.entity)
+            unmarshalError(code, entity)
           }
       }
     }
@@ -681,7 +686,7 @@ import scala.util.{ Failure, Success, Try }
       s3Headers: Seq[HttpHeader] = Seq.empty): Source[HttpResponse, NotUsed] =
     Source
       .fromMaterializer { (mat, attr) =>
-        issueRequest(s3Location, method, rangeOption, versionId, s3Headers)(mat, attr)
+        issueRequest(s3Location, method, rangeOption, versionId, s3Headers.toIndexedSeq)(mat, attr)
       }
       .mapMaterializedValue(_ => NotUsed)

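Note the boundary pattern here: the outer method keeps the permissive `s3Headers: Seq[HttpHeader] = Seq.empty` default, and the conversion to the pinned internal signature happens once, at the hand-off to `issueRequest`. A hypothetical pair mirroring that split:

```scala
import scala.collection.immutable

// Public entry point stays source-compatible for callers on either
// Scala version; the internal method demands immutability.
def request(headers: Seq[String] = Seq.empty): Unit =
  issue(headers.toIndexedSeq)

private def issue(headers: immutable.Seq[String]): Unit =
  headers.foreach(println)
```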
@@ -690,7 +695,8 @@ import scala.util.{ Failure, Success, Try }
       method: HttpMethod = HttpMethods.GET,
       rangeOption: Option[ByteRange] = None,
       versionId: Option[String],
-      s3Headers: Seq[HttpHeader])(implicit mat: Materializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
+      s3Headers: immutable.Seq[HttpHeader])(
+      implicit mat: Materializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
     implicit val sys: ActorSystem = mat.system
     implicit val conf: S3Settings = resolveSettings(attr, sys)
     signAndRequest(requestHeaders(getDownloadRequest(s3Location, method, s3Headers, versionId), rangeOption))
@@ -814,8 +820,8 @@ import scala.util.{ Failure, Success, Try }
     response match {
       case HttpResponse(status, _, entity, _) if status.isSuccess() =>
         entity.discardBytes().future()
-      case response: HttpResponse =>
-        unmarshalError(response.status, response.entity)
+      case HttpResponse(code, _, entity, _) =>
+        unmarshalError(code, entity)
     }
   }

@@ -836,8 +842,8 @@ import scala.util.{ Failure, Success, Try }
           case StatusCodes.OK => AccessGranted
           case other => throw new IllegalArgumentException(s"received status $other")
         })
-      case response: HttpResponse =>
-        unmarshalError(response.status, response.entity)
+      case HttpResponse(code, _, entity, _) =>
+        unmarshalError(code, entity)
     }
   }

@@ -851,7 +857,7 @@ import scala.util.{ Failure, Success, Try }
       chunkSize: Int = MinChunkSize,
       chunkingParallelism: Int = 4): Sink[ByteString, Future[MultipartUploadResult]] =
     chunkAndRequest(s3Location, contentType, s3Headers, chunkSize)(chunkingParallelism)
-      .toMat(completionSink(s3Location, s3Headers))(Keep.right)
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)

   /**
    * Uploads a stream of ByteStrings along with a context to a specified location as a multipart upload. The
@@ -865,7 +871,7 @@ import scala.util.{ Failure, Success, Try }
       chunkSize: Int = MinChunkSize,
       chunkingParallelism: Int = 4): Sink[(ByteString, C), Future[MultipartUploadResult]] =
     chunkAndRequestWithContext[C](s3Location, contentType, s3Headers, chunkSize, chunkUploadSink)(chunkingParallelism)
-      .toMat(completionSink(s3Location, s3Headers))(Keep.right)
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)

   /**
    * Resumes a previously created a multipart upload by uploading a stream of ByteStrings to a specified location
@@ -884,7 +890,7 @@ import scala.util.{ Failure, Success, Try }
     }
     chunkAndRequest(s3Location, contentType, s3Headers, chunkSize, initialUpload)(chunkingParallelism)
       .prepend(Source(successfulParts))
-      .toMat(completionSink(s3Location, s3Headers))(Keep.right)
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)
   }

   /**
@@ -907,7 +913,7 @@ import scala.util.{ Failure, Success, Try }
     }
     chunkAndRequestWithContext[C](s3Location, contentType, s3Headers, chunkSize, chunkUploadSink, initialUpload)(
       chunkingParallelism).prepend(Source(successfulParts))
-      .toMat(completionSink(s3Location, s3Headers))(Keep.right)
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption))(Keep.right)
   }

   def completeMultipartUpload(
@@ -919,13 +925,13 @@ import scala.util.{ Failure, Success, Try }
       SuccessfulUploadPart(MultipartUpload(s3Location.bucket, s3Location.key, uploadId), part.partNumber, part.eTag)
     }
     Source(successfulParts)
-      .toMat(completionSink(s3Location, s3Headers).withAttributes(attr))(Keep.right)
+      .toMat(completionSink(s3Location, s3Headers.serverSideEncryption).withAttributes(attr))(Keep.right)
       .run()
   }

   private def initiateMultipartUpload(s3Location: S3Location,
       contentType: ContentType,
-      s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
+      s3Headers: immutable.Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
     Source
       .fromMaterializer { (mat, attr) =>
         implicit val materializer: Materializer = mat
@@ -939,9 +945,9 @@ import scala.util.{ Failure, Success, Try }
         signAndRequest(req).flatMapConcat {
           case HttpResponse(status, _, entity, _) if status.isSuccess() =>
             Source.future(Unmarshal(entity).to[MultipartUpload])
-          case response: HttpResponse =>
+          case HttpResponse(code, _, entity, _) =>
             Source.future {
-              unmarshalError(response.status, response.entity)
+              unmarshalError(code, entity)
             }
         }
       }
@@ -971,10 +977,10 @@ import scala.util.{ Failure, Success, Try }

     // The individual copy upload part requests are processed here
     processUploadCopyPartRequests(copyRequests)(chunkingParallelism)
-      .toMat(completionSink(targetLocation, s3Headers))(Keep.right)
+      .toMat(completionSink(targetLocation, s3Headers.serverSideEncryption))(Keep.right)
   }

-  private def computeMetaData(headers: Seq[HttpHeader], entity: ResponseEntity): ObjectMetadata =
+  private def computeMetaData(headers: immutable.Seq[HttpHeader], entity: ResponseEntity): ObjectMetadata =
     ObjectMetadata(
       headers ++
         Seq(
@@ -994,7 +1000,9 @@ import scala.util.{ Failure, Success, Try }
     override def renderInResponses(): Boolean = true
   }

-  private def completeMultipartUpload(s3Location: S3Location, parts: Seq[SuccessfulUploadPart], s3Headers: S3Headers)(
+  private def completeMultipartUpload(s3Location: S3Location,
+      parts: immutable.Seq[SuccessfulUploadPart],
+      sse: Option[ServerSideEncryption])(
       implicit mat: Materializer,
       attr: Attributes): Future[CompleteMultipartUploadResult] = {
     def populateResult(result: CompleteMultipartUploadResult,
@@ -1006,7 +1014,7 @@ import scala.util.{ Failure, Success, Try }
     import mat.executionContext
     implicit val conf: S3Settings = resolveSettings(attr, mat.system)

-    val headers = s3Headers.headersFor(UploadPart)
+    val headers = sse.toIndexedSeq.flatMap(_.headersFor(UploadPart))

     Source
       .future(
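`completeMultipartUpload` now receives only the optional server-side-encryption settings and derives its request headers by widening the `Option` to a sequence, so an empty `Option` simply contributes no headers. The same trick in isolation, with hypothetical `Header`/`Sse` stand-ins for `HttpHeader` and `ServerSideEncryption`:

```scala
final case class Header(name: String, value: String) // stand-in for HttpHeader

final case class Sse(algorithm: String) { // stand-in for ServerSideEncryption
  def headersFor(stage: String): Seq[Header] =
    Seq(Header("x-amz-server-side-encryption", algorithm))
}

def uploadHeaders(sse: Option[Sse]): IndexedSeq[Header] =
  sse.toIndexedSeq.flatMap(_.headersFor("UploadPart"))

uploadHeaders(None)                 // Vector()
uploadHeaders(Some(Sse("aws:kms"))) // Vector(Header(x-amz-server-side-encryption,aws:kms))
```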
@@ -1020,11 +1028,11 @@ import scala.util.{ Failure, Success, Try }
    */
   private def initiateUpload(s3Location: S3Location,
       contentType: ContentType,
-      s3Headers: Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
+      s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
     Source
       .single(s3Location)
       .flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
-      .mapConcat(r => LazyList.continually(r))
+      .mapConcat(r => Stream.continually(r))
       .zip(Source.fromIterator(() => Iterator.from(1)))

   private def poolSettings(implicit settings: S3Settings, system: ActorSystem) =
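`LazyList` only exists from Scala 2.13 onwards, so the infinite repetition of the upload handle falls back to `Stream.continually`, which both versions provide (2.13 deprecates it, but it still works). A self-contained sketch of what the repeat-and-number pairing produces, with a hypothetical `Upload` standing in for `MultipartUpload`:

```scala
final case class Upload(id: String) // hypothetical stand-in for MultipartUpload

// Repeat the handle lazily and number the parts from 1, as the
// Source.zip in the diff does.
val numbered = Stream.continually(Upload("abc")).iterator.zip(Iterator.from(1))

numbered.take(3).toList
// List((Upload(abc),1), (Upload(abc),2), (Upload(abc),3))
```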
@@ -1276,7 +1284,7 @@ import scala.util.{ Failure, Success, Try }
         Source
           .single(s3Location)
           .flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
-          .mapConcat(r => LazyList.continually(r))
+          .mapConcat(r => Stream.continually(r))
           .zip(Source.fromIterator(() => Iterator.from(initialIndex)))
       case None =>
         // First step of the multi part upload process is made.
@@ -1324,7 +1332,7 @@ import scala.util.{ Failure, Success, Try }

   private def completionSink(
       s3Location: S3Location,
-      s3Headers: S3Headers): Sink[UploadPartResponse, Future[MultipartUploadResult]] =
+      sse: Option[ServerSideEncryption]): Sink[UploadPartResponse, Future[MultipartUploadResult]] =
     Sink
       .fromMaterializer { (mat, attr) =>
         implicit val materializer: Materializer = mat
@@ -1342,10 +1350,10 @@ import scala.util.{ Failure, Success, Try }
           } else if (failures.isEmpty) {
             Future.successful(successes.sortBy(_.partNumber))
           } else {
-            Future.failed(FailedUpload(failures.map(_.exception)))
+            Future.failed(FailedUpload(failures.map(_.exception).toIndexedSeq))
           }
         }
-        .flatMap(completeMultipartUpload(s3Location, _, s3Headers))
+        .flatMap(parts => completeMultipartUpload(s3Location, parts.toIndexedSeq, sse))
       }
       .mapMaterializedValue(_.map(r => MultipartUploadResult(r.location, r.bucket, r.key, r.eTag, r.versionId)))
   }
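The completion path shows the same rule one more time: `completionSink` now threads only the `Option[ServerSideEncryption]` it actually needs, and on 2.12 `failures.map(_.exception)` infers `scala.collection.Seq`, which no longer satisfies a constructor pinned to `immutable.Seq`, hence the trailing `.toIndexedSeq`. A sketch assuming `FailedUpload` is declared against an immutable sequence (which the diff suggests):

```scala
import scala.collection.immutable

final case class FailedUpload(reasons: immutable.Seq[Throwable]) extends Exception

def collect(failures: scala.collection.Seq[Throwable]): FailedUpload =
  FailedUpload(failures.toIndexedSeq) // required on 2.12, harmless on 2.13
```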
@@ -1412,8 +1420,8 @@ import scala.util.{ Failure, Success, Try }
     resp match {
       case HttpResponse(status, headers, entity, _) if status.isSuccess() && !status.isRedirection() =>
         Future.successful((entity, headers))
-      case response: HttpResponse =>
-        unmarshalError(response.status, response.entity)
+      case HttpResponse(code, _, entity, _) =>
+        unmarshalError(code, entity)
     }
   }

@@ -1439,7 +1447,7 @@ import scala.util.{ Failure, Success, Try }
     val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
       initiateUpload(location, contentType, s3Headers.headersFor(InitiateMultipartUpload))

-    val headers = s3Headers.headersFor(CopyPart)
+    val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(CopyPart))

     Source
       .fromMaterializer { (mat, attr) =>