diff --git a/api/build.sbt b/api/build.sbt index 3af0ec2..60fda12 100644 --- a/api/build.sbt +++ b/api/build.sbt @@ -28,5 +28,5 @@ organization := "app.softnetwork.scheduler" name := "scheduler-api" libraryDependencies ++= Seq( - "app.softnetwork.persistence" %% "persistence-jdbc" % Versions.genericPersistence + "app.softnetwork.persistence" %% "akka-persistence-jdbc" % Versions.genericPersistence ) diff --git a/build.sbt b/build.sbt index 2ce200f..dc2c256 100644 --- a/build.sbt +++ b/build.sbt @@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork" name := "scheduler" -ThisBuild / version := "0.2.0" +ThisBuild / version := "0.2.1" ThisBuild / scalaVersion := "2.12.15" diff --git a/core/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala b/common/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala similarity index 87% rename from core/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala rename to common/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala index b35488a..8e76a09 100644 --- a/core/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala +++ b/common/src/main/scala/app/softnetwork/scheduler/persistence/query/Scheduler2EntityProcessorStream.scala @@ -5,7 +5,6 @@ import akka.persistence.typed.PersistenceId import app.softnetwork.persistence.message.{Command, CommandResult} import app.softnetwork.persistence.query.{EventProcessorStream, JournalProvider} import app.softnetwork.persistence.typed.scaladsl.EntityPattern -import app.softnetwork.scheduler.handlers.SchedulerDao import app.softnetwork.scheduler.message.SchedulerEvents.{ CronTabTriggeredEvent, CronTabsResetedEvent, @@ -22,8 +21,6 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult] extends EventProcessorStream[SchedulerEvent] { _: JournalProvider with EntityPattern[C, R] => - def schedulerDao: SchedulerDao = SchedulerDao - def forTests: Boolean = false /** Processing event @@ -49,11 +46,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult] if (persistenceId.startsWith(schedule.persistenceId)) { val entityId = persistenceId.split("\\|").last if (entityId != ALL_KEY) { - if (forTests) { - schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1)) - } else { - schedulerDao.addSchedule(schedule.withEntityId(entityId)) - } + triggerSchedule(schedule.withEntityId(entityId)) } else { Future.successful(true) } @@ -80,11 +73,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult] if (persistenceId.startsWith(cronTab.persistenceId)) { val entityId = persistenceId.split("\\|").last if (entityId != ALL_KEY) { - if (forTests) { - schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1)) - } else { - schedulerDao.addSchedule(schedule.withEntityId(entityId)) - } + triggerSchedule(schedule.withEntityId(entityId)) } else { Future.successful(true) } diff --git a/core/src/main/scala/app/softnetwork/scheduler/handlers/SchedulerHandler.scala b/core/src/main/scala/app/softnetwork/scheduler/handlers/SchedulerHandler.scala index ba1c9cd..9314d98 100644 --- a/core/src/main/scala/app/softnetwork/scheduler/handlers/SchedulerHandler.scala +++ b/core/src/main/scala/app/softnetwork/scheduler/handlers/SchedulerHandler.scala @@ -7,7 +7,7 @@ import app.softnetwork.persistence.typed.scaladsl.EntityPattern import app.softnetwork.persistence.typed.CommandTypeKey import app.softnetwork.scheduler.message._ import app.softnetwork.scheduler.config.{SchedulerConfig, SchedulerSettings} -import app.softnetwork.scheduler.model.{CronTab, Schedule, Scheduler} +import app.softnetwork.scheduler.model.Scheduler import app.softnetwork.scheduler.persistence.typed.SchedulerBehavior import scala.concurrent.duration.DurationInt @@ -60,26 +60,6 @@ trait SchedulerDao extends Completion { } } - private[scheduler] def addSchedule( - schedule: Schedule - )(implicit system: ActorSystem[_]): Future[Boolean] = { - implicit val ec: ExecutionContextExecutor = system.executionContext - !?(AddSchedule(schedule)).map { - case _: ScheduleAdded => true - case _ => false - } - } - - private[scheduler] def addCronTab( - cronTab: CronTab - )(implicit system: ActorSystem[_]): Future[Boolean] = { - implicit val ec: ExecutionContextExecutor = system.executionContext - !?(AddCronTab(cronTab)).map { - case _: CronTabAdded => true - case _ => false - } - } - def loadScheduler( schedulerId: Option[String] = None )(implicit system: ActorSystem[_]): Future[Option[Scheduler]] = { diff --git a/core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala b/core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala index a59337a..69165d4 100644 --- a/core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala +++ b/core/src/main/scala/app/softnetwork/scheduler/persistence/typed/SchedulerBehavior.scala @@ -71,8 +71,7 @@ private[scheduler] trait SchedulerBehavior )(implicit context: ActorContext[SchedulerCommand]): Effect[SchedulerEvent, Option[Scheduler]] = command match { case cmd: ResetCronTabsAndSchedules => - def trigerResetCronTabsAndSchedules( - scheduler: Scheduler, + def triggerResetCronTabsAndSchedules( switch: Boolean ): EffectBuilder[SchedulerEvent, Option[Scheduler]] = { implicit val system: ActorSystem[_] = context.system @@ -96,18 +95,27 @@ private[scheduler] trait SchedulerBehavior ((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) > SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000) ) { + // add cron tabs scheduler.cronTabs.foreach { cronTab => context.self ! AddCronTab(cronTab) } - scheduler.schedules.filter(_.scheduledDate.isDefined).foreach { schedule => - context.self ! AddSchedule(schedule) + // remove schedules + scheduler.schedules.filter(_.removable).foreach { schedule => + context.self ! RemoveSchedule( + schedule.persistenceId, + schedule.entityId, + schedule.key + ) + } + // trigger schedules + scheduler.schedules.filter(_.triggerable).foreach { schedule => + triggerSchedule(timers, context, schedule) } if (context.log.isInfoEnabled) context.log.info( s"${scheduler.cronTabs.size} cron tabs and ${scheduler.schedules.size} schedules reseted" ) - trigerResetCronTabsAndSchedules( - scheduler, + triggerResetCronTabsAndSchedules( switch = !scheduler.getTriggerResetCronTabsAndSchedules ) } else { @@ -117,7 +125,7 @@ private[scheduler] trait SchedulerBehavior if scheduler.lastCronTabsAndSchedulesReseted.isEmpty || ((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) > SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000) => - trigerResetCronTabsAndSchedules(scheduler, switch = false) + triggerResetCronTabsAndSchedules(switch = false) case _ => Effect.none.thenRun(_ => CronTabsAndSchedulesNotReseted ~> replyTo) } case ResetScheduler => // add all schedules @@ -135,9 +143,9 @@ private[scheduler] trait SchedulerBehavior Effect .persist(events) .thenRun(_ => { - scheduler.schedules.foreach { schedule => - context.self ! AddSchedule(schedule) - } +// scheduler.schedules/*.filter(_.scheduledDate.isEmpty)*/.foreach { schedule => +// context.self ! AddSchedule(schedule) +// } if (context.log.isInfoEnabled) context.log.info("Scheduler reseted") SchedulerReseted ~> replyTo @@ -186,23 +194,13 @@ private[scheduler] trait SchedulerBehavior ScheduleNotAdded } } else { + if (context.log.isInfoEnabled) + context.log.info(s"$schedule added") if (updatedSchedule.triggerable) { - if (context.log.isInfoEnabled) - context.log.info(s"Triggering schedule $updatedSchedule") - timers.startSingleTimer( - updatedSchedule.uuid, - TriggerSchedule( - updatedSchedule.persistenceId, - updatedSchedule.entityId, - updatedSchedule.key - ), - updatedSchedule.delay.seconds - ) + triggerSchedule(timers, context, updatedSchedule) } else if (context.log.isDebugEnabled) { context.log.debug(s"Schedule $updatedSchedule has not been triggered") } - if (context.log.isInfoEnabled) - context.log.info(s"$schedule added") ScheduleAdded(updatedSchedule) }) ~> replyTo }) @@ -419,6 +417,26 @@ private[scheduler] trait SchedulerBehavior case _ => super.handleCommand(entityId, state, command, replyTo, timers) } + private def triggerSchedule( + timers: TimerScheduler[SchedulerCommand], + context: ActorContext[SchedulerCommand], + schedule: Schedule + ): Unit = { + if (!timers.isTimerActive(schedule.uuid)) { + if (context.log.isInfoEnabled) + context.log.info(s"Triggering schedule $schedule") + timers.startSingleTimer( + schedule.uuid, + TriggerSchedule( + schedule.persistenceId, + schedule.entityId, + schedule.key + ), + schedule.delay.seconds + ) + } + } + /** @param state * - current state * @param event diff --git a/testkit/src/main/scala/app/softnetwork/scheduler/persistence/typed/SampleBehavior.scala b/testkit/src/main/scala/app/softnetwork/scheduler/persistence/typed/SampleBehavior.scala index 47e0c8a..abbef53 100644 --- a/testkit/src/main/scala/app/softnetwork/scheduler/persistence/typed/SampleBehavior.scala +++ b/testkit/src/main/scala/app/softnetwork/scheduler/persistence/typed/SampleBehavior.scala @@ -6,7 +6,7 @@ import akka.persistence.typed.scaladsl.Effect import app.softnetwork.persistence.message.{Command, CommandResult, Event} import app.softnetwork.persistence.typed._ import app.softnetwork.scheduler.config.SchedulerSettings -import app.softnetwork.scheduler.message.RemoveSchedule +import app.softnetwork.scheduler.message.AddSchedule import app.softnetwork.scheduler.message.SampleMessages.{ AddSample, LoadSample, @@ -19,7 +19,7 @@ import app.softnetwork.scheduler.message.SampleMessages.{ TriggerSample } import app.softnetwork.scheduler.message.SchedulerEvents.ExternalEntityToSchedulerEvent -import app.softnetwork.scheduler.model.Sample +import app.softnetwork.scheduler.model.{Sample, Schedule} trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResult] { override def persistenceId: String = "Sample" @@ -48,11 +48,14 @@ trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResul List( SampleTriggeredEvent(state.map(_.triggered + 1).getOrElse(1)), ExternalEntityToSchedulerEvent( - ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule( - RemoveSchedule( - persistenceId, - entityId, - cmd.key + ExternalEntityToSchedulerEvent.Wrapped.AddSchedule( + AddSchedule( + Schedule( + persistenceId, + entityId, + cmd.key, + 1 + ) ) ) ) diff --git a/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerTestKit.scala b/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerTestKit.scala index 5c480c6..6085a14 100644 --- a/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerTestKit.scala @@ -1,11 +1,18 @@ package app.softnetwork.scheduler.scalatest +import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorSystem import app.softnetwork.persistence.query.InMemoryJournalProvider import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit import app.softnetwork.scheduler.config.SchedulerSettings import app.softnetwork.scheduler.handlers.SchedulerHandler import app.softnetwork.scheduler.launch.SchedulerGuardian +import app.softnetwork.scheduler.message.{ + CronTabAdded, + CronTabRemoved, + ScheduleAdded, + ScheduleRemoved +} import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream import org.scalatest.Suite @@ -27,4 +34,16 @@ trait SchedulerTestKit extends SchedulerGuardian with InMemoryPersistenceTestKit logger.info(tag) } + val probeScheduleAdded: TestProbe[ScheduleAdded] = createTestProbe[ScheduleAdded]() + subscribeProbe(probeScheduleAdded) + + val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]() + subscribeProbe(probeScheduleRemoved) + + val probeCronTabAdded: TestProbe[CronTabAdded] = createTestProbe[CronTabAdded]() + subscribeProbe(probeCronTabAdded) + + val probeCronTabRemoved: TestProbe[CronTabRemoved] = createTestProbe[CronTabRemoved]() + subscribeProbe(probeCronTabRemoved) + } diff --git a/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerWithSampleTestKit.scala b/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerWithSampleTestKit.scala index d41de6f..d1781f7 100644 --- a/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerWithSampleTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/scheduler/scalatest/SchedulerWithSampleTestKit.scala @@ -6,7 +6,6 @@ import app.softnetwork.persistence.launch import app.softnetwork.persistence.launch.PersistenceGuardian._ import app.softnetwork.persistence.query.InMemoryJournalProvider import app.softnetwork.scheduler.config.SchedulerSettings -import app.softnetwork.scheduler.message.ScheduleRemoved import app.softnetwork.scheduler.persistence.query.{ SampleScheduleTriggered, Scheduler2EntityProcessorStream, @@ -31,7 +30,4 @@ trait SchedulerWithSampleTestKit extends SchedulerTestKit { _: Suite => createTestProbe[SampleScheduleTriggered]() subscribeProbe(probeSampleSchedule) - val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]() - subscribeProbe(probeScheduleRemoved) - } diff --git a/testkit/src/test/scala/app/softnetwork/scheduler/handlers/SchedulerHandlerSpec.scala b/testkit/src/test/scala/app/softnetwork/scheduler/handlers/SchedulerHandlerSpec.scala index d3541c3..f6111f0 100644 --- a/testkit/src/test/scala/app/softnetwork/scheduler/handlers/SchedulerHandlerSpec.scala +++ b/testkit/src/test/scala/app/softnetwork/scheduler/handlers/SchedulerHandlerSpec.scala @@ -45,7 +45,7 @@ class SchedulerHandlerSpec case _: CronTabTriggered => succeed case other => fail(other.getClass) } - // a schedule for the Sample[sample] entity has been triggered at the next cron job date + // a schedule for the Sample[sample] entity has been triggered probeSampleSchedule.receiveMessage() SampleHandler ? ("sample", LoadSample) assert { case r: SampleLoaded => assert(r.sample.triggered == 1) @@ -59,24 +59,27 @@ class SchedulerHandlerSpec scheduler.schedules.find(s => s.persistenceId == SampleBehavior.persistenceId && s.entityId == "sample" && s.key == cronTab.key ) match { - case Some(schedule) => - assert(!schedule.repeatedly.getOrElse(false)) - assert(schedule.getScheduledDate.equals(schedule.getLastTriggered)) - assert(schedule.getCronTab == cronTab.uuid) - assert(!schedule.triggerable) - assert(schedule.removable) - case _ => fail("schedule not found") + case None => succeed + case _ => fail("schedule found") } case _ => fail() } - // the schedule for the Sample[sample] entity has been removed - probeScheduleRemoved.receiveMessage() + // a schedule for the Sample[sample] entity has been added + probeScheduleAdded.receiveMessage() + // the schedule for the Sample[sample] entity has been triggered + probeSampleSchedule.receiveMessage() + SampleHandler ? ("sample", LoadSample) assert { + case r: SampleLoaded => assert(r.sample.triggered == 2) + case other => fail(other.getClass) + } } "remove Cron Tab" in { this !? RemoveCronTab(cronTab.persistenceId, cronTab.entityId, cronTab.key) assert { case _: CronTabRemoved => succeed case other => fail(other.getClass) } + // the schedule for the Sample[sample] entity has been removed + probeScheduleRemoved.receiveMessage() this !? LoadScheduler assert { case r: SchedulerLoaded => val scheduler = r.scheduler