From e3d79f8a2f92aa09b834347ca3370f3ab14d2600 Mon Sep 17 00:00:00 2001 From: kerr Date: Wed, 19 Apr 2023 01:53:39 +0800 Subject: [PATCH] =act Extract AtomicCancellable in Scheduler. --- .../actor/LightArrayRevolverScheduler.scala | 34 ++------ .../src/main/scala/akka/actor/Scheduler.scala | 78 +++++++++++-------- 2 files changed, 52 insertions(+), 60 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala index 41c74d6faa9..71e97557f01 100644 --- a/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala @@ -16,6 +16,7 @@ import scala.util.control.NonFatal import com.typesafe.config.Config +import akka.actor.Scheduler.AtomicCancellable import akka.dispatch.AbstractNodeQueue import akka.event.LoggingAdapter import akka.util.Helpers @@ -102,9 +103,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)( implicit executor: ExecutionContext): Cancellable = { checkMaxDelay(roundUp(delay).toNanos) - try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self => - compareAndSet( - InitialRepeatMarker, + new AtomicCancellable(InitialRepeatMarker) { self => + final override protected def scheduleFirst(): Cancellable = schedule( executor, new AtomicLong(clock() + initialDelay.toNanos) with Runnable { @@ -112,38 +112,14 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac try { runnable.run() val driftNanos = clock() - getAndAdd(delay.toNanos) - if (self.get != null) + if (self.get() != null) swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) } catch { case _: SchedulerException => // ignore failure to enqueue or terminated target actor } } }, - roundUp(initialDelay))) - - @tailrec private def swap(c: Cancellable): Unit = { - get match { - case null => if (c != null) c.cancel() - case old => if (!compareAndSet(old, c)) swap(c) - } - } - - final def cancel(): Boolean = { - @tailrec def tailrecCancel(): Boolean = { - get match { - case null => false - case c => - if (c.cancel()) compareAndSet(c, null) - else compareAndSet(c, null) || tailrecCancel() - } - } - - tailrecCancel() - } - - override def isCancelled: Boolean = get == null - } catch { - case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause) + roundUp(initialDelay)) } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 734d33071ce..5728fd565ca 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -12,6 +12,7 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.actor.Scheduler.AtomicCancellable import akka.annotation.InternalApi import akka.util.JavaDurationConverters @@ -70,17 +71,16 @@ trait Scheduler { * Note: For scheduling within actors `with Timers` should be preferred. */ def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)( - implicit executor: ExecutionContext): Cancellable = { - try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self => - compareAndSet( - Cancellable.initialNotCancelled, + implicit executor: ExecutionContext): Cancellable = + new AtomicCancellable(Cancellable.initialNotCancelled) { + final override protected def scheduleFirst(): Cancellable = scheduleOnce( initialDelay, new Runnable { override def run(): Unit = { try { runnable.run() - if (self.get != null) + if (get != null) swap(scheduleOnce(delay, this)) } catch { // ignore failure to enqueue or terminated target actor @@ -88,33 +88,8 @@ trait Scheduler { case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] => } } - })) - - @tailrec private def swap(c: Cancellable): Unit = { - get match { - case null => if (c != null) c.cancel() - case old => if (!compareAndSet(old, c)) swap(c) - } - } - - final def cancel(): Boolean = { - @tailrec def tailrecCancel(): Boolean = { - get match { - case null => false - case c => - if (c.cancel()) compareAndSet(c, null) - else compareAndSet(c, null) || tailrecCancel() - } - } - - tailrecCancel() - } - - override def isCancelled: Boolean = get == null - } catch { - case SchedulerException(msg) => throw new IllegalStateException(msg) + }) } - } /** * Java API: Schedules a `Runnable` to be run repeatedly with an initial delay and @@ -561,4 +536,45 @@ object Scheduler { * a custom implementation of `Scheduler` must also implement this. */ trait TaskRunOnClose extends Runnable + + /** + * INTERNAL API + */ + @InternalApi + private[akka] abstract class AtomicCancellable(initialValue: Cancellable) + extends AtomicReference[Cancellable](initialValue) + with Cancellable { + try { + compareAndSet(initialValue, scheduleFirst()) + } catch { + case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause) + } + + protected def scheduleFirst(): Cancellable + + @tailrec final protected def swap(c: Cancellable): Unit = { + get match { + case null => if (c != null) c.cancel() + case old => + if (!compareAndSet(old, c)) + swap(c) + } + } + + final def cancel(): Boolean = { + @tailrec def tailrecCancel(): Boolean = { + get match { + case null => false + case c => + if (c.cancel()) compareAndSet(c, null) + else compareAndSet(c, null) || tailrecCancel() + } + } + + tailrecCancel() + } + + final override def isCancelled: Boolean = get == null + + } }