Skip to content

Commit

Permalink
Merge pull request #3775 from armanbilge/topic/sleep-internal-function0
Browse files Browse the repository at this point in the history
Elide thunk allocation when using `sleepInternal`
  • Loading branch information
armanbilge authored Aug 6, 2023
2 parents ccb8ff2 + 8dbd2bc commit ead57bc
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 13 deletions.
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.IOFiberConstants.ExecuteRunnableR"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.scope"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.IOFiberConstants.ContStateResult")
"cats.effect.IOFiberConstants.ContStateResult"),
// #3775, changes to internal timers APIs
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.TimerSkipList.insert"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"cats.effect.unsafe.WorkerThread.sleep")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,21 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
cb: Callback,
next: Node
) extends TimerSkipListNodeBase[Callback, Node](cb, next)
with Function0[Unit]
with Runnable {

/**
* Cancels the timer
*/
final override def run(): Unit = {
final def apply(): Unit = {
// TODO: We could null the callback here directly,
// TODO: and the do the lookup after (for unlinking).
TimerSkipList.this.doRemove(triggerTime, sequenceNum)
()
}

final def run() = apply()

private[TimerSkipList] final def isMarker: Boolean = {
// note: a marker node also has `triggerTime == MARKER`,
// but that's also a valid trigger time, so we need
Expand Down Expand Up @@ -158,7 +161,7 @@ private final class TimerSkipList() extends AtomicLong(MARKER + 1L) { sequenceNu
delay: Long,
callback: Right[Nothing, Unit] => Unit,
tlr: ThreadLocalRandom
): Runnable = {
): Function0[Unit] with Runnable = {
require(delay >= 0L)
// we have to check for overflow:
val triggerTime = computeTriggerTime(now = now, delay = delay)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,9 @@ private[effect] final class WorkStealingThreadPool(
/**
* Tries to call the current worker's `sleep`, but falls back to `sleepExternal` if needed.
*/
def sleepInternal(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread]) {
val worker = thread.asInstanceOf[WorkerThread]
Expand All @@ -642,7 +644,7 @@ private[effect] final class WorkStealingThreadPool(
*/
private[this] final def sleepExternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable = {
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val random = ThreadLocalRandom.current()
val idx = random.nextInt(threadCount)
val tsl = sleepers(idx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ private final class WorkerThread(
}
}

def sleep(delay: FiniteDuration, callback: Right[Nothing, Unit] => Unit): Runnable = {
def sleep(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
// take the opportunity to update the current time, just in case other timers can benefit
val _now = System.nanoTime()
now = _now
Expand Down
16 changes: 10 additions & 6 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -925,13 +925,17 @@ private final class IOFiber[A](
IO {
val scheduler = runtime.scheduler

val cancel =
if (scheduler.isInstanceOf[WorkStealingThreadPool])
scheduler.asInstanceOf[WorkStealingThreadPool].sleepInternal(delay, cb)
else
scheduler.sleep(delay, () => cb(RightUnit))
val cancelIO =
if (scheduler.isInstanceOf[WorkStealingThreadPool]) {
val cancel =
scheduler.asInstanceOf[WorkStealingThreadPool].sleepInternal(delay, cb)
IO.Delay(cancel, null)
} else {
val cancel = scheduler.sleep(delay, () => cb(RightUnit))
IO(cancel.run())
}

Some(IO(cancel.run()))
Some(cancelIO)
}
}
else IO.cede
Expand Down

0 comments on commit ead57bc

Please sign in to comment.