Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to fs2 version 2 #41

Open
wants to merge 8 commits into
base: series/0.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
@@ -1,4 +1,6 @@

dist: trusty

language : scala

scala:
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Expand Up @@ -31,9 +31,9 @@ lazy val commonSettings = Seq(
, "org.scodec" %% "scodec-core" % "1.10.3"
, "com.spinoco" %% "protocol-http" % "0.3.15"
, "com.spinoco" %% "protocol-websocket" % "0.3.15"
, "co.fs2" %% "fs2-core" % "1.0.0"
, "co.fs2" %% "fs2-io" % "1.0.0"
, "com.spinoco" %% "fs2-crypto" % "0.4.0"
, "co.fs2" %% "fs2-core" % "2.0.1"
, "co.fs2" %% "fs2-io" % "2.0.1"
, "com.spinoco" %% "fs2-crypto" % "0.5.0-SNAPSHOT"
, "org.scalacheck" %% "scalacheck" % "1.13.4" % "test"
),
scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")),
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/spinoco/fs2/http/HttpClient.scala
Expand Up @@ -110,12 +110,14 @@ trait HttpClient[F[_]] {
* @param responseCodec Codec used to encode response header
* @param sslExecutionContext Strategy used when communication with SSL (https or wss)
* @param sslContext SSL Context to use with SSL Client (https, wss)
* @param blocker An execution context for blocking operations
*/
def apply[F[_] : ConcurrentEffect : ContextShift : Timer](
requestCodec : Codec[HttpRequestHeader]
, responseCodec : Codec[HttpResponseHeader]
, sslExecutionContext: => ExecutionContext
, sslContext : => SSLContext
, blocker : Blocker
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay {
lazy val sslCtx = sslContext
lazy val sslS = sslExecutionContext
Expand All @@ -128,7 +130,7 @@ trait HttpClient[F[_]] {
, timeout: Duration
): Stream[F, HttpResponse[F]] = {
Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address =>
Stream.resource(io.tcp.client[F](address))
Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address))
.evalMap { socket =>
if (!request.isSecure) Applicative[F].pure(socket)
else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host)
Expand All @@ -143,7 +145,7 @@ trait HttpClient[F[_]] {
, chunkSize: Int
, maxFrameSize: Int
): Stream[F, Option[HttpResponseHeader]] =
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx)
WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx, blocker)


def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] =
Expand Down Expand Up @@ -174,7 +176,7 @@ trait HttpClient[F[_]] {
timeout match {
case fin: FiniteDuration =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ =>
eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal =>
eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent =>
val remains = fin - (sent - start).millis
Expand All @@ -186,7 +188,7 @@ trait HttpClient[F[_]] {
}}}}

case _ =>
HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ =>
socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec)
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/spinoco/fs2/http/HttpServer.scala
Expand Up @@ -3,7 +3,7 @@ package spinoco.fs2.http
import java.net.InetSocketAddress
import java.nio.channels.AsynchronousChannelGroup

import cats.effect.{ConcurrentEffect, Sync, Timer}
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync, Timer}
import cats.syntax.all._
import fs2._
import fs2.concurrent.SignallingRef
Expand Down Expand Up @@ -34,9 +34,10 @@ object HttpServer {
* This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure )
* @param sendFailure A function to be evaluated on failure to process the the response.
* Request is not suplied if failure happened before request was constructed.
* @param blocker An execution context for blocking operations
*
*/
def apply[F[_] : ConcurrentEffect : Timer](
def apply[F[_] : ConcurrentEffect : ContextShift : Timer](
maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
, maxHeaderSize: Int = 10 *1024
Expand All @@ -47,6 +48,7 @@ object HttpServer {
, service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]]
, requestFailure : Throwable => Stream[F, HttpResponse[F]]
, sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing]
, blocker: Blocker
)(
implicit
AG: AsynchronousChannelGroup
Expand All @@ -58,7 +60,7 @@ object HttpServer {
case _ => (false, 0.millis)
}

io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
new io.tcp.SocketGroup(AG, blocker).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource =>
Stream.resource(resource).flatMap { socket =>
eval(SignallingRef(initial)).flatMap { timeoutSignal =>
readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize)
Expand Down
14 changes: 10 additions & 4 deletions src/main/scala/spinoco/fs2/http/http.scala
Expand Up @@ -5,7 +5,7 @@ import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors

import javax.net.ssl.SSLContext
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer}
import fs2._
import scodec.Codec
import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader}
Expand All @@ -29,15 +29,17 @@ package object http {
* @param requestHeaderReceiveTimeout A timeout to await request header to be fully received.
* Request will fail, if the header won't be read within this timeout.
* @param service Pipe that defines handling of each incoming request and produces a response
* @param blocker An execution context for blocking operations
*/
def server[F[_] : ConcurrentEffect : Timer](
def server[F[_] : ConcurrentEffect : ContextShift : Timer](
bindTo: InetSocketAddress
, maxConcurrent: Int = Int.MaxValue
, receiveBufferSize: Int = 256 * 1024
, maxHeaderSize: Int = 10 *1024
, requestHeaderReceiveTimeout: Duration = 5.seconds
, requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
, blocker: Blocker = util.mkBlocker(2)
)(
service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]]
)(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer(
Expand All @@ -51,6 +53,7 @@ package object http {
, service = service
, requestFailure = HttpServer.handleRequestParseError[F] _
, sendFailure = HttpServer.handleSendFailure[F] _
, blocker = blocker
)


Expand All @@ -60,13 +63,16 @@ package object http {
* @param requestCodec Codec used to decode request header
* @param responseCodec Codec used to encode response header
* @param sslStrategy Strategy used to perform blocking SSL operations
* @param blocker An execution context for blocking operations
*/
def client[F[_]: ConcurrentEffect : ContextShift : Timer](
requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
, sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true)))
, sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx }
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] =
HttpClient(requestCodec, responseCodec, sslStrategy, sslContext)
, blocker: Blocker = util.mkBlocker(2)
)(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = {
HttpClient(requestCodec, responseCodec, sslStrategy, sslContext, blocker)
}

}
6 changes: 6 additions & 0 deletions src/main/scala/spinoco/fs2/http/util/util.scala
Expand Up @@ -4,6 +4,7 @@ import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

import cats.effect.Blocker
import fs2.Chunk.ByteVectorChunk
import fs2._
import scodec.bits.{BitVector, ByteVector}
Expand Down Expand Up @@ -155,6 +156,11 @@ package object util {
}
}

def mkFixedExecutionContext(nThreads: Int) =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))

def mkBlocker(nThreads: Int) = Blocker.liftExecutionContext(mkFixedExecutionContext(nThreads))

def getCharset(ct: ContentType): Option[MIMECharset] = {
ct match {
case ContentType.TextContent(_, maybeCharset) => maybeCharset
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala
Expand Up @@ -6,7 +6,7 @@ import java.util.concurrent.Executors

import cats.Applicative
import javax.net.ssl.SSLContext
import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer}
import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Timer}
import fs2.Chunk.ByteVectorChunk
import fs2._
import fs2.concurrent.Queue
Expand Down Expand Up @@ -79,6 +79,7 @@ object WebSocket {
* supplied value, websocket will fail.
* @param requestCodec Codec to encode HttpRequests Header
* @param responseCodec Codec to decode HttpResponse Header
* @param blocker An execution context for blocking operations
*
*/
def client[F[_] : ConcurrentEffect : ContextShift : Timer, I : Decoder, O : Encoder](
Expand All @@ -91,11 +92,12 @@ object WebSocket {
, responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec
, sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true)))
, sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx }
, blocker: Blocker = spinoco.fs2.http.util.mkBlocker(2)
)(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = {
import spinoco.fs2.http.internal._
import Stream._
eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address =>
Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize))
Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address, receiveBufferSize = receiveBufferSize))
.evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) }
.flatMap { socket =>
val (header, fingerprint) = impl.createRequestHeaders(request.header)
Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/spinoco/fs2/http/HttpServerSpec.scala
Expand Up @@ -117,6 +117,7 @@ object HttpServerSpec extends Properties("HttpServer"){
, service = failRouteService
, requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] }
, sendFailure = HttpServer.handleSendFailure[IO] _
, blocker = util.mkBlocker(2)
).drain
).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency))
.take(count)
Expand Down Expand Up @@ -150,6 +151,7 @@ object HttpServerSpec extends Properties("HttpServer"){
, service = failingResponse
, requestFailure = HttpServer.handleRequestParseError[IO] _
, sendFailure = (_, _, _) => Stream.empty
, blocker = util.mkBlocker(2)
).drain
).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency))
.take(count)
Expand Down