Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ organization := "app.softnetwork.scheduler"
name := "scheduler-api"

libraryDependencies ++= Seq(
"app.softnetwork.persistence" %% "persistence-jdbc" % Versions.genericPersistence
"app.softnetwork.persistence" %% "akka-persistence-jdbc" % Versions.genericPersistence
)
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork"

name := "scheduler"

ThisBuild / version := "0.2.0"
ThisBuild / version := "0.2.1"

ThisBuild / scalaVersion := "2.12.15"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.persistence.typed.PersistenceId
import app.softnetwork.persistence.message.{Command, CommandResult}
import app.softnetwork.persistence.query.{EventProcessorStream, JournalProvider}
import app.softnetwork.persistence.typed.scaladsl.EntityPattern
import app.softnetwork.scheduler.handlers.SchedulerDao
import app.softnetwork.scheduler.message.SchedulerEvents.{
CronTabTriggeredEvent,
CronTabsResetedEvent,
Expand All @@ -22,8 +21,6 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
extends EventProcessorStream[SchedulerEvent] {
_: JournalProvider with EntityPattern[C, R] =>

def schedulerDao: SchedulerDao = SchedulerDao

def forTests: Boolean = false

/** Processing event
Expand All @@ -49,11 +46,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
if (persistenceId.startsWith(schedule.persistenceId)) {
val entityId = persistenceId.split("\\|").last
if (entityId != ALL_KEY) {
if (forTests) {
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
} else {
schedulerDao.addSchedule(schedule.withEntityId(entityId))
}
triggerSchedule(schedule.withEntityId(entityId))
} else {
Future.successful(true)
}
Expand All @@ -80,11 +73,7 @@ trait Scheduler2EntityProcessorStream[C <: Command, R <: CommandResult]
if (persistenceId.startsWith(cronTab.persistenceId)) {
val entityId = persistenceId.split("\\|").last
if (entityId != ALL_KEY) {
if (forTests) {
schedulerDao.addSchedule(schedule.withEntityId(entityId).withDelay(1))
} else {
schedulerDao.addSchedule(schedule.withEntityId(entityId))
}
triggerSchedule(schedule.withEntityId(entityId))
} else {
Future.successful(true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import app.softnetwork.persistence.typed.scaladsl.EntityPattern
import app.softnetwork.persistence.typed.CommandTypeKey
import app.softnetwork.scheduler.message._
import app.softnetwork.scheduler.config.{SchedulerConfig, SchedulerSettings}
import app.softnetwork.scheduler.model.{CronTab, Schedule, Scheduler}
import app.softnetwork.scheduler.model.Scheduler
import app.softnetwork.scheduler.persistence.typed.SchedulerBehavior

import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -60,26 +60,6 @@ trait SchedulerDao extends Completion {
}
}

private[scheduler] def addSchedule(
schedule: Schedule
)(implicit system: ActorSystem[_]): Future[Boolean] = {
implicit val ec: ExecutionContextExecutor = system.executionContext
!?(AddSchedule(schedule)).map {
case _: ScheduleAdded => true
case _ => false
}
}

private[scheduler] def addCronTab(
cronTab: CronTab
)(implicit system: ActorSystem[_]): Future[Boolean] = {
implicit val ec: ExecutionContextExecutor = system.executionContext
!?(AddCronTab(cronTab)).map {
case _: CronTabAdded => true
case _ => false
}
}

def loadScheduler(
schedulerId: Option[String] = None
)(implicit system: ActorSystem[_]): Future[Option[Scheduler]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ private[scheduler] trait SchedulerBehavior
)(implicit context: ActorContext[SchedulerCommand]): Effect[SchedulerEvent, Option[Scheduler]] =
command match {
case cmd: ResetCronTabsAndSchedules =>
def trigerResetCronTabsAndSchedules(
scheduler: Scheduler,
def triggerResetCronTabsAndSchedules(
switch: Boolean
): EffectBuilder[SchedulerEvent, Option[Scheduler]] = {
implicit val system: ActorSystem[_] = context.system
Expand All @@ -96,18 +95,27 @@ private[scheduler] trait SchedulerBehavior
((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) >
SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000)
) {
// add cron tabs
scheduler.cronTabs.foreach { cronTab =>
context.self ! AddCronTab(cronTab)
}
scheduler.schedules.filter(_.scheduledDate.isDefined).foreach { schedule =>
context.self ! AddSchedule(schedule)
// remove schedules
scheduler.schedules.filter(_.removable).foreach { schedule =>
context.self ! RemoveSchedule(
schedule.persistenceId,
schedule.entityId,
schedule.key
)
}
// trigger schedules
scheduler.schedules.filter(_.triggerable).foreach { schedule =>
triggerSchedule(timers, context, schedule)
}
if (context.log.isInfoEnabled)
context.log.info(
s"${scheduler.cronTabs.size} cron tabs and ${scheduler.schedules.size} schedules reseted"
)
trigerResetCronTabsAndSchedules(
scheduler,
triggerResetCronTabsAndSchedules(
switch = !scheduler.getTriggerResetCronTabsAndSchedules
)
} else {
Expand All @@ -117,7 +125,7 @@ private[scheduler] trait SchedulerBehavior
if scheduler.lastCronTabsAndSchedulesReseted.isEmpty ||
((now().getTime - scheduler.getLastCronTabsAndSchedulesReseted.getTime) >
SchedulerSettings.SchedulerConfig.resetScheduler.delay * 1000) =>
trigerResetCronTabsAndSchedules(scheduler, switch = false)
triggerResetCronTabsAndSchedules(switch = false)
case _ => Effect.none.thenRun(_ => CronTabsAndSchedulesNotReseted ~> replyTo)
}
case ResetScheduler => // add all schedules
Expand All @@ -135,9 +143,9 @@ private[scheduler] trait SchedulerBehavior
Effect
.persist(events)
.thenRun(_ => {
scheduler.schedules.foreach { schedule =>
context.self ! AddSchedule(schedule)
}
// scheduler.schedules/*.filter(_.scheduledDate.isEmpty)*/.foreach { schedule =>
// context.self ! AddSchedule(schedule)
// }
if (context.log.isInfoEnabled)
context.log.info("Scheduler reseted")
SchedulerReseted ~> replyTo
Expand Down Expand Up @@ -186,23 +194,13 @@ private[scheduler] trait SchedulerBehavior
ScheduleNotAdded
}
} else {
if (context.log.isInfoEnabled)
context.log.info(s"$schedule added")
if (updatedSchedule.triggerable) {
if (context.log.isInfoEnabled)
context.log.info(s"Triggering schedule $updatedSchedule")
timers.startSingleTimer(
updatedSchedule.uuid,
TriggerSchedule(
updatedSchedule.persistenceId,
updatedSchedule.entityId,
updatedSchedule.key
),
updatedSchedule.delay.seconds
)
triggerSchedule(timers, context, updatedSchedule)
} else if (context.log.isDebugEnabled) {
context.log.debug(s"Schedule $updatedSchedule has not been triggered")
}
if (context.log.isInfoEnabled)
context.log.info(s"$schedule added")
ScheduleAdded(updatedSchedule)
}) ~> replyTo
})
Expand Down Expand Up @@ -419,6 +417,26 @@ private[scheduler] trait SchedulerBehavior
case _ => super.handleCommand(entityId, state, command, replyTo, timers)
}

private def triggerSchedule(
timers: TimerScheduler[SchedulerCommand],
context: ActorContext[SchedulerCommand],
schedule: Schedule
): Unit = {
if (!timers.isTimerActive(schedule.uuid)) {
if (context.log.isInfoEnabled)
context.log.info(s"Triggering schedule $schedule")
timers.startSingleTimer(
schedule.uuid,
TriggerSchedule(
schedule.persistenceId,
schedule.entityId,
schedule.key
),
schedule.delay.seconds
)
}
}

/** @param state
* - current state
* @param event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.persistence.typed.scaladsl.Effect
import app.softnetwork.persistence.message.{Command, CommandResult, Event}
import app.softnetwork.persistence.typed._
import app.softnetwork.scheduler.config.SchedulerSettings
import app.softnetwork.scheduler.message.RemoveSchedule
import app.softnetwork.scheduler.message.AddSchedule
import app.softnetwork.scheduler.message.SampleMessages.{
AddSample,
LoadSample,
Expand All @@ -19,7 +19,7 @@ import app.softnetwork.scheduler.message.SampleMessages.{
TriggerSample
}
import app.softnetwork.scheduler.message.SchedulerEvents.ExternalEntityToSchedulerEvent
import app.softnetwork.scheduler.model.Sample
import app.softnetwork.scheduler.model.{Sample, Schedule}

trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResult] {
override def persistenceId: String = "Sample"
Expand Down Expand Up @@ -48,11 +48,14 @@ trait SampleBehavior extends EntityBehavior[Command, Sample, Event, CommandResul
List(
SampleTriggeredEvent(state.map(_.triggered + 1).getOrElse(1)),
ExternalEntityToSchedulerEvent(
ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule(
RemoveSchedule(
persistenceId,
entityId,
cmd.key
ExternalEntityToSchedulerEvent.Wrapped.AddSchedule(
AddSchedule(
Schedule(
persistenceId,
entityId,
cmd.key,
1
)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package app.softnetwork.scheduler.scalatest

import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorSystem
import app.softnetwork.persistence.query.InMemoryJournalProvider
import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit
import app.softnetwork.scheduler.config.SchedulerSettings
import app.softnetwork.scheduler.handlers.SchedulerHandler
import app.softnetwork.scheduler.launch.SchedulerGuardian
import app.softnetwork.scheduler.message.{
CronTabAdded,
CronTabRemoved,
ScheduleAdded,
ScheduleRemoved
}
import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream
import org.scalatest.Suite

Expand All @@ -27,4 +34,16 @@ trait SchedulerTestKit extends SchedulerGuardian with InMemoryPersistenceTestKit
logger.info(tag)
}

val probeScheduleAdded: TestProbe[ScheduleAdded] = createTestProbe[ScheduleAdded]()
subscribeProbe(probeScheduleAdded)

val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]()
subscribeProbe(probeScheduleRemoved)

val probeCronTabAdded: TestProbe[CronTabAdded] = createTestProbe[CronTabAdded]()
subscribeProbe(probeCronTabAdded)

val probeCronTabRemoved: TestProbe[CronTabRemoved] = createTestProbe[CronTabRemoved]()
subscribeProbe(probeCronTabRemoved)

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import app.softnetwork.persistence.launch
import app.softnetwork.persistence.launch.PersistenceGuardian._
import app.softnetwork.persistence.query.InMemoryJournalProvider
import app.softnetwork.scheduler.config.SchedulerSettings
import app.softnetwork.scheduler.message.ScheduleRemoved
import app.softnetwork.scheduler.persistence.query.{
SampleScheduleTriggered,
Scheduler2EntityProcessorStream,
Expand All @@ -31,7 +30,4 @@ trait SchedulerWithSampleTestKit extends SchedulerTestKit { _: Suite =>
createTestProbe[SampleScheduleTriggered]()
subscribeProbe(probeSampleSchedule)

val probeScheduleRemoved: TestProbe[ScheduleRemoved] = createTestProbe[ScheduleRemoved]()
subscribeProbe(probeScheduleRemoved)

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SchedulerHandlerSpec
case _: CronTabTriggered => succeed
case other => fail(other.getClass)
}
// a schedule for the Sample[sample] entity has been triggered at the next cron job date
// a schedule for the Sample[sample] entity has been triggered
probeSampleSchedule.receiveMessage()
SampleHandler ? ("sample", LoadSample) assert {
case r: SampleLoaded => assert(r.sample.triggered == 1)
Expand All @@ -59,24 +59,27 @@ class SchedulerHandlerSpec
scheduler.schedules.find(s =>
s.persistenceId == SampleBehavior.persistenceId && s.entityId == "sample" && s.key == cronTab.key
) match {
case Some(schedule) =>
assert(!schedule.repeatedly.getOrElse(false))
assert(schedule.getScheduledDate.equals(schedule.getLastTriggered))
assert(schedule.getCronTab == cronTab.uuid)
assert(!schedule.triggerable)
assert(schedule.removable)
case _ => fail("schedule not found")
case None => succeed
case _ => fail("schedule found")
}
case _ => fail()
}
// the schedule for the Sample[sample] entity has been removed
probeScheduleRemoved.receiveMessage()
// a schedule for the Sample[sample] entity has been added
probeScheduleAdded.receiveMessage()
// the schedule for the Sample[sample] entity has been triggered
probeSampleSchedule.receiveMessage()
SampleHandler ? ("sample", LoadSample) assert {
case r: SampleLoaded => assert(r.sample.triggered == 2)
case other => fail(other.getClass)
}
}
"remove Cron Tab" in {
this !? RemoveCronTab(cronTab.persistenceId, cronTab.entityId, cronTab.key) assert {
case _: CronTabRemoved => succeed
case other => fail(other.getClass)
}
// the schedule for the Sample[sample] entity has been removed
probeScheduleRemoved.receiveMessage()
this !? LoadScheduler assert {
case r: SchedulerLoaded =>
val scheduler = r.scheduler
Expand Down