Scala 2.12 support #65

Merged 7 commits on Mar 17, 2023
project/Common.scala: 2 additions & 1 deletion
@@ -56,7 +56,8 @@ object Common extends AutoPlugin {
"-deprecation",
"-Xlint",
"-Ywarn-dead-code",
"-target:jvm-1.8"),
"-target:jvm-1.8",
"-Wconf:cat=unused-nowarn:s"),
Contributor comment:
This is needed because we use the @nowarn annotation to suppress a warning only for Scala 2.13; without this scalac option, the compiler would complain under Scala 2.12 that no warnings were suppressed.
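As an illustration (not part of this diff), here is a minimal sketch of the pattern the comment describes, using a hypothetical object and method name: under Scala 2.13 the @nowarn annotation suppresses the Stream deprecation warning, while under Scala 2.12 Stream is not deprecated, the annotation has nothing to suppress, and -Wconf:cat=unused-nowarn:s keeps scalac from reporting the annotation itself as unused.

import scala.annotation.nowarn

object CrossBuildSketch {
  // Scala 2.13 deprecates scala.collection.immutable.Stream in favour of LazyList,
  // so on 2.13 this annotation silences the resulting deprecation warning.
  // Scala 2.12 does not deprecate Stream, so the annotation suppresses nothing there;
  // without "-Wconf:cat=unused-nowarn:s", scalac 2.12 would then warn that the
  // @nowarn annotation did not suppress any warnings.
  @nowarn("msg=deprecated")
  def repeatForever[A](a: A): Stream[A] = Stream.continually(a)
}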

Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
"-doc-title",
"Apache Pekko Connectors",
@@ -23,7 +23,7 @@ import pekko.stream.connectors.s3.headers.{ CannedAcl, ServerSideEncryption, Sto
import pekko.stream.connectors.s3.impl.S3Request

import scala.collection.immutable.Seq
import scala.collection.JavaConverters._
import org.apache.pekko.util.ccompat.JavaConverters._

final class MetaHeaders private (val metaHeaders: Map[String, String]) {

@@ -15,7 +15,6 @@ package org.apache.pekko.stream.connectors.s3.impl

import java.net.InetSocketAddress
import java.time.{ Instant, ZoneOffset, ZonedDateTime }

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.annotation.InternalApi
@@ -36,6 +35,7 @@ import pekko.util.ByteString
import pekko.{ Done, NotUsed }
import software.amazon.awssdk.regions.Region

import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
@@ -57,13 +57,13 @@ import scala.util.{ Failure, Success, Try }
versionId: Option[String] = None)

/** Internal Api */
@InternalApi private[impl] final case class ListBucketsResult(buckets: Seq[ListBucketsResultContents])
@InternalApi private[impl] final case class ListBucketsResult(buckets: immutable.Seq[ListBucketsResultContents])

/** Internal Api */
@InternalApi private[impl] final case class ListBucketResult(isTruncated: Boolean,
continuationToken: Option[String],
contents: Seq[ListBucketResultContents],
commonPrefixes: Seq[ListBucketResultCommonPrefixes])
contents: immutable.Seq[ListBucketResultContents],
commonPrefixes: immutable.Seq[ListBucketResultCommonPrefixes])

/** Internal Api */
@InternalApi private[impl] final case class ListMultipartUploadContinuationToken(nextKeyMarker: Option[String],
@@ -79,8 +79,8 @@ import scala.util.{ Failure, Success, Try }
delimiter: Option[String],
maxUploads: Int,
isTruncated: Boolean,
uploads: Seq[ListMultipartUploadResultUploads],
commonPrefixes: Seq[CommonPrefixes]) {
uploads: immutable.Seq[ListMultipartUploadResultUploads],
commonPrefixes: immutable.Seq[CommonPrefixes]) {

/**
* The continuation token for listing MultipartUpload is a union of both the nextKeyMarker
@@ -111,9 +111,9 @@ import scala.util.{ Failure, Success, Try }
delimiter: Option[String],
maxKeys: Int,
isTruncated: Boolean,
versions: Seq[ListObjectVersionsResultVersions],
commonPrefixes: Seq[CommonPrefixes],
deleteMarkers: Seq[DeleteMarkers]) {
versions: immutable.Seq[ListObjectVersionsResultVersions],
commonPrefixes: immutable.Seq[CommonPrefixes],
deleteMarkers: immutable.Seq[DeleteMarkers]) {

/**
* The continuation token for listing ObjectVersions is a union of both the nextKeyMarker
@@ -135,7 +135,7 @@ import scala.util.{ Failure, Success, Try }
nextPartNumberMarker: Option[Int],
maxParts: Int,
isTruncated: Boolean,
parts: Seq[ListPartsResultParts],
parts: immutable.Seq[ListPartsResultParts],
initiator: Option[AWSIdentity],
owner: Option[AWSIdentity],
storageClass: String) {
@@ -316,7 +316,8 @@ import scala.util.{ Failure, Success, Try }
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
Source
.unfoldAsync[ListBucketState, (Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes])](
.unfoldAsync[ListBucketState, (immutable.Seq[ListBucketResultContents], immutable.Seq[
ListBucketResultCommonPrefixes])](
Starting()) {
case Finished() => Future.successful(None)
case Starting() => listBucketCallContentsAndCommonPrefixes(None)
@@ -388,7 +389,8 @@ import scala.util.{ Failure, Success, Try }
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
Source
.unfoldAsync[ListMultipartUploadState, (Seq[ListMultipartUploadResultUploads], Seq[CommonPrefixes])](
.unfoldAsync[ListMultipartUploadState, (immutable.Seq[ListMultipartUploadResultUploads], immutable.Seq[
CommonPrefixes])](
Starting()) {
case Finished() => Future.successful(None)
case Starting() => listMultipartUploadCallContentsAndCommonPrefixes(None)
@@ -516,7 +518,8 @@ import scala.util.{ Failure, Success, Try }
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
Source
.unfoldAsync[ListObjectVersionsState, (Seq[ListObjectVersionsResultVersions], Seq[DeleteMarkers], Seq[
.unfoldAsync[ListObjectVersionsState, (immutable.Seq[ListObjectVersionsResultVersions], immutable.Seq[
DeleteMarkers], immutable.Seq[
CommonPrefixes])](
Starting()) {
case Finished() => Future.successful(None)
@@ -548,7 +551,8 @@ import scala.util.{ Failure, Success, Try }
implicit val materializer: Materializer = mat
implicit val attributes: Attributes = attr
Source
.unfoldAsync[ListObjectVersionsState, (Seq[ListObjectVersionsResultVersions], Seq[DeleteMarkers])](
.unfoldAsync[ListObjectVersionsState, (immutable.Seq[ListObjectVersionsResultVersions], immutable.Seq[
DeleteMarkers])](
Starting()) {
case Finished() => Future.successful(None)
case Starting() => listObjectVersionsCallOnlyVersions(None)
@@ -678,7 +682,7 @@ import scala.util.{ Failure, Success, Try }
method: HttpMethod = HttpMethods.GET,
rangeOption: Option[ByteRange] = None,
versionId: Option[String] = None,
s3Headers: Seq[HttpHeader] = Seq.empty): Source[HttpResponse, NotUsed] =
s3Headers: immutable.Seq[HttpHeader] = Nil): Source[HttpResponse, NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
issueRequest(s3Location, method, rangeOption, versionId, s3Headers)(mat, attr)
@@ -690,7 +694,8 @@ import scala.util.{ Failure, Success, Try }
method: HttpMethod = HttpMethods.GET,
rangeOption: Option[ByteRange] = None,
versionId: Option[String],
s3Headers: Seq[HttpHeader])(implicit mat: Materializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
s3Headers: immutable.Seq[HttpHeader])(
implicit mat: Materializer, attr: Attributes): Source[HttpResponse, NotUsed] = {
implicit val sys: ActorSystem = mat.system
implicit val conf: S3Settings = resolveSettings(attr, sys)
signAndRequest(requestHeaders(getDownloadRequest(s3Location, method, s3Headers, versionId), rangeOption))
@@ -925,7 +930,7 @@ import scala.util.{ Failure, Success, Try }

private def initiateMultipartUpload(s3Location: S3Location,
contentType: ContentType,
s3Headers: Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
s3Headers: immutable.Seq[HttpHeader]): Source[MultipartUpload, NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
implicit val materializer: Materializer = mat
@@ -974,7 +979,7 @@ import scala.util.{ Failure, Success, Try }
.toMat(completionSink(targetLocation, s3Headers))(Keep.right)
}

private def computeMetaData(headers: Seq[HttpHeader], entity: ResponseEntity): ObjectMetadata =
private def computeMetaData(headers: immutable.Seq[HttpHeader], entity: ResponseEntity): ObjectMetadata =
ObjectMetadata(
headers ++
Seq(
@@ -994,7 +999,9 @@ import scala.util.{ Failure, Success, Try }
override def renderInResponses(): Boolean = true
}

private def completeMultipartUpload(s3Location: S3Location, parts: Seq[SuccessfulUploadPart], s3Headers: S3Headers)(
private def completeMultipartUpload(s3Location: S3Location,
parts: immutable.Seq[SuccessfulUploadPart],
s3Headers: S3Headers)(
implicit mat: Materializer,
attr: Attributes): Future[CompleteMultipartUploadResult] = {
def populateResult(result: CompleteMultipartUploadResult,
@@ -1018,13 +1025,14 @@ import scala.util.{ Failure, Success, Try }
/**
* Initiates a multipart upload. Returns a source of the initiated upload with upload part indices
*/
@nowarn("msg=deprecated")
private def initiateUpload(s3Location: S3Location,
contentType: ContentType,
s3Headers: Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
Source
.single(s3Location)
.flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
.mapConcat(r => LazyList.continually(r))
.mapConcat(r => Stream.continually(r))
.zip(Source.fromIterator(() => Iterator.from(1)))

private def poolSettings(implicit settings: S3Settings, system: ActorSystem) =
@@ -1089,7 +1097,7 @@ import scala.util.{ Failure, Success, Try }

val chunkBufferSize = chunkSize * 2

val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart))
val headers = s3Headers.headersFor(UploadPart)

Flow
.fromMaterializer { (mat, attr) =>
@@ -1180,7 +1188,7 @@ import scala.util.{ Failure, Success, Try }

val chunkBufferSize = chunkSize * 2

val headers = s3Headers.serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(UploadPart))
val headers = s3Headers.headersFor(UploadPart)

Flow
.fromMaterializer { (mat, attr) =>
@@ -1265,6 +1273,7 @@ import scala.util.{ Failure, Success, Try }
.mapMaterializedValue(_ => NotUsed)
}

@nowarn("msg=deprecated")
private def requestInfoOrUploadState(s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
@@ -1276,7 +1285,7 @@ import scala.util.{ Failure, Success, Try }
Source
.single(s3Location)
.flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
.mapConcat(r => LazyList.continually(r))
.mapConcat(r => Stream.continually(r))
.zip(Source.fromIterator(() => Iterator.from(initialIndex)))
case None =>
// First step of the multi part upload process is made.
@@ -1332,9 +1341,9 @@ import scala.util.{ Failure, Success, Try }
import mat.executionContext
Sink
.seq[UploadPartResponse]
.mapMaterializedValue { responseFuture: Future[Seq[UploadPartResponse]] =>
.mapMaterializedValue { responseFuture: Future[immutable.Seq[UploadPartResponse]] =>
responseFuture
.flatMap { responses: Seq[UploadPartResponse] =>
.flatMap { responses: immutable.Seq[UploadPartResponse] =>
val successes = responses.collect { case r: SuccessfulUploadPart => r }
val failures = responses.collect { case r: FailedUploadPart => r }
if (responses.isEmpty) {
@@ -1408,7 +1417,7 @@ import scala.util.{ Failure, Success, Try }
}

private def entityForSuccess(
resp: HttpResponse)(implicit mat: Materializer): Future[(ResponseEntity, Seq[HttpHeader])] = {
resp: HttpResponse)(implicit mat: Materializer): Future[(ResponseEntity, immutable.Seq[HttpHeader])] = {
resp match {
case HttpResponse(status, headers, entity, _) if status.isSuccess() && !status.isRedirection() =>
Future.successful((entity, headers))
@@ -31,7 +31,7 @@ import pekko.stream.connectors.s3.impl._
import pekko.stream.javadsl.{ RunnableGraph, Sink, Source }
import pekko.util.ByteString

import scala.collection.JavaConverters._
import org.apache.pekko.util.ccompat.JavaConverters._
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._

@@ -23,7 +23,7 @@ import pekko.stream.connectors.s3.AccessStyle.PathAccessStyle
import scala.annotation.nowarn
import scala.collection.immutable.Seq
import scala.collection.immutable
import scala.collection.JavaConverters._
import org.apache.pekko.util.ccompat.JavaConverters._
import scala.compat.java8.OptionConverters._

final class MultipartUpload private (val bucket: String, val key: String, val uploadId: String) {