Skip to content
This repository has been archived by the owner. It is now read-only.

Introduce pekko interop #459

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ val levshaVersion = "1.3.0"
val akkaVersion = "2.6.19"
val akkaHttpVersion = "10.2.9"

val pekkoVersion = "1.0.0"
val pekkoHttpVersion = "1.0.0"

val circeVersion = "0.14.1"
val ce2Version = "2.5.5"
val ce3Version = "3.3.12"
Expand Down Expand Up @@ -220,6 +223,21 @@ lazy val akka = project
)
.dependsOn(korolev)

lazy val pekko = project
.in(interop / "pekko")
.enablePlugins(GitVersioning)
.settings(crossVersionSettings)
.settings(commonSettings: _*)
.settings(
normalizedName := "korolev-pekko",
libraryDependencies ++= Seq(
("org.apache.pekko" %% "pekko-actor" % pekkoVersion).cross(CrossVersion.for3Use2_13),
("org.apache.pekko" %% "pekko-stream" % pekkoVersion).cross(CrossVersion.for3Use2_13),
("org.apache.pekko" %% "pekko-http" % pekkoHttpVersion).cross(CrossVersion.for3Use2_13)
)
)
.dependsOn(korolev)

lazy val zioHttp = project
.in(interop / "zio-http")
.enablePlugins(GitVersioning)
Expand Down Expand Up @@ -556,7 +574,7 @@ lazy val root = project
korolev, effect, web, http, standalone, testkit,
bytes, webDsl,
// Interop
akka, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j,
akka, pekko, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j,
scodec, fs2ce2, fs2ce3, zioHttp,
// Examples
simpleExample, routingExample, gameOfLifeExample,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import scala.concurrent.duration._

case class PekkoHttpServerConfig(maxRequestBodySize: Int = PekkoHttpServerConfig.DefaultMaxRequestBodySize,
outputBufferSize: Int = PekkoHttpServerConfig.DefaultOutputBufferSize,
wsStreamedCompletionTimeout: FiniteDuration = PekkoHttpServerConfig.DefaultWsStreamedCompletionTimeout,
wsStreamedParallelism: Int = PekkoHttpServerConfig.DefaultWsStreamedParallelism)

object PekkoHttpServerConfig {
val DefaultMaxRequestBodySize: Int = 8 * 1024 * 1024
val DefaultOutputBufferSize: Int = 1000
val DefaultWsStreamedCompletionTimeout: FiniteDuration = 30.seconds
val DefaultWsStreamedParallelism: Int = 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http

abstract class SimplePekkoHttpKorolevApp(config: PekkoHttpServerConfig = null) {

implicit val actorSystem: ActorSystem = ActorSystem()

def service: PekkoHttpService

def main(args: Array[String]): Unit = {
val escapedConfig =
if (config == null) PekkoHttpServerConfig()
else config
val route = service(escapedConfig)
Http().newServerAt("0.0.0.0", 8080).bindFlow(route)
()
}
}
73 changes: 73 additions & 0 deletions interop/pekko/src/main/scala/korolev/pekko/instances.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import korolev.pekko.util.{PekkoByteStringBytesLike, KorolevStreamPublisher, KorolevStreamSubscriber}
import korolev.data.BytesLike
import korolev.effect.{Effect, Stream}
import org.reactivestreams.Publisher

import scala.concurrent.ExecutionContext

object instances {

implicit final class SinkCompanionOps(value: Sink.type) {
def korolevStream[F[_]: Effect, T]: Sink[T, Stream[F, T]] = {
val subscriber = new KorolevStreamSubscriber[F, T]()
Sink
.fromSubscriber(subscriber)
.mapMaterializedValue(_ => subscriber)
}
}

implicit final class StreamCompanionOps(value: Stream.type) {
def fromPublisher[F[_]: Effect, T](publisher: Publisher[T]): Stream[F, T] = {
val result = new KorolevStreamSubscriber[F, T]()
publisher.subscribe(result)
result
}
}

implicit final class KorolevStreamsOps[F[_]: Effect, T](stream: Stream[F, T]) {

/**
* Converts korolev [[korolev.effect.Stream]] to [[Publisher]].
*
* If `fanout` is `true`, the `Publisher` will support multiple `Subscriber`s and
* the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that
* the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing
* the processing down due to back pressure.
*
* If `fanout` is `false` then the `Publisher` will only support a single `Subscriber` and
* reject any additional `Subscriber`s with [[korolev.pekko.util.KorolevStreamPublisher.MultipleSubscribersProhibitedException]].
*/
def asPublisher(fanout: Boolean = false)(implicit ec: ExecutionContext): Publisher[T] =
new KorolevStreamPublisher(stream, fanout)

def asPekkoSource(implicit ec: ExecutionContext): Source[T, NotUsed] = {
val publisher = new KorolevStreamPublisher(stream, fanout = false)
Source.fromPublisher(publisher)
}
}

implicit final val pekkoByteStringBytesLike: BytesLike[ByteString] =
new PekkoByteStringBytesLike()
}
194 changes: 194 additions & 0 deletions interop/pekko/src/main/scala/korolev/pekko/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.pekko.util.ByteString
import korolev.pekko.util.LoggingReporter
import korolev.data.{Bytes, BytesLike}
import korolev.effect.{Effect, Reporter, Stream}
import korolev.server.internal.BadRequestException
import korolev.server.{WebSocketRequest as KorolevWebSocketRequest, WebSocketResponse as KorolevWebSocketResponse}
import korolev.server.{KorolevService, KorolevServiceConfig, HttpRequest as KorolevHttpRequest}
import korolev.state.{StateDeserializer, StateSerializer}
import korolev.web.{PathAndQuery, Request as KorolevRequest, Response as KorolevResponse}

import scala.concurrent.{ExecutionContext, Future}

package object pekko {

type PekkoHttpService = PekkoHttpServerConfig => Route

import instances._

def pekkoHttpService[F[_]: Effect, S: StateSerializer: StateDeserializer, M]
(config: KorolevServiceConfig[F, S, M], wsLoggingEnabled: Boolean = false)
(implicit actorSystem: ActorSystem, materializer: Materializer, ec: ExecutionContext): PekkoHttpService = { pekkoHttpConfig =>
// If reporter wasn't overridden, use pekko-logging reporter.
val actualConfig =
if (config.reporter != Reporter.PrintReporter) config
else config.copy(reporter = new LoggingReporter(actorSystem))

val korolevServer = korolev.server.korolevService(actualConfig)
val wsRouter = configureWsRoute(korolevServer, pekkoHttpConfig, actualConfig, wsLoggingEnabled)
val httpRoute = configureHttpRoute(korolevServer)

wsRouter ~ httpRoute
}

private def configureWsRoute[F[_]: Effect, S: StateSerializer: StateDeserializer, M]
(korolevServer: KorolevService[F],
pekkoHttpConfig: PekkoHttpServerConfig,
korolevServiceConfig: KorolevServiceConfig[F, S, M],
wsLoggingEnabled: Boolean)
(implicit materializer: Materializer, ec: ExecutionContext): Route =
extractRequest { request =>
extractUnmatchedPath { path =>
extractWebSocketUpgrade { upgrade =>
// inSink - consume messages from the client
// outSource - push messages to the client
val (inStream, inSink) = Sink.korolevStream[F, Bytes].preMaterialize()
val korolevRequest = mkKorolevRequest(request, path.toString, inStream)

complete {
val korolevWsRequest = KorolevWebSocketRequest(korolevRequest, upgrade.requestedProtocols)
Effect[F].toFuture(korolevServer.ws(korolevWsRequest)).map {
case KorolevWebSocketResponse(KorolevResponse(_, outStream, _, _), selectedProtocol) =>
val source = outStream
.asPekkoSource
.map(text => BinaryMessage.Strict(text.as[ByteString]))
val sink = Flow[Message]
.mapAsync(pekkoHttpConfig.wsStreamedParallelism) {
case TextMessage.Strict(message) =>
Future.successful(Some(BytesLike[Bytes].utf8(message)))
case TextMessage.Streamed(stream) =>
stream
.completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout)
.runFold("")(_ + _)
.map(message => Some(BytesLike[Bytes].utf8(message)))
case BinaryMessage.Strict(data) =>
Future.successful(Some(Bytes.wrap(data)))
case BinaryMessage.Streamed(stream) =>
stream
.completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout)
.runFold(ByteString.empty)(_ ++ _)
.map(message => Some(Bytes.wrap(message)))
}
.recover {
case ex =>
korolevServiceConfig.reporter.error(s"WebSocket exception ${ex.getMessage}, shutdown output stream", ex)
outStream.cancel()
None
}
.collect {
case Some(message) =>
message
}
.to(inSink)

upgrade.handleMessages(
if(wsLoggingEnabled) {
Flow.fromSinkAndSourceCoupled(sink, source).log("korolev-ws")
} else {
Flow.fromSinkAndSourceCoupled(sink, source)
},
Some(selectedProtocol)
)
case _ =>
throw new RuntimeException // cannot happen
}.recover {
case BadRequestException(message) =>
HttpResponse(StatusCodes.BadRequest, entity = HttpEntity(message))
}
}
}
}
}

private def configureHttpRoute[F[_]](korolevServer: KorolevService[F])(implicit mat: Materializer, async: Effect[F], ec: ExecutionContext): Route =
extractUnmatchedPath { path =>
extractRequest { request =>
val sink = Sink.korolevStream[F, Bytes]
val body =
if (request.method == HttpMethods.GET) {
Stream.empty[F, Bytes]
} else {
request
.entity
.dataBytes
.map(Bytes.wrap(_))
.toMat(sink)(Keep.right)
.run()
}
val korolevRequest = mkKorolevRequest(request, path.toString, body)
val responseF = handleHttpResponse(korolevServer, korolevRequest)
complete(responseF)
}
}

private def mkKorolevRequest[F[_], Body](request: HttpRequest,
path: String,
body: Body): KorolevRequest[Body] =
KorolevRequest(
pq = PathAndQuery.fromString(path).withParams(request.uri.rawQueryString),
method = KorolevRequest.Method.fromString(request.method.value),
contentLength = request.headers.find(_.is("content-length")).map(_.value().toLong),
renderedCookie = request.headers.find(_.is("cookie")).map(_.value()).getOrElse(""),
headers = {
val contentType = request.entity.contentType
val contentTypeHeaders =
if (contentType.mediaType.isMultipart) Seq("content-type" -> contentType.toString) else Seq.empty
request.headers.map(h => (h.name(), h.value())) ++ contentTypeHeaders
},
body = body
)

private def handleHttpResponse[F[_]: Effect](korolevServer: KorolevService[F],
korolevRequest: KorolevHttpRequest[F])(implicit ec: ExecutionContext): Future[HttpResponse] =
Effect[F].toFuture(korolevServer.http(korolevRequest)).map {
case response @ KorolevResponse(status, body, responseHeaders, _) =>
val (contentTypeOpt, otherHeaders) = getContentTypeAndResponseHeaders(responseHeaders)
val bytesSource = body.asPekkoSource.map(_.as[ByteString])
HttpResponse(
StatusCode.int2StatusCode(status.code),
otherHeaders,
response.contentLength match {
case Some(bytesLength) => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesLength, bytesSource)
case None => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesSource)
}
)
}

private def getContentTypeAndResponseHeaders(responseHeaders: Seq[(String, String)]): (Option[ContentType], List[HttpHeader]) = {
val headers = responseHeaders.map { case (name, value) =>
HttpHeader.parse(name, value) match {
case HttpHeader.ParsingResult.Ok(header, _) => header
case _ => RawHeader(name, value)
}
}
val (contentTypeHeaders, otherHeaders) = headers.partition(_.lowercaseName() == "content-type")
val contentTypeOpt = contentTypeHeaders.headOption.flatMap(h => ContentType.parse(h.value()).right.toOption)
(contentTypeOpt, otherHeaders.toList)
}
}
Loading
Loading