diff --git a/.travis.yml b/.travis.yml index 2d6873e..d828197 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,6 @@ +dist: trusty + language : scala scala: diff --git a/build.sbt b/build.sbt index a1ed552..f6be80a 100644 --- a/build.sbt +++ b/build.sbt @@ -31,9 +31,9 @@ lazy val commonSettings = Seq( , "org.scodec" %% "scodec-core" % "1.10.3" , "com.spinoco" %% "protocol-http" % "0.3.15" , "com.spinoco" %% "protocol-websocket" % "0.3.15" - , "co.fs2" %% "fs2-core" % "1.0.0" - , "co.fs2" %% "fs2-io" % "1.0.0" - , "com.spinoco" %% "fs2-crypto" % "0.4.0" + , "co.fs2" %% "fs2-core" % "2.0.1" + , "co.fs2" %% "fs2-io" % "2.0.1" + , "com.spinoco" %% "fs2-crypto" % "0.5.0-SNAPSHOT" , "org.scalacheck" %% "scalacheck" % "1.13.4" % "test" ), scmInfo := Some(ScmInfo(url("https://github.com/Spinoco/fs2-http"), "git@github.com:Spinoco/fs2-http.git")), diff --git a/src/main/scala/spinoco/fs2/http/HttpClient.scala b/src/main/scala/spinoco/fs2/http/HttpClient.scala index 7b75ad9..c899783 100644 --- a/src/main/scala/spinoco/fs2/http/HttpClient.scala +++ b/src/main/scala/spinoco/fs2/http/HttpClient.scala @@ -110,12 +110,14 @@ trait HttpClient[F[_]] { * @param responseCodec Codec used to encode response header * @param sslExecutionContext Strategy used when communication with SSL (https or wss) * @param sslContext SSL Context to use with SSL Client (https, wss) + * @param blocker An execution context for blocking operations */ def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( requestCodec : Codec[HttpRequestHeader] , responseCodec : Codec[HttpResponseHeader] , sslExecutionContext: => ExecutionContext , sslContext : => SSLContext + , blocker : Blocker )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = Sync[F].delay { lazy val sslCtx = sslContext lazy val sslS = sslExecutionContext @@ -128,7 +130,7 @@ trait HttpClient[F[_]] { , timeout: Duration ): Stream[F, HttpResponse[F]] = { Stream.eval(addressForRequest[F](request.scheme, request.host)).flatMap { address => - Stream.resource(io.tcp.client[F](address)) + Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address)) .evalMap { socket => if (!request.isSecure) Applicative[F].pure(socket) else clientLiftToSecure[F](sslS, sslCtx)(socket, request.host) @@ -143,7 +145,7 @@ trait HttpClient[F[_]] { , chunkSize: Int , maxFrameSize: Int ): Stream[F, Option[HttpResponseHeader]] = - WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx) + WebSocket.client(request,pipe,maxResponseHeaderSize,chunkSize,maxFrameSize, requestCodec, responseCodec, sslS, sslCtx, blocker) def sse[A : SSEDecoder](rq: HttpRequest[F], maxResponseHeaderSize: Int, chunkSize: Int): Stream[F, A] = @@ -174,7 +176,7 @@ trait HttpClient[F[_]] { timeout match { case fin: FiniteDuration => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { start => - HttpRequest.toStream(request, requestCodec).to(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(Some(fin))).last.onFinalize(socket.endOfOutput).flatMap { _ => eval(SignallingRef[F, Boolean](true)).flatMap { timeoutSignal => eval(clock.realTime(TimeUnit.MILLISECONDS)).flatMap { sent => val remains = fin - (sent - start).millis @@ -186,7 +188,7 @@ trait HttpClient[F[_]] { }}}} case _ => - HttpRequest.toStream(request, requestCodec).to(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => + HttpRequest.toStream(request, requestCodec).through(socket.writes(None)).last.onFinalize(socket.endOfOutput).flatMap { _ => socket.reads(chunkSize, None) through HttpResponse.fromStream[F](maxResponseHeaderSize, responseCodec) } } diff --git a/src/main/scala/spinoco/fs2/http/HttpServer.scala b/src/main/scala/spinoco/fs2/http/HttpServer.scala index 11d9b27..3ba9e51 100644 --- a/src/main/scala/spinoco/fs2/http/HttpServer.scala +++ b/src/main/scala/spinoco/fs2/http/HttpServer.scala @@ -3,7 +3,7 @@ package spinoco.fs2.http import java.net.InetSocketAddress import java.nio.channels.AsynchronousChannelGroup -import cats.effect.{ConcurrentEffect, Sync, Timer} +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync, Timer} import cats.syntax.all._ import fs2._ import fs2.concurrent.SignallingRef @@ -34,9 +34,10 @@ object HttpServer { * This is also evaluated when the server failed to process the request itself (i.e. `service` did not handle the failure ) * @param sendFailure A function to be evaluated on failure to process the the response. * Request is not suplied if failure happened before request was constructed. + * @param blocker An execution context for blocking operations * */ - def apply[F[_] : ConcurrentEffect : Timer]( + def apply[F[_] : ConcurrentEffect : ContextShift : Timer]( maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 , maxHeaderSize: Int = 10 *1024 @@ -47,6 +48,7 @@ object HttpServer { , service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] , requestFailure : Throwable => Stream[F, HttpResponse[F]] , sendFailure: (Option[HttpRequestHeader], HttpResponse[F], Throwable) => Stream[F, Nothing] + , blocker: Blocker )( implicit AG: AsynchronousChannelGroup @@ -58,7 +60,7 @@ object HttpServer { case _ => (false, 0.millis) } - io.tcp.server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => + new io.tcp.SocketGroup(AG, blocker).server[F](bindTo, receiveBufferSize = receiveBufferSize).map { resource => Stream.resource(resource).flatMap { socket => eval(SignallingRef(initial)).flatMap { timeoutSignal => readWithTimeout[F](socket, readDuration, timeoutSignal.get, receiveBufferSize) diff --git a/src/main/scala/spinoco/fs2/http/http.scala b/src/main/scala/spinoco/fs2/http/http.scala index 94624ec..b062fca 100644 --- a/src/main/scala/spinoco/fs2/http/http.scala +++ b/src/main/scala/spinoco/fs2/http/http.scala @@ -5,7 +5,7 @@ import java.nio.channels.AsynchronousChannelGroup import java.util.concurrent.Executors import javax.net.ssl.SSLContext -import cats.effect.{ConcurrentEffect, ContextShift, Timer} +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer} import fs2._ import scodec.Codec import spinoco.protocol.http.{HttpRequestHeader, HttpResponseHeader} @@ -29,8 +29,9 @@ package object http { * @param requestHeaderReceiveTimeout A timeout to await request header to be fully received. * Request will fail, if the header won't be read within this timeout. * @param service Pipe that defines handling of each incoming request and produces a response + * @param blocker An execution context for blocking operations */ - def server[F[_] : ConcurrentEffect : Timer]( + def server[F[_] : ConcurrentEffect : ContextShift : Timer]( bindTo: InetSocketAddress , maxConcurrent: Int = Int.MaxValue , receiveBufferSize: Int = 256 * 1024 @@ -38,6 +39,7 @@ package object http { , requestHeaderReceiveTimeout: Duration = 5.seconds , requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec + , blocker: Blocker = util.mkBlocker(2) )( service: (HttpRequestHeader, Stream[F,Byte]) => Stream[F,HttpResponse[F]] )(implicit AG: AsynchronousChannelGroup):Stream[F,Unit] = HttpServer( @@ -51,6 +53,7 @@ package object http { , service = service , requestFailure = HttpServer.handleRequestParseError[F] _ , sendFailure = HttpServer.handleSendFailure[F] _ + , blocker = blocker ) @@ -60,13 +63,16 @@ package object http { * @param requestCodec Codec used to decode request header * @param responseCodec Codec used to encode response header * @param sslStrategy Strategy used to perform blocking SSL operations + * @param blocker An execution context for blocking operations */ def client[F[_]: ConcurrentEffect : ContextShift : Timer]( requestCodec: Codec[HttpRequestHeader] = HttpRequestHeaderCodec.defaultCodec , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslStrategy: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } - )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = - HttpClient(requestCodec, responseCodec, sslStrategy, sslContext) + , blocker: Blocker = util.mkBlocker(2) + )(implicit AG: AsynchronousChannelGroup):F[HttpClient[F]] = { + HttpClient(requestCodec, responseCodec, sslStrategy, sslContext, blocker) + } } diff --git a/src/main/scala/spinoco/fs2/http/util/util.scala b/src/main/scala/spinoco/fs2/http/util/util.scala index 3718fbd..bd51db0 100644 --- a/src/main/scala/spinoco/fs2/http/util/util.scala +++ b/src/main/scala/spinoco/fs2/http/util/util.scala @@ -4,6 +4,7 @@ import java.lang.Thread.UncaughtExceptionHandler import java.util.concurrent.{Executors, ThreadFactory} import java.util.concurrent.atomic.AtomicInteger +import cats.effect.Blocker import fs2.Chunk.ByteVectorChunk import fs2._ import scodec.bits.{BitVector, ByteVector} @@ -155,6 +156,11 @@ package object util { } } + def mkFixedExecutionContext(nThreads: Int) = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads)) + + def mkBlocker(nThreads: Int) = Blocker.liftExecutionContext(mkFixedExecutionContext(nThreads)) + def getCharset(ct: ContentType): Option[MIMECharset] = { ct match { case ContentType.TextContent(_, maybeCharset) => maybeCharset diff --git a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala index 38dfaec..330203b 100644 --- a/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala +++ b/src/main/scala/spinoco/fs2/http/websocket/WebSocket.scala @@ -6,7 +6,7 @@ import java.util.concurrent.Executors import cats.Applicative import javax.net.ssl.SSLContext -import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Timer} +import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Timer} import fs2.Chunk.ByteVectorChunk import fs2._ import fs2.concurrent.Queue @@ -79,6 +79,7 @@ object WebSocket { * supplied value, websocket will fail. * @param requestCodec Codec to encode HttpRequests Header * @param responseCodec Codec to decode HttpResponse Header + * @param blocker An execution context for blocking operations * */ def client[F[_] : ConcurrentEffect : ContextShift : Timer, I : Decoder, O : Encoder]( @@ -91,11 +92,12 @@ object WebSocket { , responseCodec: Codec[HttpResponseHeader] = HttpResponseHeaderCodec.defaultCodec , sslES: => ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(spinoco.fs2.http.util.mkThreadFactory("fs2-http-ssl", daemon = true))) , sslContext: => SSLContext = { val ctx = SSLContext.getInstance("TLS"); ctx.init(null,null,null); ctx } + , blocker: Blocker = spinoco.fs2.http.util.mkBlocker(2) )(implicit AG: AsynchronousChannelGroup): Stream[F, Option[HttpResponseHeader]] = { import spinoco.fs2.http.internal._ import Stream._ eval(addressForRequest[F](if (request.secure) HttpScheme.WSS else HttpScheme.WS, request.hostPort)).flatMap { address => - Stream.resource(io.tcp.client[F](address, receiveBufferSize = receiveBufferSize)) + Stream.resource(new io.tcp.SocketGroup(AG, blocker).client[F](address, receiveBufferSize = receiveBufferSize)) .evalMap { socket => if (request.secure) clientLiftToSecure(sslES, sslContext)(socket, request.hostPort) else Applicative[F].pure(socket) } .flatMap { socket => val (header, fingerprint) = impl.createRequestHeaders(request.header) diff --git a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala index 0880d70..d5713d6 100644 --- a/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala +++ b/src/test/scala/spinoco/fs2/http/HttpServerSpec.scala @@ -117,6 +117,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failRouteService , requestFailure = _ => { Stream(HttpResponse[IO](HttpStatusCode.BadRequest)).covary[IO] } , sendFailure = HttpServer.handleSendFailure[IO] _ + , blocker = util.mkBlocker(2) ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count) @@ -150,6 +151,7 @@ object HttpServerSpec extends Properties("HttpServer"){ , service = failingResponse , requestFailure = HttpServer.handleRequestParseError[IO] _ , sendFailure = (_, _, _) => Stream.empty + , blocker = util.mkBlocker(2) ).drain ).covary[IO] ++ Stream.sleep_[IO](1.second) ++ clients).parJoin(MaxConcurrency)) .take(count)