Skip to content

Commit

Permalink
=act Extract AtomicCancellable in Scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Aug 15, 2023
1 parent 861a188 commit e3d79f8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import scala.util.control.NonFatal

import com.typesafe.config.Config

import akka.actor.Scheduler.AtomicCancellable
import akka.dispatch.AbstractNodeQueue
import akka.event.LoggingAdapter
import akka.util.Helpers
Expand Down Expand Up @@ -102,48 +103,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
checkMaxDelay(roundUp(delay).toNanos)
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
compareAndSet(
InitialRepeatMarker,
new AtomicCancellable(InitialRepeatMarker) { self =>
final override protected def scheduleFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
if (self.get() != null)
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}
},
roundUp(initialDelay)))

@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

override def isCancelled: Boolean = get == null
} catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
roundUp(initialDelay))
}
}

Expand Down
78 changes: 47 additions & 31 deletions akka-actor/src/main/scala/akka/actor/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

import akka.actor.Scheduler.AtomicCancellable
import akka.annotation.InternalApi
import akka.util.JavaDurationConverters

Expand Down Expand Up @@ -70,51 +71,25 @@ trait Scheduler {
* Note: For scheduling within actors `with Timers` should be preferred.
*/
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
compareAndSet(
Cancellable.initialNotCancelled,
implicit executor: ExecutionContext): Cancellable =
new AtomicCancellable(Cancellable.initialNotCancelled) {
final override protected def scheduleFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
override def run(): Unit = {
try {
runnable.run()
if (self.get != null)
if (get != null)
swap(scheduleOnce(delay, this))
} catch {
// ignore failure to enqueue or terminated target actor
case _: SchedulerException =>
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}
}))

@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

override def isCancelled: Boolean = get == null
} catch {
case SchedulerException(msg) => throw new IllegalStateException(msg)
})
}
}

/**
* Java API: Schedules a `Runnable` to be run repeatedly with an initial delay and
Expand Down Expand Up @@ -561,4 +536,45 @@ object Scheduler {
* a custom implementation of `Scheduler` must also implement this.
*/
trait TaskRunOnClose extends Runnable

/**
* INTERNAL API
*/
@InternalApi
private[akka] abstract class AtomicCancellable(initialValue: Cancellable)
extends AtomicReference[Cancellable](initialValue)
with Cancellable {
try {
compareAndSet(initialValue, scheduleFirst())
} catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
}

protected def scheduleFirst(): Cancellable

@tailrec final protected def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old =>
if (!compareAndSet(old, c))
swap(c)
}
}

final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}

tailrecCancel()
}

final override def isCancelled: Boolean = get == null

}
}

0 comments on commit e3d79f8

Please sign in to comment.