Skip to content

Commit

Permalink
Deprecate the org.http4s.util.execution package
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker committed Apr 27, 2020
1 parent 13f92fd commit f5c9f91
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 84 deletions.
Expand Up @@ -14,7 +14,7 @@ import org.http4s.blaze.pipeline.stages.SSLStage
import org.http4s.blaze.pipeline.{Command, LeafBuilder}
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.headers.`User-Agent`
import org.http4s.internal.fromFuture
import org.http4s.blazecore.util.fromFutureNoShift
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -49,7 +49,7 @@ final private class Http1Support[F[_]](

def makeClient(requestKey: RequestKey): F[BlazeConnection[F]] =
getAddress(requestKey) match {
case Right(a) => fromFuture(F.delay(buildPipeline(requestKey, a)))
case Right(a) => fromFutureNoShift(F.delay(buildPipeline(requestKey, a)))
case Left(t) => F.raiseError(t)
}

Expand Down
Expand Up @@ -5,7 +5,6 @@ package util
import cats.effect._
import cats.implicits._
import fs2._
import org.http4s.internal.fromFuture
import scala.concurrent._

private[http4s] trait EntityBodyWriter[F[_]] {
Expand Down Expand Up @@ -50,7 +49,7 @@ private[http4s] trait EntityBodyWriter[F[_]] {
*/
def writeEntityBody(p: EntityBody[F]): F[Boolean] = {
val writeBody: F[Unit] = p.through(writePipe).compile.drain
val writeBodyEnd: F[Boolean] = fromFuture(F.delay(writeEnd(Chunk.empty)))
val writeBodyEnd: F[Boolean] = fromFutureNoShift(F.delay(writeEnd(Chunk.empty)))
writeBody *> writeBodyEnd
}

Expand All @@ -61,9 +60,9 @@ private[http4s] trait EntityBodyWriter[F[_]] {
*/
private def writePipe: Pipe[F, Byte, Unit] = { s =>
val writeStream: Stream[F, Unit] =
s.chunks.evalMap(chunk => fromFuture(F.delay(writeBodyChunk(chunk, flush = false))))
s.chunks.evalMap(chunk => fromFutureNoShift(F.delay(writeBodyChunk(chunk, flush = false))))
val errorStream: Throwable => Stream[F, Unit] = e =>
Stream.eval(fromFuture(F.delay(exceptionFlush()))).flatMap(_ => Stream.raiseError[F](e))
Stream.eval(fromFutureNoShift(F.delay(exceptionFlush()))).flatMap(_ => Stream.raiseError[F](e))
writeStream.handleErrorWith(errorStream)
}
}
Expand Up @@ -5,14 +5,13 @@ package util
import cats.implicits._
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.http4s.internal.fromFuture
import org.http4s.util.StringWriter
import org.log4s.getLogger
import scala.concurrent._

private[http4s] trait Http1Writer[F[_]] extends EntityBodyWriter[F] {
final def write(headerWriter: StringWriter, body: EntityBody[F]): F[Boolean] =
fromFuture(F.delay(writeHeaders(headerWriter))).attempt.flatMap {
fromFutureNoShift(F.delay(writeHeaders(headerWriter))).attempt.flatMap {
case Right(()) =>
writeEntityBody(body)
case Left(t) =>
Expand Down
22 changes: 22 additions & 0 deletions blaze-core/src/main/scala/org/http4s/blazecore/util/package.scala
@@ -1,8 +1,11 @@
package org.http4s
package blazecore

import cats.effect.Async
import fs2._
import org.http4s.blaze.util.Execution.directec
import scala.concurrent.Future
import scala.util.{Failure, Success}

package object util {

Expand All @@ -19,4 +22,23 @@ package object util {

private[http4s] val FutureUnit =
Future.successful(())

// Adapted from https://github.com/typelevel/cats-effect/issues/199#issuecomment-401273282
/** Inferior to `Async[F].fromFuture` for general use because it doesn't shift, but
* in blaze internals, we don't want to shift. */
private[http4s] def fromFutureNoShift[F[_], A](f: F[Future[A]])(implicit F: Async[F]): F[A] =
F.flatMap(f) { future =>
future.value match {
case Some(value) =>
F.fromTry(value)
case None =>
F.async { cb =>
future.onComplete {
case Success(a) => cb(Right(a))
case Failure(t) => cb(Left(t))
}(directec)
}
}
}

}
66 changes: 66 additions & 0 deletions core/src/main/scala/org/http4s/internal/Trampoline.scala
@@ -0,0 +1,66 @@
package org.http4s.internal

import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

private[http4s] object Trampoline extends ExecutionContextExecutor {
private val local = new ThreadLocal[ThreadLocalTrampoline]

def execute(runnable: Runnable): Unit = {
var queue = local.get()
if (queue == null) {
queue = new ThreadLocalTrampoline
local.set(queue)
}

queue.execute(runnable)
}

def reportFailure(t: Throwable): Unit = ExecutionContext.defaultReporter(t)

// Only safe to use from a single thread
private final class ThreadLocalTrampoline extends ExecutionContext {
private var running = false
private var r0, r1, r2: Runnable = null
private var rest: ArrayDeque[Runnable] = null

override def execute(runnable: Runnable): Unit = {
if (r0 == null) r0 = runnable
else if (r1 == null) r1 = runnable
else if (r2 == null) r2 = runnable
else {
if (rest == null) rest = new ArrayDeque[Runnable]()
rest.add(runnable)
}

if (!running) {
running = true
run()
}
}

override def reportFailure(cause: Throwable): Unit = ExecutionContext.defaultReporter(cause)

@tailrec
private def run(): Unit = {
val r = next()
if (r == null) {
rest = null // don't want a memory leak from potentially large array buffers
running = false
} else {
try r.run()
catch { case e: Throwable => reportFailure(e) }
run()
}
}

private def next(): Runnable = {
val r = r0
r0 = r1
r1 = r2
r2 = if (rest != null) rest.pollFirst() else null
r
}
}
}
1 change: 1 addition & 0 deletions core/src/main/scala/org/http4s/internal/package.scala
Expand Up @@ -113,6 +113,7 @@ package object internal {
}

// Adapted from https://github.com/typelevel/cats-effect/issues/199#issuecomment-401273282
@deprecated("Replaced by cats.effect.Async.fromFuture. You will need a ContextShift[F].", "0.21.4")
private[http4s] def fromFuture[F[_], A](f: F[Future[A]])(implicit F: Async[F]): F[A] =
f.flatMap { future =>
future.value match {
Expand Down
64 changes: 2 additions & 62 deletions core/src/main/scala/org/http4s/util/execution.scala
@@ -1,9 +1,8 @@
package org.http4s.util

import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

@deprecated("Not related to HTTP. Will be removed from public API.", "0.21.4")
object execution {

/** Execute `Runnable`s directly on the current thread, using a stack frame.
Expand All @@ -26,64 +25,5 @@ object execution {
* the submitted `Runnable`s and the thread becomes blocked, there will be
* a deadlock.
*/
val trampoline: ExecutionContextExecutor = new ExecutionContextExecutor {
private val local = new ThreadLocal[ThreadLocalTrampoline]

def execute(runnable: Runnable): Unit = {
var queue = local.get()
if (queue == null) {
queue = new ThreadLocalTrampoline
local.set(queue)
}

queue.execute(runnable)
}

def reportFailure(t: Throwable): Unit = ExecutionContext.defaultReporter(t)
}

// Only safe to use from a single thread
private final class ThreadLocalTrampoline extends ExecutionContext {
private var running = false
private var r0, r1, r2: Runnable = null
private var rest: ArrayDeque[Runnable] = null

override def execute(runnable: Runnable): Unit = {
if (r0 == null) r0 = runnable
else if (r1 == null) r1 = runnable
else if (r2 == null) r2 = runnable
else {
if (rest == null) rest = new ArrayDeque[Runnable]()
rest.add(runnable)
}

if (!running) {
running = true
run()
}
}

override def reportFailure(cause: Throwable): Unit = ExecutionContext.defaultReporter(cause)

@tailrec
private def run(): Unit = {
val r = next()
if (r == null) {
rest = null // don't want a memory leak from potentially large array buffers
running = false
} else {
try r.run()
catch { case e: Throwable => reportFailure(e) }
run()
}
}

private def next(): Runnable = {
val r = r0
r0 = r1
r1 = r2
r2 = if (rest != null) rest.pollFirst() else null
r
}
}
val trampoline: ExecutionContextExecutor = org.http4s.internal.Trampoline
}
10 changes: 5 additions & 5 deletions servlet/src/main/scala/org/http4s/servlet/ServletIo.scala
Expand Up @@ -7,8 +7,8 @@ import fs2._
import java.util.concurrent.atomic.AtomicReference
import javax.servlet.{ReadListener, WriteListener}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.http4s.internal.Trampoline
import org.http4s.util.bug
import org.http4s.util.execution.trampoline
import org.log4s.getLogger
import scala.annotation.tailrec

Expand Down Expand Up @@ -100,7 +100,7 @@ final case class NonBlockingServletIo[F[_]: Effect](chunkSize: Int) extends Serv
// This effect sets the callback and waits for the first bytes to read
val registerRead =
// Shift execution to a different EC
Async.shift(trampoline) *>
Async.shift(Trampoline) *>
F.async[Option[Chunk[Byte]]] { cb =>
if (!state.compareAndSet(Init, Blocked(cb))) {
cb(Left(bug("Shouldn't have gotten here: I should be the first to set a state")))
Expand Down Expand Up @@ -131,7 +131,7 @@ final case class NonBlockingServletIo[F[_]: Effect](chunkSize: Int) extends Serv
val readStream = Stream.eval(registerRead) ++ Stream
.repeatEval( // perform the initial set then transition into normal read mode
// Shift execution to a different EC
Async.shift(trampoline) *>
Async.shift(Trampoline) *>
F.async[Option[Chunk[Byte]]] { cb =>
@tailrec
def go(): Unit = state.get match {
Expand Down Expand Up @@ -221,7 +221,7 @@ final case class NonBlockingServletIo[F[_]: Effect](chunkSize: Int) extends Serv

val awaitLastWrite = Stream.eval_ {
// Shift execution to a different EC
Async.shift(trampoline) *>
Async.shift(Trampoline) *>
F.async[Unit] { cb =>
state.getAndSet(AwaitingLastWrite(cb)) match {
case Ready if out.isReady => cb(Right(()))
Expand All @@ -236,7 +236,7 @@ final case class NonBlockingServletIo[F[_]: Effect](chunkSize: Int) extends Serv
response.body.chunks
.evalMap { chunk =>
// Shift execution to a different EC
Async.shift(trampoline) *>
Async.shift(Trampoline) *>
F.async[Chunk[Byte] => Unit] { cb =>
val blocked = Blocked(cb)
state.getAndSet(blocked) match {
Expand Down
4 changes: 2 additions & 2 deletions tests/src/test/scala/org/http4s/EntityDecoderSpec.scala
Expand Up @@ -10,13 +10,13 @@ import java.nio.charset.StandardCharsets
import cats.data.Chain
import org.http4s.Status.Ok
import org.http4s.headers.`Content-Type`
import org.http4s.internal.Trampoline
import org.http4s.testing.Http4sLegacyMatchersIO
import org.http4s.util.execution.trampoline
import org.specs2.execute.PendingUntilFixed
import scala.concurrent.ExecutionContext

class EntityDecoderSpec extends Http4sSpec with Http4sLegacyMatchersIO with PendingUntilFixed {
implicit val executionContext: ExecutionContext = trampoline
implicit val executionContext: ExecutionContext = Trampoline
implicit val testContext: TestContext = TestContext()

val `application/excel`: MediaType =
Expand Down
@@ -1,5 +1,5 @@
package org.http4s
package util
package internal

import org.http4s.testing.ErrorReporting._
import org.specs2.mutable.Specification
Expand Down Expand Up @@ -84,7 +84,7 @@ abstract class ExecutionSpec extends Specification {
}

class TrampolineSpec extends ExecutionSpec {
def ec = execution.trampoline
def ec = Trampoline
def ecName = "trampoline"

"trampoline" should {
Expand All @@ -105,8 +105,3 @@ class TrampolineSpec extends ExecutionSpec {
}
}
}

class DirectSpec extends ExecutionSpec {
def ec = execution.direct
def ecName = "direct"
}

0 comments on commit f5c9f91

Please sign in to comment.