Skip to content

Commit

Permalink
Stop deployment and reschedule actors less gracefully on reload (#6314)
Browse files Browse the repository at this point in the history
  • Loading branch information
wrzontek committed Jul 12, 2024
1 parent 4b5d26c commit 9be719f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, ExternalDeploymentId}
import pl.touk.nussknacker.engine.management.FlinkConfig
import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.PeriodicProcessStatus
import pl.touk.nussknacker.engine.management.periodic.Utils.{gracefulStopActor, runSafely}
import pl.touk.nussknacker.engine.management.periodic.Utils.{createActorWithRetry, runSafely}
import pl.touk.nussknacker.engine.management.periodic.db.{DbInitializer, SlickPeriodicProcessesRepository}
import pl.touk.nussknacker.engine.management.periodic.flink.FlinkJarManager
import pl.touk.nussknacker.engine.management.periodic.service.{
AdditionalDeploymentDataProvider,
PeriodicProcessListenerFactory,
ProcessConfigEnricherFactory
}
import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment}
import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies}
import slick.jdbc
import slick.jdbc.JdbcProfile

Expand Down Expand Up @@ -60,21 +60,28 @@ object PeriodicDeploymentManager {
clock,
dependencies.actionService
)
val deploymentActor = dependencies.actorSystem.actorOf(

// These actors have to be created with retries because they can initially fail to create due to taken names,
// if the actors (with the same names) created before reload aren't fully stopped (and their names freed) yet
val deploymentActor = createActorWithRetry(
s"periodic-${periodicBatchConfig.processingType}-deployer",
DeploymentActor.props(service, periodicBatchConfig.deployInterval),
s"periodic-${periodicBatchConfig.processingType}-deployer"
dependencies.actorSystem
)
val rescheduleFinishedActor = dependencies.actorSystem.actorOf(
val rescheduleFinishedActor = createActorWithRetry(
s"periodic-${periodicBatchConfig.processingType}-rescheduler",
RescheduleFinishedActor.props(service, periodicBatchConfig.rescheduleCheckInterval),
s"periodic-${periodicBatchConfig.processingType}-rescheduler"
dependencies.actorSystem
)

val customActionsProvider = customActionsProviderFactory.create(scheduledProcessesRepository, service)

val toClose = () => {
runSafely(listener.close())
runSafely(gracefulStopActor(deploymentActor, dependencies.actorSystem))
runSafely(gracefulStopActor(rescheduleFinishedActor, dependencies.actorSystem))
// deploymentActor and rescheduleFinishedActor just call methods from PeriodicProcessService on interval,
// they don't have any internal state, so stopping them non-gracefully is safe
runSafely(dependencies.actorSystem.stop(deploymentActor))
runSafely(dependencies.actorSystem.stop(rescheduleFinishedActor))
runSafely(db.close())
}
new PeriodicDeploymentManager(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,65 +1,40 @@
package pl.touk.nussknacker.engine.management.periodic

import akka.actor.{ActorNotFound, ActorPath, ActorRef, ActorSystem}
import akka.pattern.{AskTimeoutException, gracefulStop}
import akka.actor.{ActorRef, ActorSystem, Props}
import com.typesafe.scalalogging.LazyLogging

import java.util.concurrent.TimeoutException
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.{Failure, Success, Try}

object Utils extends LazyLogging {

private val GracefulStopTimeout = 5 seconds
private val ActorResolutionTimeout = 5 seconds
private val ActorResolutionPause = 50 milliseconds
private val ActorResolutionRetries = 50
private val ActorCreationPause = 50 milliseconds
private val ActorCreationRetries = 50

def runSafely(action: => Unit): Unit = try {
action
} catch {
case t: Throwable => logger.error("Error occurred, but skipping it", t)
}

def gracefulStopActor(actorRef: ActorRef, actorSystem: ActorSystem): Unit = {
import actorSystem.dispatcher
logger.info(s"Gracefully stopping $actorRef")

val gracefulStopFuture = for {
_ <- gracefulStop(actorRef, GracefulStopTimeout)
_ <- waitUntilActorNameIsFree( // this step is necessary because gracefulStop does not guarantee that the supervisor is notified of the name being freed
actorRef.path,
actorSystem
)
} yield {}

Await.result(
gracefulStopFuture,
GracefulStopTimeout + ActorResolutionRetries * (ActorResolutionTimeout + ActorResolutionPause) + (1 second)
)

logger.info(s"Gracefully stopped $actorRef")
}

private def waitUntilActorNameIsFree(actorPath: ActorPath, actorSystem: ActorSystem)(implicit e: ExecutionContext) = {
retry
.Pause(ActorResolutionRetries, ActorResolutionPause)
def createActorWithRetry(actorName: String, props: Props, actorSystem: ActorSystem)(
implicit ec: ExecutionContext
): ActorRef = {
val actorRefFuture = retry
.Pause(ActorCreationRetries, ActorCreationPause)
.apply { () =>
val actorResolutionFuture =
actorSystem
.actorSelection(actorPath)
.resolveOne(ActorResolutionTimeout)
.map(_ => Left(s"Actor path $actorPath is still taken"))

actorResolutionFuture.recover { case _: ActorNotFound =>
Right(s"Actor path $actorPath is free")
Future {
Try(actorSystem.actorOf(props, actorName))
}
}
.map {
case Left(_) => throw new IllegalStateException(s"Failed to free actor path $actorPath within allowed retries")
case Right(_) => ()
case Failure(ex) =>
throw new IllegalStateException(s"Failed to create actor '$actorName' within allowed retries: $ex")
case Success(a) => a
}

Await.result(actorRefFuture, ActorCreationRetries * ActorCreationPause + (1 second))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,104 @@ package pl.touk.nussknacker.engine.management.periodic

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestKit
import com.typesafe.scalalogging.LazyLogging
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import pl.touk.nussknacker.engine.management.periodic.Utils.createActorWithRetry

class UtilsSpec extends TestKit(ActorSystem("UtilsSpec")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll {
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

class UtilsSpec
extends TestKit(ActorSystem("UtilsSpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterAll
with LazyLogging {

override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

class TestActor extends Actor {
override def receive: Receive = { case _ => () }
}

"Utils" should {

"gracefully stop actor and free actor path" in {
class TestActor extends Actor {
override def receive: Receive = { case _ =>
()
"create an actor if it's name is free" in {
import system.dispatcher

val actorName = "actorName1" // unique name in each test so that they don't interfere with each other

createActorWithRetry(actorName, Props(new TestActor), system)
}

"create an actor if it's name isn't free but is freed before retrying gives up - idle actor" in {
import system.dispatcher

val actorName = "actorName2" // unique name in each test so that they don't interfere with each other

val actorRef = createActorWithRetry(actorName, Props(new TestActor), system)

val futureA = Future {
createActorWithRetry(actorName, Props(new TestActor), system)
}

val futureB = Future {
Thread.sleep(1000)
system.stop(actorRef)
}

Await.result(Future.sequence(Seq(futureA, futureB)), Duration.Inf)
}

"create an actor if it's name isn't free but is freed before retrying gives up - busy actor" in {
class BusyTestActor extends Actor {
override def receive: Receive = { case msg =>
logger.info(s"Sleeping on the job '$msg' ...")
Thread.sleep(1000)
}
}
val actorName = "actorName"

val actorRef = system.actorOf(Props(new TestActor), actorName)
import system.dispatcher

val actorName = "actorName3" // unique name in each test so that they don't interfere with each other

val actorRef = createActorWithRetry(actorName, Props(new BusyTestActor), system)

var messageCounter = 0
while (messageCounter < 1000) {
actorRef ! s"message number $messageCounter"
messageCounter += 1
}

val futureA = Future {
createActorWithRetry(actorName, Props(new BusyTestActor), system)
}

val futureB = Future {
Thread.sleep(1000)

// if this was gracefulStop, it would take too long to stop the actor, as it would continue processing it's messages
system.stop(actorRef)
}

Await.result(Future.sequence(Seq(futureA, futureB)), Duration.Inf)
}

"fail to create an actor if it's name isn't freed" in {
import system.dispatcher

val actorName = "actorName4" // unique name in each test so that they don't interfere with each other

createActorWithRetry(actorName, Props(new TestActor), system)

Utils.gracefulStopActor(actorRef, system)
(the[IllegalStateException] thrownBy {
createActorWithRetry(actorName, Props(new TestActor), system)
}).getMessage shouldEqual s"Failed to create actor '$actorName' within allowed retries: akka.actor.InvalidActorNameException: actor name [$actorName] is not unique!"

// with normal system.stop(actorRef) or akka.pattern.gracefulStop this throws "actor name is not unique"
system.actorOf(Props(new TestActor), actorName)
}

"ignore exceptions inside runSafely block" in {
Expand Down

0 comments on commit 9be719f

Please sign in to comment.