
add scala3 support for s3 #167

Merged 7 commits on Aug 18, 2023
15 changes: 11 additions & 4 deletions LICENSE
@@ -202,7 +202,14 @@

---------------

pekko-mqtt-streaming contains code from paho.mqtt.java <https://github.com/eclipse/paho.mqtt.java>.
pekko-connectors-google-common contains `org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala`
which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>.
The original code was released under the Apache 2.0 license.
Copyright 2021 JWT-Scala Contributors.

---------------

pekko-connectors-mqtt-streaming contains code from paho.mqtt.java <https://github.com/eclipse/paho.mqtt.java>.
This code was released under a dual license:
Eclipse Public License version 2.0 and Eclipse Distribution License.
We choose to use the code under the Eclipse Distribution License.
@@ -235,7 +242,7 @@ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

---------------

pekko-connectors-google-common contains `org.apache.pekko.stream.connectors.google.jwt.JwtSprayJson.scala`
which is copied from jwt-scala <https://github.com/jwt-scala/jwt-scala>.
pekko-connectors-s3 contains test code in `org.apache.pekko.stream.connectors.s3.impl.retry` package
which is copied from futiles <https://github.com/johanandren/futiles>.
The original code was released under the Apache 2.0 license.
Copyright 2021 JWT-Scala Contributors.
Copyright 2015 Johan Andrén.
7 changes: 7 additions & 0 deletions legal/S3License.txt
@@ -209,3 +209,10 @@ This code was released under an Apache 2.0 license.

AWS SDK for Java
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

---------------

pekko-connectors-s3 contains test code in `org.apache.pekko.stream.connectors.s3.impl.retry` package
which is copied from futiles <https://github.com/johanandren/futiles>.
The original code was released under the Apache 2.0 license.
Copyright 2015 Johan Andrén.
4 changes: 1 addition & 3 deletions project/Dependencies.scala
@@ -364,7 +364,6 @@ object Dependencies {
))

val S3 = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-xml" % PekkoHttpVersion,
@@ -373,8 +372,7 @@
"com.google.jimfs" % "jimfs" % "1.2" % Test,
"com.github.tomakehurst" % "wiremock-jre8" % "2.32.0" % Test,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test,
"org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test,
"com.markatta" %% "futiles" % "2.0.2" % Test))
"org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test))

val SpringWeb = {
val SpringVersion = "5.1.17.RELEASE"
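In passing, this hunk is the heart of the PR: dropping `crossScalaVersions -= Scala3` re-enables the Scala 3 cross-build for the S3 module, and the `futiles` test dependency goes away in favor of a vendored copy under `impl.retry` (presumably because futiles does not publish a Scala 3 artifact). Schematically, the sbt effect of the first change looks like the following build.sbt sketch (the version strings are placeholders, not the build's real values):

  val Scala213 = "2.13.11" // placeholder
  val Scala3 = "3.3.0" // placeholder

  // with nothing subtracted, the module builds for the full cross-set again
  lazy val s3 = project.settings(
    crossScalaVersions := Seq(Scala213, Scala3))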
@@ -29,7 +29,7 @@ import pekko.http.scaladsl.{ ClientTransport, Http }
import pekko.stream.connectors.s3.BucketAccess.{ AccessDenied, AccessGranted, NotExists }
import pekko.stream.connectors.s3._
import pekko.stream.connectors.s3.impl.auth.{ CredentialScope, Signer, SigningKey }
import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, Source, Tcp }
import pekko.stream.scaladsl.{ Flow, Keep, RetryFlow, RunnableGraph, Sink, Source, SubFlow, Tcp }
import pekko.stream.{ Attributes, Materializer }
import pekko.util.ByteString
import pekko.{ Done, NotUsed }
@@ -1177,11 +1177,15 @@ import scala.util.{ Failure, Success, Try }

import conf.multipartUploadSettings.retrySettings._

SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
.via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks
.mergeSubstreamsWithParallelism(parallelism)
val source1: SubFlow[Chunk, NotUsed, Flow[ByteString, ByteString, NotUsed]#Repr, Sink[ByteString, NotUsed]] =
@mdedetrich (Contributor) on Jul 2, 2023:
I presume this type annotation was auto-generated by IntelliJ, and if so it can likely be simplified a bit more. I'll check out this PR and see if I can improve it.

SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
.via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // creates the chunks

val source2 = source1.mergeSubstreamsWithParallelism(parallelism)
Contributor:
The `source2` here seems to be a leftover from refactoring/figuring out types locally? You should be able to just write:

source1.mergeSubstreamsWithParallelism(parallelism)
  .filter(_.size > 0)
  .via(atLeastOne)
  .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
  // etc etc

Contributor Author:
The original code wouldn't compile on Scala 3, and splitting it up has helped get it to compile.

Contributor Author:
The code no longer compiles when I start removing the types from these params.

@mdedetrich (Contributor) on Jul 2, 2023:
I can see that for `source1`; it's `source2` that seems redundant (`source2` has no type annotation)?
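To see the pattern from this thread in isolation: a minimal sketch (a hypothetical pipeline, not the PR's code) of why the annotated intermediate val helps. Pinning the `SubFlow` type down gives the Scala 3 inferencer a concrete type to start from, instead of one large inference problem spanning the whole chain:

  import org.apache.pekko.NotUsed
  import org.apache.pekko.stream.scaladsl.{ Flow, Sink, SubFlow }
  import org.apache.pekko.util.ByteString

  // stand-in for the real chunking stage
  val chunked: SubFlow[ByteString, NotUsed, Flow[ByteString, ByteString, NotUsed]#Repr, Sink[ByteString, NotUsed]] =
    Flow[ByteString].splitAfter(_.size >= 4096)

  // once the SubFlow is pinned down, the rest of the chain infers as usual
  val merged: Flow[ByteString, ByteString, NotUsed] =
    chunked.mergeSubstreamsWithParallelism(4)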

.filter(_.size > 0)
.via(atLeastOne)

source2
.zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
.groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
// Allow requests that fail with transient errors to be retried, using the already buffered chunk.
@@ -1278,11 +1282,18 @@ import scala.util.{ Failure, Success, Try }
Flow[(ByteString, C)].orElse(
Source.single((ByteString.empty, null.asInstanceOf[C])))

SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
.via(getChunk(chunkBufferSize))
.mergeSubstreamsWithParallelism(parallelism)
.filter { case (chunk, _) => chunk.size > 0 }
.via(atLeastOne)
val source1: SubFlow[(Chunk, immutable.Iterable[C]), NotUsed, Flow[(ByteString, C), (ByteString, C),
NotUsed]#Repr, Sink[(ByteString, C), NotUsed]] =
SplitAfterSizeWithContext(chunkSize)(atLeastOneByteStringAndEmptyContext)
.via(getChunk(chunkBufferSize))

val source2: Flow[(ByteString, C), (Chunk, immutable.Iterable[C]), NotUsed] =
source1
.mergeSubstreamsWithParallelism(parallelism)
.filter { case (chunk, _) => chunk.size > 0 }
.via(atLeastOne)

source2
.zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, initialUploadState))
.groupBy(parallelism, { case (_, (_, chunkIndex)) => chunkIndex % parallelism })
.map {
@@ -1379,9 +1390,9 @@ import scala.util.{ Failure, Success, Try }
import mat.executionContext
Sink
.seq[UploadPartResponse]
.mapMaterializedValue { responseFuture: Future[immutable.Seq[UploadPartResponse]] =>
.mapMaterializedValue { (responseFuture: Future[immutable.Seq[UploadPartResponse]]) =>
responseFuture
.flatMap { responses: immutable.Seq[UploadPartResponse] =>
.flatMap { (responses: immutable.Seq[UploadPartResponse]) =>
val successes = responses.collect { case r: SuccessfulUploadPart => r }
val failures = responses.collect { case r: FailedUploadPart => r }
if (responses.isEmpty) {
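The added parentheses around `responseFuture` and `responses` above are a Scala 3 syntax requirement rather than a style choice: a lambda parameter that carries a type annotation must be parenthesized. A one-line illustration:

  List(1, 2, 3).map { (i: Int) => i * 2 } // accepted by both Scala 2.13 and 3
  // List(1, 2, 3).map { i: Int => i * 2 } // rejected by Scala 3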
@@ -69,7 +69,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest }
def canonicalQueryString(query: Query): String = {
def uriEncode(s: String): String = s.flatMap {
case c if isUnreservedCharacter(c) => c.toString
case c => "%" + c.toHexString.toUpperCase
case c => "%" + Integer.toHexString(c).toUpperCase
}

query
Expand Down Expand Up @@ -99,7 +99,7 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpRequest }
if (path.isEmpty) "/"
else {
path.toString.flatMap {
case c if isReservedCharacter(c) => "%" + c.toHexString.toUpperCase
case c if isReservedCharacter(c) => "%" + Integer.toHexString(c).toUpperCase
case c => c.toString
}
}
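Presumably `c.toHexString` stops resolving the same way once the module builds on Scala 3 (it goes through an implicit enrichment of `Char`), so the code now calls `Integer.toHexString` directly; the `Char` argument widens to its integer code point. A quick check of the intended behavior:

  // '=' has code point 0x3D, so a reserved '=' must encode as "%3D"
  val c = '='
  val encoded = "%" + Integer.toHexString(c).toUpperCase
  assert(encoded == "%3D")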
@@ -29,6 +29,7 @@ import pekko.stream.connectors.s3.headers.{ CannedAcl, ServerSideEncryption }
import pekko.stream.connectors.s3._
import pekko.stream.connectors.s3.impl._
import pekko.stream.javadsl.{ RunnableGraph, Sink, Source }
import pekko.stream.scaladsl.SourceToCompletionStage
import pekko.util.ccompat.JavaConverters._
import pekko.util.ByteString
import pekko.util.OptionConverters._
@@ -17,11 +17,11 @@

package org.apache.pekko.stream.connectors.s3

import markatta.futiles.Retry
import org.apache.commons.lang3.StringUtils
import org.apache.pekko
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.Sink
import pekko.stream.connectors.s3.impl.retry.Retry
import pekko.stream.connectors.s3.scaladsl.S3
import pekko.stream.scaladsl.Sink
import org.scalacheck.Gen
import pekko.actor.ActorSystem
import pekko.stream.Attributes
@@ -121,6 +121,7 @@ class SplitAfterSizeSpec(_system: ActorSystem)
Seq(ByteString(16), ByteString(17, 18))))
}

def bytes(start: Byte, end: Byte): Array[Byte] = (start to end).map(_.toByte).toArray[Byte]
// https://github.com/lampepfl/dotty/issues/18068
def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to end).map(_.toByte).toArray[Byte]

}
@@ -128,7 +128,7 @@ class CanonicalRequestSpec extends AnyFlatSpec with Matchers {
val reservedCharacters = ":?#[]@!$&'()*+,;="
reservedCharacters.foreach { char =>
withClue(s"failed for path containing reserved character [$char]:") {
val expectedCharEncoding = "%" + char.toHexString.toUpperCase
val expectedCharEncoding = "%" + Integer.toHexString(char).toUpperCase

val request =
HttpRequest(
@@ -79,6 +79,7 @@ class SplitAfterSizeWithContextSpec(_system: ActorSystem)
Seq((ByteString(17, 18), 2))))
}

def bytes(start: Byte, end: Byte): Array[Byte] = (start to end).map(_.toByte).toArray[Byte]
// https://github.com/lampepfl/dotty/issues/18068
def bytes(start: Byte, end: Byte): Array[Byte] = (start.toInt to end).map(_.toByte).toArray[Byte]

}
@@ -0,0 +1,86 @@
/*
* Copyright 2015 Johan Andrén
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.s3.impl.retry

import java.util.concurrent.{ ThreadLocalRandom, TimeUnit }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Random

// copied from https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Retry.scala
object Retry {

private val alwaysRetry: Throwable => Boolean = _ => true

/**
* Evaluate a block that creates a future up to a specific number of times. If the future fails, decide about
* retrying using a predicate; if it should retry, an exponential back-off is applied so that each retry waits
* longer than the last. A jitter is also added so that the exact timing of the retry isn't exactly the same
* for all calls with the same backOffUnit.
*
* Any exception in the block creating the future will also be returned as a failed future. The default is to
* retry for all throwables.
*
* Based on this wikipedia article: http://en.wikipedia.org/wiki/Truncated_binary_exponential_backoff
*/
def retryWithBackOff[A](
times: Int,
backOffUnit: FiniteDuration,
shouldRetry: Throwable => Boolean = alwaysRetry)(fBlock: => Future[A])(implicit ec: ExecutionContext): Future[A] =
try
if (times <= 1) fBlock
else retryWithBackOffLoop(times, 1, backOffUnit, shouldRetry)(fBlock)
catch {
// failure to actually create the future
case x: Throwable => Future.failed(x)
}

private def retryWithBackOffLoop[A](
totalTimes: Int,
timesTried: Int,
backOffUnit: FiniteDuration,
shouldRetry: Throwable => Boolean)(fBlock: => Future[A])(implicit ec: ExecutionContext): Future[A] =
if (totalTimes <= timesTried) fBlock
else
fBlock.recoverWith {
case ex: Throwable if shouldRetry(ex) =>
val timesTriedNow = timesTried + 1
val backOff = nextBackOff(timesTriedNow, backOffUnit)
Timeouts
.timeout(backOff)(())
.flatMap(_ =>
retryWithBackOffLoop(
totalTimes,
timesTriedNow,
backOffUnit,
shouldRetry)(fBlock))
}

private def nextBackOff(
tries: Int,
backOffUnit: FiniteDuration): FiniteDuration = {
require(tries > 0, "tries should start from 1")
val rng = new Random(ThreadLocalRandom.current())
// jitter between 0.5 and 1.5
val jitter = 0.5 + rng.nextDouble()
val factor = math.pow(2, tries) * jitter
FiniteDuration(
(backOffUnit.toMillis * factor).toLong,
TimeUnit.MILLISECONDS)
}

}
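A hypothetical usage sketch of the vendored helper (the flaky call below is a stand-in, not an API from this PR): retry up to five times, backing off from a 100 ms unit:

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.Future
  import scala.concurrent.duration._

  def flakyLookup(): Future[String] =
    Future.successful("ok") // stand-in for a call that sometimes fails

  // waits roughly 2^n * 100ms * jitter(0.5..1.5) between attempts:
  // ~200-600ms after the 2nd try, ~400-1200ms after the 3rd, and so on
  val result: Future[String] =
    Retry.retryWithBackOff(times = 5, backOffUnit = 100.millis)(flakyLookup())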
@@ -0,0 +1,45 @@
/*
* Copyright 2015 Johan Andrén
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.s3.impl.retry

import java.util.{ Timer, TimerTask }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.Try

// copied from https://github.com/johanandren/futiles/blob/18868f252bbf5dd71d2cd0fc67e7eb39863b686a/src/main/scala/markatta/futiles/Timeouts.scala
object Timeouts {

private val timer = new Timer()

/**
* When `waitFor` has passed, evaluate `what` on the given execution context and complete the future
*/
def timeout[A](waitFor: FiniteDuration)(what: => A)(implicit ec: ExecutionContext): Future[A] = {
val promise = Promise[A]()
timer.schedule(new TimerTask {
override def run(): Unit =
// make sure we do not block the timer thread
Future {
promise.complete(Try(what))
}
},
waitFor.toMillis)

promise.future
}
}
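And a small sketch of the timer helper on its own (again a hypothetical caller): evaluate a thunk after a delay, off the shared timer thread:

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.Future
  import scala.concurrent.duration._

  // completes with the value roughly 200ms from now; the body runs on the
  // execution context, not on the Timer thread itself
  val delayed: Future[String] = Timeouts.timeout(200.millis)("done")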
@@ -1293,8 +1293,8 @@ class AWSS3IntegrationSpec extends TestKit(ActorSystem("AWSS3IntegrationSpec"))
}.orElse(Some(1.minute))

// Since S3 accounts share global state, we should randomly generate bucket names so concurrent tests
// against an S3 account don't conflict with eachother
override lazy val randomlyGenerateBucketNames: Boolean =
// against an S3 account don't conflict with each other
override val randomlyGenerateBucketNames: Boolean =
sys.props.get("pekko.stream.connectors.s3.scaladsl.AWSS3IntegrationSpec.randomlyGenerateBucketNames")
.map(_.toBoolean).getOrElse(true)
}
@@ -1313,7 +1313,7 @@ class MinioS3IntegrationSpec

// Since a unique new Minio container is started with each test run there is no point in making random
// bucket names
override lazy val randomlyGenerateBucketNames: Boolean = false
override val randomlyGenerateBucketNames: Boolean = false

override lazy val defaultS3Settings: S3Settings = s3Settings
.withS3RegionProvider(
@@ -33,7 +33,8 @@ import software.amazon.awssdk.regions.Region
abstract class S3WireMockBase(_system: ActorSystem, val _wireMockServer: WireMockServer) extends TestKit(_system) {

private def this(mock: WireMockServer) =
this(ActorSystem(getCallerName(getClass), config(mock.port()).withFallback(ConfigFactory.load())), mock)
this(ActorSystem(getCallerName(classOf[S3WireMockBase]), config(mock.port()).withFallback(ConfigFactory.load())),
mock)

def this() = {
this(initServer())
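The switch from `getClass` to `classOf[S3WireMockBase]` in the auxiliary constructor is presumably down to Scala 3 being stricter about touching `this` (which `getClass` does) in the arguments of a self-constructor invocation, before the primary constructor has run. A minimal sketch of the shape (hypothetical class; the rejection is the presumed Scala 3 behavior):

  class Base(val name: String) {
    // def this() = this(getClass.getSimpleName) // references `this` too early
    def this() = this(classOf[Base].getSimpleName) // a static reference is fine
  }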