Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #20462: Delay policy generation until rudder app is fully boot #4066

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,30 @@ import net.liftweb.actor._
import org.joda.time._
import com.normation.inventory.domain.NodeId
import com.normation.rudder.services.policies.PromiseGenerationService

import net.liftweb.http.ListenerManager
import com.normation.eventlog.{EventActor, EventLog}
import com.normation.rudder.domain.eventlog._
import com.normation.rudder.services.marshalling.DeploymentStatusSerialisation
import com.normation.rudder.services.eventlog.EventLogDeploymentService

import net.liftweb.common._
import com.normation.eventlog.EventLogDetails

import scala.xml.NodeSeq
import com.normation.eventlog.ModificationId

import com.normation.errors.RudderError
import com.normation.errors.Unexpected
import com.normation.rudder.domain.logger.PolicyGenerationLogger
import com.normation.zio._
import com.normation.rudder.domain.logger.PolicyGenerationLoggerPure

import com.normation.zio._
import scala.concurrent.duration.Duration

import zio._
import zio.syntax._
import com.normation.errors._

sealed trait StartDeploymentMessage

//ask for a new deployment - automatic deployment !
Expand Down Expand Up @@ -144,6 +150,7 @@ final class AsyncDeploymentActor(
, deploymentStatusSerialisation: DeploymentStatusSerialisation
, getGenerationDelay : () => IOResult[Duration]
, deploymentPolicy : () => IOResult[PolicyGenerationTrigger]
, bootGuard : Promise[Nothing, Unit]
) extends LiftActor with ListenerManager with AsyncDeploymentAgent {

deploymentManager =>
Expand Down Expand Up @@ -221,7 +228,8 @@ final class AsyncDeploymentActor(
override protected def lowPriority = {

//
// Start a new deployment
// Start a new deployment. Some triggers can be inhibited with
// the `rudder_generation_trigger` (all, none, onlymanual) setting.
//
case AutomaticStartDeployment(modId, actor) => {
implicit val a = actor
Expand Down Expand Up @@ -381,39 +389,43 @@ final class AsyncDeploymentActor(
*/
private[this] object DeployerAgent extends LiftActor {

def doDeployement(bootSemaphore: Promise[Nothing, Unit], delay: IOResult[Duration], nd: NewDeployment): UIO[Unit] = {
val prog = for {
_ <- PolicyGenerationLoggerPure.manager.trace("Policy updater Agent: start to update policies")
d <- delay
_ <- ZIO.when(d.toMillis > 0) {
PolicyGenerationLoggerPure.manager.debug(s"Policy generation will start in ${delay.toString}")
}
_ <- (for {
_ <- PolicyGenerationLoggerPure.manager.debug(s"Policy generation starts now!")
res <- deploymentService.deploy().toIO.foldM(
err => PolicyGenerationLoggerPure.manager.error(s"Error when updating policy, reason was: ${err.fullMsg}") *>
Failure(err.fullMsg).succeed
, ok => Full(ok).succeed
)
_ <- IOResult.effect(deploymentManager ! DeploymentResult(nd.id, nd.modId, nd.started, DateTime.now, res, nd.actor, nd.eventLogId))
} yield ()).delay(zio.duration.Duration.fromScala(d)).provide(ZioRuntime.environment)
} yield ()

val managedErr = prog.catchAll { err =>
val failure = Failure(s"Exception caught during policy update process: ${err.fullMsg}")
IOResult.effect(
deploymentManager ! DeploymentResult(nd.id, nd.modId, nd.started, DateTime.now, failure, nd.actor, nd.eventLogId)
).catchAll(fatal =>
PolicyGenerationLoggerPure.manager.error(s"Fatal error when trying to send previous error to policy generation manager. First error was: ${err.fullMsg}. Fatal error is: ${fatal.fullMsg}")
)
}

bootSemaphore.await *> managedErr
}

override protected def messageHandler = {
//
// Start a new deployment
// Start a new deployment. Wait for the guard to be released (in `Boot.boot`).
// Each generation can be delayed by a given duration with the `rudder_generation_delay` setting.
//
case NewDeployment(id, modId, startTime, actor, eventId) =>
PolicyGenerationLogger.manager.trace("Policy updater Agent: start to update policies")
try {
val delay = getGenerationDelay().either.runNow match {
case Right(delay) => delay
case Left(_) => Duration("0s")
}
if (delay.toMillis != 0L) {
PolicyGenerationLogger.debug(s"Policy generation will start in ${delay.toString}")
}
Thread.sleep(delay.toMillis)
PolicyGenerationLogger.debug(s"Policy generation starts now!")

val result = deploymentService.deploy()

result match {
case Full(_) => // nothing to report
case m: Failure =>
PolicyGenerationLogger.manager.error(s"Error when updating policy, reason was: ${m.messageChain}")
m.rootExceptionCause.foreach { ex =>
PolicyGenerationLogger.manager.error(s"Root exception was: ${RudderError.formatException(ex)}")
}
case Empty => PolicyGenerationLogger.manager.error("Error when updating policy (no reason given)")
}

deploymentManager ! DeploymentResult(id, modId, startTime,DateTime.now, result, actor, eventId)
} catch {
case e:Exception => deploymentManager ! DeploymentResult(id, modId, startTime, DateTime.now, Failure(s"Exception caught during policy update process: ${e.getMessage}",Full(e), Empty), actor, eventId)
}
case nd:NewDeployment =>
doDeployement(bootGuard, getGenerationDelay(),nd).runNow

//
//Unexpected messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ object PolicyGenerationLogger extends Logger {
object PolicyGenerationLoggerPure extends NamedZioLogger {
override def loggerName = "policy.generation"

// async deployment agent
object manager extends NamedZioLogger {
override def loggerName = "policy.generation.manager"
}

object expectedReports extends NamedZioLogger {
override def loggerName = "policy.generation.expected_reports"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ final case class FullNodeGroupCategory(
case FullOtherTarget(t) => t match {
case AllTarget => true
case AllTargetExceptPolicyServers => !node.isPolicyServer
case AllPolicyServers => node.isPolicyServer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file should not change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it removes a warning missing pattern match (and the corresponding runtime exception) for a previous pr regarding migration.
I'm adding a comment in head.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

case PolicyServerTarget(id) => id == node.id
}
} }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ import com.normation.rudder.ncf.TechniqueReader
import com.normation.rudder.ncf.TechniqueSerializer
import com.normation.rudder.ncf.TechniqueWriter
import com.normation.rudder.services.policies.RuleApplicationStatusServiceImpl
import com.normation.zio._

/*
* This file provides all the necessary plumbing to allow test REST API.
Expand Down Expand Up @@ -308,7 +309,11 @@ class RestTestSetUp {
override def runFailureHooks(generationTime: DateTime, endTime: DateTime, systemEnv: HookEnvPairs, errorMessage: String, errorMessagePath: String): Box[Unit] = ???
override def invalidateComplianceCache(actions: Seq[(NodeId, CacheExpectedReportAction)]): IOResult[Unit] = ???
}
val asyncDeploymentAgent = new AsyncDeploymentActor(policyGeneration, eventLogger, deploymentStatusSerialisation, () => Duration("0s").succeed, () => AllGeneration.succeed)
val bootGuard = (for {
p <- Promise.make[Nothing, Unit]
_ <- p.succeed(())
} yield p).runNow
val asyncDeploymentAgent = new AsyncDeploymentActor(policyGeneration, eventLogger, deploymentStatusSerialisation, () => Duration("0s").succeed, () => AllGeneration.succeed, bootGuard)

val findDependencies = new FindDependencies { //never find any dependencies
override def findRulesForDirective(id: DirectiveUid): IOResult[Seq[Rule]] = Nil.succeed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ class Boot extends Loggable {
ApplicationLogger.error(s"Error when trying to save the EventLog for application start: ${err.fullMsg}")
case Right(_) => ApplicationLogger.info("Application Rudder started")
}

// release guard for promise generation awaiting end of boot
RudderConfig.policyGenerationBootGuard.succeed(()).runNow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would never have thought to put it there, i was more thinking to put it at the end of bootcheck. It is without doubt better the way you did


}

// Run a health check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2066,13 +2066,17 @@ object RudderConfig extends Loggable {
, POSTGRESQL_IS_LOCAL
)}


lazy val policyGenerationBootGuard = zio.Promise.make[Nothing, Unit].runNow

private[this] lazy val asyncDeploymentAgentImpl: AsyncDeploymentActor = {
val agent = new AsyncDeploymentActor(
deploymentService
, eventLogDeploymentServiceImpl
, deploymentStatusSerialisation
, () => configService.rudder_generation_delay()
, () => configService.rudder_generation_trigger()
, policyGenerationBootGuard
)
techniqueRepositoryImpl.registerCallback(
new DeployOnTechniqueCallback("DeployOnPTLibUpdate", 1000, agent)
Expand Down