Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #25092: Refactor CachedReportingService to make persistance simpler #5757

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ object ComplianceLogger extends Logger {
override protected def _logger: slf4j.Logger = LoggerFactory.getLogger("compliance")
}

object ComplianceLoggerPure extends NamedZioLogger {
override def loggerName: String = "compliance"
}

object ReportLogger extends Logger {
override protected def _logger: slf4j.Logger = LoggerFactory.getLogger("report")

Expand All @@ -267,12 +271,15 @@ object FactQueryProcessorLoggerPure extends NamedZioLogger {
object ReportLoggerPure extends NamedZioLogger {
override def loggerName: String = "report"

object Changes extends NamedZioLogger {
object Changes extends NamedZioLogger {
override def loggerName: String = "report.changes"
}
object Cache extends NamedZioLogger {
object Cache extends NamedZioLogger {
override def loggerName: String = "report.cache"
}
object Repository extends NamedZioLogger {
override def loggerName: String = "report.repository"
}
}

object GitRepositoryLogger extends NamedZioLogger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ class GenerationOnChange(
* Callback related to cache invalidation when a node changes
*/
class CacheInvalidateNodeFactEventCallback(
cacheExpectedReports: InvalidateCache[CacheExpectedReportAction],
cacheConfiguration: InvalidateCache[CacheComplianceQueueAction],
cacheToClear: List[CachedRepository]
cacheExpectedReports: InvalidateCache[CacheExpectedReportAction],
cacheNodeStatusReports: InvalidateCache[CacheComplianceQueueAction],
cacheToClear: List[CachedRepository]
) extends NodeFactChangeEventCallback {

import com.normation.rudder.services.reports.CacheExpectedReportAction.*
Expand All @@ -170,7 +170,7 @@ class CacheInvalidateNodeFactEventCallback(
// ping the NodeConfiguration Cache and NodeCompliance Cache about this new node
val i = InsertNodeInCache(node.id)
for {
_ <- cacheConfiguration
_ <- cacheNodeStatusReports
.invalidateWithAction(Seq((node.id, CacheComplianceQueueAction.ExpectedReportAction(i))))
.chainError(s"Error when adding node ${node.id.value} to node configuration cache")
_ <- cacheExpectedReports
Expand All @@ -187,7 +187,7 @@ class CacheInvalidateNodeFactEventCallback(
for {
_ <- NodeLoggerPure.Delete.debug(s" - remove node ${node.id.value} from compliance and expected report cache")
_ <-
cacheConfiguration
cacheNodeStatusReports
.invalidateWithAction(Seq((node.id, CacheComplianceQueueAction.ExpectedReportAction(a))))
.catchAll(err => {
NodeLoggerPure.Delete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ import com.normation.rudder.repository.ReportsRepository
import com.normation.rudder.score.ComplianceScoreEvent
import com.normation.rudder.score.ScoreServiceManager
import com.normation.rudder.services.reports.CacheComplianceQueueAction.UpdateCompliance
import com.normation.rudder.services.reports.CachedFindRuleNodeStatusReports
import com.normation.rudder.services.reports.CachedNodeChangesServiceImpl
import com.normation.rudder.services.reports.ComputeNodeStatusReportService
import com.normation.rudder.services.reports.FindNewNodeStatusReports
import net.liftweb.common.*
import org.joda.time.DateTime
import org.joda.time.format.PeriodFormat
Expand All @@ -65,12 +66,13 @@ final case class InvalidateComplianceCacheMsg(updatedNodeIds: Set[NodeId])
* interval of same criticity together.
*/
class ReportsExecutionService(
reportsRepository: ReportsRepository,
statusUpdateRepository: LastProcessedReportRepository,
cachedChanges: CachedNodeChangesServiceImpl,
cachedCompliance: CachedFindRuleNodeStatusReports,
complianceRepos: ComplianceRepository,
scoreServiceManager: ScoreServiceManager
reportsRepository: ReportsRepository,
statusUpdateRepository: LastProcessedReportRepository,
cachedChanges: CachedNodeChangesServiceImpl,
computeNodeStatusReportService: ComputeNodeStatusReportService,
findNewNodeStatusReports: FindNewNodeStatusReports,
complianceRepos: ComplianceRepository,
scoreServiceManager: ScoreServiceManager
) {

val logger = ReportLogger
Expand Down Expand Up @@ -158,18 +160,17 @@ class ReportsExecutionService(

val startCompliance = System.currentTimeMillis
for {
nodeWithCompliances <- cachedCompliance.findUncomputedNodeStatusReports()
_ <- cachedCompliance.invalidateWithAction(nodeWithCompliances.map {
nodeWithCompliances <- findNewNodeStatusReports.findUncomputedNodeStatusReports()
_ <- computeNodeStatusReportService.invalidateWithAction(nodeWithCompliances.map {
case (nodeid, compliance) => (nodeid, UpdateCompliance(nodeid, compliance))
}.toSeq)
_ <- ReportLoggerPure.Cache.debug(
s"Invalidated and updated compliance for nodes ${nodeWithCompliances.map(_._1.value).mkString(",")}"
)
_ <- complianceRepos.saveRunCompliance(nodeWithCompliances.values.toList) // unsure if here or in the queue
_ <- ZIO.foreachDiscard(nodeWithCompliances) { case (id, r) => updateScore(id, r) }

_ <- cachedCompliance.outDatedCompliance()
_ <-
_ <- computeNodeStatusReportService.outDatedCompliance()
_ <-
ReportLoggerPure.Cache.debug(
s"Computing compliance in : ${PeriodFormat.getDefault().print(Duration.millis(System.currentTimeMillis - startCompliance).toPeriod())}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ package com.normation.rudder.repository

import com.normation.errors.IOResult
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports.*
import com.normation.rudder.reports.execution.AgentRun
Expand Down Expand Up @@ -74,9 +73,7 @@ trait ReportsRepository {
* the result compared to inputs.
*/
def getExecutionReports(
runs: Set[AgentRunId],
filterByRules: Set[RuleId],
filterByDirectives: Set[DirectiveId]
runs: Set[AgentRunId]
): IOResult[Map[NodeId, Seq[Reports]]]

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import com.normation.errors.IOResult
import com.normation.inventory.domain.NodeId
import com.normation.rudder.db.Doobie
import com.normation.rudder.db.Doobie.*
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports.*
import com.normation.rudder.domain.reports.Reports
Expand Down Expand Up @@ -79,17 +78,13 @@ class ReportsJdbcRepository(doobie: Doobie) extends ReportsRepository with Logga

// We assume that this method is called with a limited list of runs
override def getExecutionReports(
runs: Set[AgentRunId],
filterByRules: Set[RuleId],
filterByDirectives: Set[DirectiveId]
runs: Set[AgentRunId]
): IOResult[Map[NodeId, Seq[Reports]]] = {
runs.map(n => (n.nodeId.value, n.date)).toList.toNel match {
case None => Map().succeed
case Some(nodeValues) =>
val ruleClause = filterByRules.toList.toNel.map(r => Fragments.in(fr"ruleid", r))
val directiveClause = filterByDirectives.toList.toNel.map(r => Fragments.in(fr"directiveid", r))
val values = Fragments.in(fr"(nodeid, executiontimestamp)", nodeValues)
val where = Fragments.whereAndOpt(Some(values), ruleClause, directiveClause)
val values = Fragments.in(fr"(nodeid, executiontimestamp)", nodeValues)
val where = Fragments.whereAnd(values)

val q =
sql"select executiondate, ruleid, directiveid, nodeid, reportid, component, keyvalue, executiontimestamp, eventtype, msg from RudderSysEvents " ++ where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ import com.normation.rudder.services.policies.nodeconfig.FileBasedNodeConfigurat
import com.normation.rudder.services.policies.nodeconfig.NodeConfigurationHash
import com.normation.rudder.services.policies.nodeconfig.NodeConfigurationHashRepository
import com.normation.rudder.services.policies.write.PolicyWriterService
import com.normation.rudder.services.reports.CachedFindRuleNodeStatusReports
import com.normation.rudder.services.reports.CachedNodeConfigurationService
import com.normation.rudder.services.reports.CacheExpectedReportAction
import com.normation.rudder.services.reports.FindNewNodeStatusReports
import com.normation.rudder.utils.ParseMaxParallelism
import com.normation.utils.Control.*
import com.softwaremill.quicklens.*
Expand Down Expand Up @@ -827,7 +827,7 @@ class PromiseGenerationServiceImpl(
override val interpolatedValueCompiler: InterpolatedValueCompiler,
override val complianceModeService: ComplianceModeService,
override val agentRunService: AgentRunIntervalService,
override val complianceCache: CachedFindRuleNodeStatusReports,
override val complianceCache: FindNewNodeStatusReports,
override val promisesFileWriterService: PolicyWriterService,
override val writeNodeCertificatesPem: WriteNodeCertificatesPem,
override val cachedNodeConfigurationService: CachedNodeConfigurationService,
Expand Down Expand Up @@ -1649,7 +1649,7 @@ trait PromiseGeneration_updateAndWriteRule extends PromiseGenerationService {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

trait PromiseGeneration_setExpectedReports extends PromiseGenerationService {
def complianceCache: CachedFindRuleNodeStatusReports
def complianceCache: FindNewNodeStatusReports
def confExpectedRepo: UpdateExpectedReportsRepository
def cachedNodeConfigurationService: CachedNodeConfigurationService

Expand Down
Loading