Skip to content

Commit

Permalink
feat: Scala 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Nov 9, 2022
2 parents 576ce76 + a1bdc3e commit 6277aa4
Show file tree
Hide file tree
Showing 231 changed files with 4,792 additions and 3,395 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/validate-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
fail-fast: false
matrix:
SCALA_VERSION: [2.12, 2.13]
SCALA_VERSION: [2.12, 2.13, 3.1]
JABBA_JDK: [1.8, 1.11]
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion akka-http-bench-jmh/src/main/scala/akka/BenchRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object BenchRunner {
val opts = new CommandLineOptions(args2: _*)
val results = new Runner(opts).run()

val report = results.asScala.map { result: RunResult =>
val report = results.asScala.map { (result: RunResult) =>
val bench = result.getParams.getBenchmark
val params = result.getParams.getParamsKeys.asScala.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_")
val score = result.getAggregatedResult.getPrimaryResult.getScore.round
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ abstract class LfuCacheSettings private[http] () extends javadsl.LfuCacheSetting
def timeToLive: Duration
def timeToIdle: Duration

final def getMaxCapacity: Int = maxCapacity
final def getInitialCapacity: Int = initialCapacity
final def getTimeToLive: Duration = timeToLive
final def getTimeToIdle: Duration = timeToIdle
final def getMaxCapacity: Int = self.maxCapacity
final def getInitialCapacity: Int = self.initialCapacity
final def getTimeToLive: Duration = self.timeToLive
final def getTimeToIdle: Duration = self.timeToIdle

override def withMaxCapacity(newMaxCapacity: Int): LfuCacheSettings = self.copy(maxCapacity = newMaxCapacity)
override def withInitialCapacity(newInitialCapacity: Int): LfuCacheSettings = self.copy(initialCapacity = newInitialCapacity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object CachingDirectives {

import akka.http.scaladsl.server.directives.{ CachingDirectives => D }

private implicit def routeResultCacheMapping[K] =
private implicit def routeResultCacheMapping[K]: JavaMapping[Cache[K, RouteResult], akka.http.caching.scaladsl.Cache[K, akka.http.scaladsl.server.RouteResult]] =
CacheJavaMapping.cacheMapping[K, RouteResult, K, akka.http.scaladsl.server.RouteResult]

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ class ExpiringLfuCacheSpec extends AnyWordSpec with Matchers with BeforeAndAfter
"return Futures on uncached values during evaluation and replace these with the value afterwards" in {
val cache = lfuCache[String]()
val latch = new CountDownLatch(1)
val future1 = cache(1, (promise: Promise[String]) =>
val future1 = cache(1, { (promise: Promise[String]) =>
Future {
latch.await()
promise.success("A")
}
)
// (block autoformat)
() // provide Unit result automatically to hand-hold Scala 3 overload selection
})
val future2 = cache.get(1, () => "")

latch.countDown()
Expand Down Expand Up @@ -150,7 +152,7 @@ class ExpiringLfuCacheSpec extends AnyWordSpec with Matchers with BeforeAndAfter
}
}, 10.second)

views.transpose.foreach { ints: Seq[Int] =>
views.transpose.foreach { (ints: Seq[Int]) =>
ints.filter(_ != 0).reduceLeft((a, b) => if (a == b) a else 0) should not be 0
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import akka.http.impl.model.JavaQuery;
import akka.http.impl.model.UriJavaAccessor;
import akka.http.scaladsl.model.*;
import akka.http.javadsl.model.HttpCharset;
import akka.japi.Pair;
import akka.parboiled2.CharPredicate;
import akka.parboiled2.ParserInput$;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Changes to internal signatures
ProblemFilters.exclude[Problem]("akka.http.impl.engine.http2.*")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# MiMa doesn't get that scala.Serializable is alias for java.io.Serializable
ProblemFilters.exclude[MissingTypesProblem]("akka.http.scaladsl.model.ErrorInfo")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal, changed earlier, not sure why not picked up from 10.1.x excludes for Scala 2.12
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.impl.util.StreamUtils#ScheduleSupport.akka$http$impl$util$StreamUtils$ScheduleSupport$$super$materializer")
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private[http] object OutgoingConnectionBlueprint {
val terminationFanout = b.add(Broadcast[HttpResponse](2))

val logger = b.add(Flow[ByteString].mapError { case t => log.debug(s"Outgoing request stream error {}", t); t }.named("errorLogger"))
val wrapTls = b.add(Flow[ByteString].map(SendBytes))
val wrapTls = b.add(Flow[ByteString].map(SendBytes(_)))

val collectSessionBytes = b.add(Flow[SslTlsInbound].collect { case s: SessionBytes => s })

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private[pool] object SlotState {
WaitingForEndOfResponseEntity(ongoingRequest, ongoingResponse, waitingForEndOfRequestEntity = false)
}
}
final case object WaitingForEndOfRequestEntity extends ConnectedState {
case object WaitingForEndOfRequestEntity extends ConnectedState {
final override def isIdle = false

override def onRequestEntityCompleted(ctx: SlotContext): SlotState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private[http2] class BufferedOutletExtended[T](outlet: GenericOutlet[T]) extends
* INTERNAL API
*/
@InternalApi
private[http2] trait GenericOutletSupport { logic: GraphStageLogic =>
private[http2] trait GenericOutletSupport extends GraphStageLogic { logic =>
def fromSubSourceOutlet[T](subSourceOutlet: SubSourceOutlet[T]): GenericOutlet[T] =
new GenericOutlet[T] {
def setHandler(handler: OutHandler): Unit = subSourceOutlet.setHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)

val masterTerminator = new MasterServerTerminator(log)

Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, Duration.Inf) // we knowingly disable idle-timeout on TCP level, as we handle it explicitly in Akka HTTP itself
Tcp(system).bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, Duration.Inf) // we knowingly disable idle-timeout on TCP level, as we handle it explicitly in Akka HTTP itself
.via(if (telemetry == NoOpTelemetry) Flow[Tcp.IncomingConnection] else telemetry.serverBinding)
.mapAsyncUnordered(settings.maxConnections) {
incoming: Tcp.IncomingConnection =>
(incoming: Tcp.IncomingConnection) =>
try {
httpPlusSwitching(http1, http2).addAttributes(prepareServerAttributes(settings, incoming))
.watchTermination() {
Expand Down Expand Up @@ -263,7 +263,7 @@ private[http] object Http2 extends ExtensionId[Http2Ext] with ExtensionIdProvide
override def get(system: ClassicActorSystemProvider): Http2Ext = super.get(system)
def apply()(implicit system: ClassicActorSystemProvider): Http2Ext = super.apply(system)
override def apply(system: ActorSystem): Http2Ext = super.apply(system)
def lookup(): ExtensionId[_ <: Extension] = Http2
def lookup: ExtensionId[_ <: Extension] = Http2
def createExtension(system: ExtendedActorSystem): Http2Ext = new Http2Ext()(system)

private[http] type HttpImplementation = Flow[SslTlsInbound, SslTlsOutbound, ServerTerminator]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private[http] object Http2Blueprint {
}

private[http] val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] =
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect {
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes(_)), Flow[SslTlsInbound].collect {
case SessionBytes(_, bytes) => bytes
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.impl.io.ByteStringParser.ParsingException
import akka.stream.scaladsl.Source
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, StageLogging, TimerGraphStageLogic }
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler, StageLogging, TimerGraphStageLogic }
import akka.util.{ ByteString, OptionVal }

import scala.collection.immutable
Expand Down Expand Up @@ -108,12 +108,12 @@ private[http2] object ConfigurablePing {
def sendingPing(): Unit = ()
def pingAckOverdue(): Boolean = false
}
final class EnabledPingState(tickInterval: FiniteDuration, pingEveryNTickWithoutData: Long) extends PingState {
final class EnabledPingState(_tickInterval: FiniteDuration, pingEveryNTickWithoutData: Long) extends PingState {
private var ticksWithoutData = 0L
private var ticksSincePing = 0L
private var pingInFlight = false

def tickInterval(): Option[FiniteDuration] = Some(tickInterval)
def tickInterval(): Option[FiniteDuration] = Some(_tickInterval)

def onDataFrameSeen(): Unit = {
ticksWithoutData = 0L
Expand Down Expand Up @@ -261,7 +261,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
push(frameOut, event)
}

val multiplexer = createMultiplexer(StreamPrioritizer.First)
val multiplexer: Http2Multiplexer with OutHandler = createMultiplexer(StreamPrioritizer.First)
setHandler(frameOut, multiplexer)

val pingState = ConfigurablePing.PingState(http2Settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ private[http] object Http2Protocol {
*/
case object HTTP_1_1_REQUIRED extends ErrorCode(0xd)

case class Unknown private (override val id: Int) extends ErrorCode(id)
case class Unknown private[ErrorCode] (override val id: Int) extends ErrorCode(id)

val All =
Array( // must start with id = 0 and don't have holes between ids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.control.NoStackTrace
* Mixed into the Http2ServerDemux graph logic.
*/
@InternalApi
private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper =>
private[http2] trait Http2StreamHandling extends GraphStageLogic with LogHelper { self =>
// required API from demux
def isServer: Boolean
def multiplexer: Http2Multiplexer
Expand Down Expand Up @@ -235,7 +235,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
* * Open -> HalfClosedRemoteSendingData: on receiving response DATA with endStream = true before request has been fully sent out (uncommon)
* * HalfClosedRemoteSendingData -> Closed: on sending out request DATA with endStream = true
*/
sealed abstract class StreamState { _: Product =>
sealed abstract class StreamState { self: Product =>
def handle(event: StreamFrameEvent): StreamState

def stateName: String = productPrefix
Expand Down Expand Up @@ -350,7 +350,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper

override def incrementWindow(delta: Int): StreamState = copy(extraInitialWindow = extraInitialWindow + delta)
}
trait Sending extends StreamState { _: Product =>
trait Sending extends StreamState { self: Product =>
protected def outStream: OutStream

override def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) = {
Expand All @@ -365,7 +365,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
}

val nextState =
if (outStream.isDone) handleOutgoingEnded()
if (outStream.isDone) this.handleOutgoingEnded()
else this

(nextState, res)
Expand Down Expand Up @@ -408,10 +408,10 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
// We're not planning on sending any data on this stream anymore, so we don't care about window updates.
this
case _ =>
expectIncomingStream(event, Closed, HalfClosedLocal, correlationAttributes)
expectIncomingStream(event, Closed, HalfClosedLocal(_), correlationAttributes)
}
}
sealed abstract class ReceivingData extends StreamState { _: Product =>
sealed abstract class ReceivingData extends StreamState { self: Product =>
def handle(event: StreamFrameEvent): StreamState = event match {
case d: DataFrame =>
outstandingConnectionLevelWindow -= d.sizeInWindow
Expand Down Expand Up @@ -448,7 +448,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
protected def incrementWindow(delta: Int): StreamState
protected def onRstStreamFrame(rstStreamFrame: RstStreamFrame): Unit
}
sealed abstract class ReceivingDataWithBuffer(afterEndStreamReceived: StreamState) extends ReceivingData { _: Product =>
sealed abstract class ReceivingDataWithBuffer(afterEndStreamReceived: StreamState) extends ReceivingData { self: Product =>
protected def buffer: IncomingStreamBuffer

override protected def onDataFrame(dataFrame: DataFrame): StreamState = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ private[akka] object OutgoingConnectionBuilderImpl {

override def http(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
// http/1.1 plaintext
Http(system).outgoingConnectionUsingContext(host, port.getOrElse(80), ConnectionContext.noEncryption(), clientConnectionSettings, log)
Http(system.classicSystem).outgoingConnectionUsingContext(host, port.getOrElse(80), ConnectionContext.noEncryption(), clientConnectionSettings, log)
}

override def https(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
// http/1.1 tls
Http(system).outgoingConnectionHttps(host, port.getOrElse(443), connectionContext.getOrElse(Http(system).defaultClientHttpsContext), None, clientConnectionSettings, log)
Http(system.classicSystem).outgoingConnectionHttps(host, port.getOrElse(443), connectionContext.getOrElse(Http(system.classicSystem).defaultClientHttpsContext), None, clientConnectionSettings, log)
}

override def http2(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
// http/2 tls
val port = this.port.getOrElse(443)
Http2(system).outgoingConnection(host, port, connectionContext.getOrElse(Http(system).defaultClientHttpsContext), clientConnectionSettings, log)
Http2(system.classicSystem).outgoingConnection(host, port, connectionContext.getOrElse(Http(system.classicSystem).defaultClientHttpsContext), clientConnectionSettings, log)
}

override def managedPersistentHttp2(): Flow[HttpRequest, HttpResponse, NotUsed] =
Expand All @@ -85,7 +85,7 @@ private[akka] object OutgoingConnectionBuilderImpl {

override def http2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = {
// http/2 prior knowledge plaintext
Http2(system).outgoingConnectionPriorKnowledge(host, port.getOrElse(80), clientConnectionSettings, log)
Http2(system.classicSystem).outgoingConnectionPriorKnowledge(host, port.getOrElse(80), clientConnectionSettings, log)
}

override def managedPersistentHttp2WithPriorKnowledge(): Flow[HttpRequest, HttpResponse, NotUsed] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ private[engine] final class HttpHeaderParser private (
charBuffer.flip()
val result =
if (coderResult.isUnderflow & charBuffer.hasRemaining) {
val c = charBuffer.get()
val c = charBuffer.get().toInt
if (charBuffer.hasRemaining) (charBuffer.get() << 16) | c else c
} else -1
byteBuffer.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private[http] object BodyPartRenderer {
def randomBoundary(length: Int = 18, random: java.util.Random = ThreadLocalRandom.current()): String = {
val array = new Array[Byte](length)
random.nextBytes(array)
Base64.custom.encodeToString(array, false)
Base64.custom().encodeToString(array, false)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import akka.stream.stage.GraphStage
import akka.stream._
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.collection.immutable

/**
* INTERNAL API
*/
Expand All @@ -41,17 +43,17 @@ private[http] object RenderSupport {
private val TextHtmlContentType = preRenderContentType(`text/html(UTF-8)`)
private val TextCsvContentType = preRenderContentType(`text/csv(UTF-8)`)

implicit val trailerRenderer = Renderer.genericSeqRenderer[Renderable, HttpHeader](Rendering.CrLf, Rendering.Empty)
implicit val trailerRenderer: Renderer[immutable.Iterable[HttpHeader]] =
Renderer.genericSeqRenderer[Renderable, HttpHeader](Rendering.CrLf, Rendering.Empty)

val defaultLastChunkBytes: ByteString = renderChunk(HttpEntity.LastChunk)

def CancelSecond[T, Mat](first: Source[T, Mat], second: Source[T, Any]): Source[T, Mat] = {
Source.fromGraph(GraphDSL.create(first) { implicit b => frst =>
def CancelSecond[T, Mat](first: Source[T, Mat], second: Source[T, Any]): Source[T, Mat] =
Source.fromGraph(GraphDSL.createGraph(first) { implicit b => frst =>
import GraphDSL.Implicits._
second ~> Sink.cancelled
SourceShape(frst.out)
})
}

def renderEntityContentType(r: Rendering, entity: HttpEntity): r.type = {
val ct = entity.contentType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[http] object HttpServerBluePrint {
logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)

val tlsSupport: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, SessionBytes, NotUsed] =
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect { case x: SessionBytes => x })
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes(_)), Flow[SslTlsInbound].collect { case x: SessionBytes => x })

def websocketSupport(settings: ServerSettings, log: LoggingAdapter): BidiFlow[ResponseRenderingOutput, ByteString, SessionBytes, SessionBytes, NotUsed] =
BidiFlow.fromGraph(new ProtocolSwitchStage(settings, log))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,5 @@ private[http] object WebSocketClientBlueprint {
def simpleTls: BidiFlow[SslTlsInbound, ByteString, ByteString, SendBytes, NotUsed] =
BidiFlow.fromFlowsMat(
Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) => bytes },
Flow[ByteString].map(SendBytes))(Keep.none)
Flow[ByteString].map(SendBytes(_)))(Keep.none)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.parboiled2.Parser
import akka.http.scaladsl.model.headers.`Accept-Charset`
import akka.http.scaladsl.model.HttpCharsetRange

private[parser] trait AcceptCharsetHeader { this: Parser with CommonRules with CommonActions =>
private[parser] trait AcceptCharsetHeader { this: Parser with CommonRules with CommonActions with StringBuilding =>

// http://tools.ietf.org/html/rfc7231#section-5.3.3
def `accept-charset` = rule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.http.impl.model.parser
import akka.parboiled2.Parser
import akka.http.scaladsl.model.headers._

private[parser] trait AcceptEncodingHeader { this: Parser with CommonRules with CommonActions =>
private[parser] trait AcceptEncodingHeader { this: Parser with CommonRules with CommonActions with StringBuilding =>

// http://tools.ietf.org/html/rfc7231#section-5.3.4
def `accept-encoding` = rule {
Expand All @@ -23,8 +23,8 @@ private[parser] trait AcceptEncodingHeader { this: Parser with CommonRules with
}
}

def codings = rule { ws('*') ~ push(HttpEncodingRange.`*`) | token ~> getEncoding }
def codings = rule { ws('*') ~ push(HttpEncodingRange.`*`) | token ~> getEncoding _ }

private val getEncoding: String => HttpEncodingRange =
name => HttpEncodingRange(HttpEncodings.getForKeyCaseInsensitive(name) getOrElse HttpEncoding.custom(name))
private def getEncoding(name: String): HttpEncodingRange =
HttpEncodingRange(HttpEncodings.getForKeyCaseInsensitive(name) getOrElse HttpEncoding.custom(name))
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.model.{ MediaRange, MediaRanges }
import akka.http.impl.util._

private[parser] trait AcceptHeader { this: Parser with CommonRules with CommonActions =>
private[parser] trait AcceptHeader { this: Parser with CommonRules with CommonActions with StringBuilding =>
import CharacterClasses._

// http://tools.ietf.org/html/rfc7231#section-5.3.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.http.impl.model.parser
import akka.parboiled2.Parser
import akka.http.scaladsl.model.headers._

private[parser] trait AcceptLanguageHeader { this: Parser with CommonRules with CommonActions =>
private[parser] trait AcceptLanguageHeader { this: Parser with CommonRules with CommonActions with StringBuilding =>

// http://tools.ietf.org/html/rfc7231#section-5.3.5
def `accept-language` = rule {
Expand Down

0 comments on commit 6277aa4

Please sign in to comment.