Skip to content

Commit

Permalink
Thread safety fix for the issue enragedginger#33
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Kuleshov authored and Ben Iofel committed May 20, 2020
1 parent d45e726 commit a754246
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 26 deletions.
6 changes: 6 additions & 0 deletions src/main/scala/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ object `package` {
implicit val quartzJobLoggerType: LogSource[SimpleActorMessageJob] = new LogSource[SimpleActorMessageJob] {
def genString(t: SimpleActorMessageJob): String = "[QuartzJob]"
}

implicit def scalaFunctionToJavaBiFunction[From1, From2, To](function: (From1, From2) => To): java.util.function.BiFunction[From1, From2, To] = {
new java.util.function.BiFunction[From1, From2, To] {
override def apply(input: From1, value: From2): To = function(input, value)
}
}
}
58 changes: 32 additions & 26 deletions src/main/scala/QuartzSchedulerExtension.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.typesafe.akka.extension.quartz

import java.text.ParseException
import java.util.concurrent.ConcurrentHashMap
import java.util.{Date, TimeZone}

import akka.actor._
Expand All @@ -11,7 +12,6 @@ import org.quartz.impl.DirectSchedulerFactory
import org.quartz.simpl.{RAMJobStore, SimpleThreadPool}
import org.quartz.spi.JobStore

import scala.collection.{immutable, mutable}
import scala.collection.JavaConverters._
import scala.util.control.Exception._

Expand Down Expand Up @@ -57,8 +57,11 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
*
* RECAST KEY AS UPPERCASE TO AVOID RUNTIME LOOKUP ISSUES
*/
var schedules: immutable.Map[String, QuartzSchedule] = QuartzSchedules(config, defaultTimezone)
val runningJobs: mutable.Map[String, JobKey] = mutable.Map.empty[String, JobKey]
val schedules: ConcurrentHashMap[String, QuartzSchedule] = new ConcurrentHashMap[String, QuartzSchedule]
QuartzSchedules(config, defaultTimezone).foreach { kv =>
schedules.put(kv._1.toUpperCase, kv._2)
}
val runningJobs: ConcurrentHashMap[String, JobKey] = new ConcurrentHashMap[String, JobKey]

log.debug("Configured Schedules: {}", schedules)

Expand Down Expand Up @@ -98,7 +101,7 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
def nextTrigger(name: String): Option[Date] = {
import scala.collection.JavaConverters._
for {
jobKey <- runningJobs.get(name)
jobKey <- runningJobs.asScala.get(name)
trigger <- scheduler.getTriggersOfJob(jobKey).asScala.headOption
} yield trigger.getNextFireTime
}
Expand Down Expand Up @@ -127,7 +130,7 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
* @return Success or Failure in a Boolean
*/
def suspendJob(name: String): Boolean = {
runningJobs.get(name) match {
Option(runningJobs.get(name)) match {
case Some(job) =>
log.info("Suspending Quartz Job '{}'", name)
scheduler.pauseJob(job)
Expand All @@ -146,7 +149,7 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
* @return Success or Failure in a Boolean
*/
def resumeJob(name: String): Boolean = {
runningJobs.get(name) match {
Option(runningJobs.get(name)) match {
case Some(job) =>
log.info("Resuming Quartz Job '{}'", name)
scheduler.resumeJob(job)
Expand All @@ -173,11 +176,11 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
* @return Success or Failure in a Boolean
*/
def cancelJob(name: String): Boolean = {
runningJobs.get(name) match {
Option(runningJobs.get(name)) match {
case Some(job) =>
log.info("Cancelling Quartz Job '{}'", name)
val result = scheduler.deleteJob(job)
runningJobs -= name
runningJobs.remove(name)
result
case None =>
log.warning("No running Job named '{}' found: Cannot cancel", name)
Expand Down Expand Up @@ -264,17 +267,20 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
*
*/
def createSchedule(name: String, description: Option[String] = None, cronExpression: String, calendar: Option[String] = None,
timezone: TimeZone = defaultTimezone) = schedules.get(name) match {
case Some(sched) =>
throw new IllegalArgumentException(s"A schedule with this name already exists: [$name]")
case None =>
val expression = catching(classOf[ParseException]) either new CronExpression(cronExpression) match {
case Left(t) =>
throw new IllegalArgumentException(s"Invalid 'expression' for Cron Schedule '$name'. Failed to validate CronExpression.", t)
case Right(expr) => expr
timezone: TimeZone = defaultTimezone) = {
schedules.compute(name.toUpperCase, (key: String, value: QuartzSchedule) => {
Option(value) match {
case Some(sched) =>
throw new IllegalArgumentException(s"A schedule with this name already exists: [$key]")
case None =>
val expression = catching(classOf[ParseException]) either new CronExpression(cronExpression) match {
case Left(t) =>
throw new IllegalArgumentException(s"Invalid 'expression' for Cron Schedule '$key'. Failed to validate CronExpression.", t)
case Right(expr) => expr
}
new QuartzCronSchedule(key, description, expression, timezone, calendar)
}
val quartzSchedule = new QuartzCronSchedule(name, description, expression, timezone, calendar)
schedules += (name -> quartzSchedule)
})
}

/**
Expand All @@ -297,7 +303,7 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
scheduleInternal(name, receiver, msg, None)
}

private def removeSchedule(name: String) = schedules = schedules - name
private def removeSchedule(name: String) = schedules.remove(name.toUpperCase)

/**
* Schedule a job, whose named configuration must be available
Expand Down Expand Up @@ -370,7 +376,7 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
* @param startDate The optional date indicating the earliest time the job may fire.
* @return A date which indicates the first time the trigger will fire.
*/
private def scheduleInternal(name: String, receiver: AnyRef, msg: AnyRef, startDate: Option[Date]): Date = schedules.get(name) match {
private def scheduleInternal(name: String, receiver: AnyRef, msg: AnyRef, startDate: Option[Date]): Date = Option(schedules.get(name.toUpperCase)) match {
case Some(schedule) => scheduleJob(name, receiver, msg, startDate)(schedule)
case None => throw new IllegalArgumentException("No matching quartz configuration found for schedule '%s'".format(name))
}
Expand All @@ -397,17 +403,17 @@ class QuartzSchedulerExtension(system: ExtendedActorSystem) extends Extension {
.withDescription(schedule.description.orNull)
.build()

log.debug("Adding jobKey {} to runningJobs map.", job.getKey)

runningJobs += name -> job.getKey

log.debug("Building Trigger with startDate '{}", startDate.getOrElse(new Date()))
val trigger = schedule.buildTrigger(name, startDate)

log.debug("Scheduling Job '{}' and Trigger '{}'. Is Scheduler Running? {}", job, trigger, scheduler.isStarted)
scheduler.scheduleJob(job, trigger)
}
val firstFireTime = scheduler.scheduleJob(job, trigger)

log.debug("Adding jobKey {} to runningJobs map.", job.getKey)
runningJobs.putIfAbsent(name, job.getKey)

firstFireTime
}

/**
* Parses calendar configurations, creates Calendar instances and attaches them to the scheduler
Expand Down

0 comments on commit a754246

Please sign in to comment.