Skip to content

Commit

Permalink
Fixes #22552: General improvements on Directive compliance API
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceMacBuche committed Mar 29, 2023
1 parent bb4928c commit 2efc650
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,20 @@ object Doobie {
implicit val DateTimeMeta: Meta[DateTime] =
Meta[java.sql.Timestamp].imap(ts => new DateTime(ts.getTime()))(dt => new java.sql.Timestamp(dt.getMillis))

implicit val ReadRuleId: Read[RuleId] = {
Read[String].map(r => {
implicit val ReadRuleId: Get[RuleId] = {
Get[String].map(r => {
RuleId.parse(r) match {
case Right(rid) => rid
case Left(err) =>
throw new IllegalArgumentException(s"Error when unserializing a report from base: Can not parse rule ID: ${r}: ${err}.")
}
})
}
implicit val WriteRuleId: Write[RuleId] = {
Write[String].contramap(_.serialize)
implicit val PutRuleId: Put[RuleId] = {
Put[String].contramap(_.serialize)
}
implicit val ReadDirectiveId: Read[DirectiveId] = {
Read[String].map(r => {
implicit val ReadDirectiveId: Get[DirectiveId] = {
Get[String].map(r => {
DirectiveId.parse(r) match {
case Right(rid) => rid
case Left(err) =>
Expand All @@ -193,8 +193,8 @@ object Doobie {
}
})
}
implicit val WriteDirectiveId: Write[DirectiveId] = {
Write[String].contramap(_.serialize)
implicit val WriteDirectiveId: Put[DirectiveId] = {
Put[String].contramap(_.serialize)
}

implicit val ReportRead: Read[Reports] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ object NodeStatusReport {
nodeStatusReport.reports.filter(r => ruleIds.contains(r.ruleId))
)
}

def filterByDirectives(nodeStatusReport: NodeStatusReport, directiveIds: Set[DirectiveId]): NodeStatusReport = {

new NodeStatusReport(
nodeStatusReport.nodeId,
nodeStatusReport.runInfo,
nodeStatusReport.statusInfo,
nodeStatusReport.overrides,
nodeStatusReport.reports.flatMap { r =>
val filterRule = r.copy(directives = r.directives.filter(d => directiveIds.contains(d._1)))
if (filterRule.directives.isEmpty) None else Some(filterRule)
}
)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package com.normation.rudder.repository
import com.normation.box._
import com.normation.errors._
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports._
import net.liftweb.common.Box
Expand Down Expand Up @@ -109,6 +110,13 @@ trait FindExpectedReportRepository {
*/
def findCurrentNodeIdsForRule(ruleId: RuleId, nodeIds: Set[NodeId]): IOResult[Set[NodeId]]

/**
* Return node ids associated to the rule (based on expectedreports (the one still pending)) for this Rule,
* only limited on the nodeIds in parameter (used when cache is incomplete)
*/
def findCurrentNodeIdsForDirective(ruleId: DirectiveId, nodeIds: Set[NodeId]): IOResult[Set[NodeId]]
def findCurrentNodeIdsForDirective(ruleId: DirectiveId): IOResult[Set[NodeId]]

/*
* Retrieve the expected reports by config version of the nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package com.normation.rudder.repository

import com.normation.errors.IOResult
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports._
import com.normation.rudder.reports.execution.AgentRun
Expand Down Expand Up @@ -72,7 +73,11 @@ trait ReportsRepository {
* That method doesn't check if there is missing execution in
* the result compared to inputs.
*/
def getExecutionReports(runs: Set[AgentRunId], filterByRules: Set[RuleId]): Box[Map[NodeId, Seq[Reports]]]
def getExecutionReports(
runs: Set[AgentRunId],
filterByRules: Set[RuleId],
filterByDirectives: Set[DirectiveId]
): Box[Map[NodeId, Seq[Reports]]]

/**
* Returns all reports for the node, between the two differents date (optionnal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,30 @@ class FindExpectedReportsJdbcRepository(
}
}

/**
* Return node ids associated to the directive (based on expectedreports (the one still pending)) for this Directive,
* only limited on the nodeIds in parameter (used when cache is incomplete)
*/
override def findCurrentNodeIdsForDirective(directiveId: DirectiveId, nodeIds: Set[NodeId]): IOResult[Set[NodeId]] = {
if (nodeIds.isEmpty) Set.empty[NodeId].succeed
else {
transactIOResult(s"Error when getting nodes for directive '${directiveId.serialize}' from expected reports")(xa => sql"""
select distinct nodeid from nodeconfigurations
where enddate is null and configuration like ${"%" + directiveId.serialize + "%"}
and nodeid in (${nodeIds.map(id => s"'${id}'").mkString(",")})
""".query[NodeId].to[Set].transact(xa))
}
}
override def findCurrentNodeIdsForDirective(directiveId: DirectiveId): IOResult[Set[NodeId]] = {
transactIOResult(s"Error when getting nodes for directive '${directiveId.serialize}' from expected reports")(xa => sql"""
select distinct nodeid from nodeconfigurations
where enddate is null and configuration like ${"%" + directiveId.serialize + "%"}
""".query[NodeId].to[Set].transact(xa))
}
/*
* Retrieve the list of node config ids
*/
override def getNodeConfigIdInfos(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[Vector[NodeConfigIdInfo]]]] = {
override def getNodeConfigIdInfos(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[Vector[NodeConfigIdInfo]]]] = {
if (nodeIds.isEmpty) Full(Map.empty[NodeId, Option[Vector[NodeConfigIdInfo]]])
else {
val batchedNodesId = nodeIds.grouped(jdbcMaxBatchSize).toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import com.normation.errors.IOResult
import com.normation.inventory.domain.NodeId
import com.normation.rudder.db.Doobie
import com.normation.rudder.db.Doobie._
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports._
import com.normation.rudder.domain.reports.Reports
Expand Down Expand Up @@ -77,26 +78,23 @@ class ReportsJdbcRepository(doobie: Doobie) extends ReportsRepository with Logga
private[this] val idQuery = þ(s"select id, ${common_reports_column} from ruddersysevents where 1=1 ")

// We assume that this method is called with a limited list of runs
override def getExecutionReports(runs: Set[AgentRunId], filterByRules: Set[RuleId]): Box[Map[NodeId, Seq[Reports]]] = {
if (runs.isEmpty) Full(Map())
else {
val ruleClause = {
if (filterByRules.isEmpty) ""
else s"and ruleid in ${filterByRules.map(_.serialize).mkString("('", "','", "')")}"
}
override def getExecutionReports(
runs: Set[AgentRunId],
filterByRules: Set[RuleId],
filterByDirectives: Set[DirectiveId]
): Box[Map[NodeId, Seq[Reports]]] = {
runs.map(n => (n.nodeId.value, n.date)).toList.toNel match {
case None => Full(Map())
case Some(nodeValues) =>
val ruleClause = filterByRules.toList.toNel.map(r => Fragments.in(fr"ruleid", r))
val directiveClause = filterByDirectives.toList.toNel.map(r => Fragments.in(fr"directiveid", r))
val values = Fragments.in(fr"(nodeid, executiontimestamp)", nodeValues)
val where = Fragments.whereAndOpt(Some(values), ruleClause, directiveClause)

val nodeParam = runs.map(x => s"('${x.nodeId.value}','${new Timestamp(x.date.getMillis)}'::timestamp)").mkString(",")
/*
* be careful in the number of parenthesis for "in values", it is:
* ... in (VALUES ('a', 'b') );
* ... in (VALUES ('a', 'b'), ('c', 'd') );
* etc. No more, no less.
*/
transactRunBox(xa => query[Reports](s"""select ${common_reports_column}
from RudderSysEvents
where (nodeid, executiontimestamp) in (VALUES ${nodeParam})
""" + ruleClause).to[Vector].transact(xa)).map(_.groupBy(_.nodeId)) ?~!
s"Error when trying to get last run reports for ${runs.size} nodes"
val q =
sql"select executiondate, ruleid, directiveid, nodeid, reportid, component, keyvalue, executiontimestamp, eventtype, msg from RudderSysEvents " ++ where
transactRunBox(xa => q.query[Reports].to[Vector].transact(xa)).map(_.groupBy(_.nodeId)) ?~!
s"Error when trying to get last run reports for ${runs.size} nodes"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ package com.normation.rudder.services.reports
import com.normation.errors._
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.logger.ReportLoggerPure
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports.NodeAndConfigId
import com.normation.rudder.domain.reports.NodeExpectedReports
Expand Down Expand Up @@ -78,7 +79,8 @@ trait NodeConfigurationService {
* get the nodes applying the rule
*
*/
def findNodesApplyingRule(ruleId: RuleId): IOResult[Set[NodeId]]
def findNodesApplyingRule(ruleId: RuleId): IOResult[Set[NodeId]]
def findNodesApplyingDirective(directiveId: DirectiveId): IOResult[Set[NodeId]]
}

trait NewExpectedReportsAvailableHook {
Expand Down Expand Up @@ -281,6 +283,32 @@ class CachedNodeConfigurationService(
}
}

/**
* get the nodes applying the rule
*
*/
def findNodesApplyingDirective(directiveId: DirectiveId): IOResult[Set[NodeId]] = {
// this don't need to be in a semaphore, since it's only one atomic cache read
for {
nodeConfs <- cache.get
nodesNotInCache = nodeConfs.collect { case (k, value) if (value.isEmpty) => k }.toSet
dataFromCache = {
nodeConfs.collect {
case (k, Some(nodeExpectedReports))
if (nodeExpectedReports.ruleExpectedReports.flatMap(_.directives.map(_.directiveId)).contains(directiveId)) =>
k
}.toSet
}
fromRepo <- if (nodesNotInCache.isEmpty) {
Set.empty[NodeId].succeed
} else { // query the repo
confExpectedRepo.findCurrentNodeIdsForDirective(directiveId, nodesNotInCache)
}
} yield {
dataFromCache ++ fromRepo
}
}

/**
* retrieve expected reports by config version
*/
Expand Down Expand Up @@ -346,4 +374,13 @@ class NodeConfigurationServiceImpl(
def findNodesApplyingRule(ruleId: RuleId): IOResult[Set[NodeId]] = {
confExpectedRepo.findCurrentNodeIds(ruleId).toIO
}

/**
* get the nodes applying the rule
*
*/
def findNodesApplyingDirective(directiveId: DirectiveId): IOResult[Set[NodeId]] = {
confExpectedRepo.findCurrentNodeIdsForDirective(directiveId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ package com.normation.rudder.services.reports
import com.normation.errors.IOResult
import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.logger.TimingDebugLogger
import com.normation.rudder.domain.policies.DirectiveId
import com.normation.rudder.domain.policies.RuleId
import com.normation.rudder.domain.reports.ComplianceLevel
import com.normation.rudder.domain.reports.NodeStatusReport
Expand All @@ -57,6 +58,10 @@ trait ReportingService {
* find node status reports for all rules)
*/
def findRuleNodeStatusReports(nodeIds: Set[NodeId], filterByRules: Set[RuleId]): Box[Map[NodeId, NodeStatusReport]]
def findDirectiveNodeStatusReports(
nodeIds: Set[NodeId],
filterByDirectives: Set[DirectiveId]
): Box[Map[NodeId, NodeStatusReport]]

def findUncomputedNodeStatusReports(): Box[Map[NodeId, NodeStatusReport]]

Expand Down Expand Up @@ -103,6 +108,8 @@ trait ReportingService {
*/
def getUserNodeStatusReports(): Box[Map[NodeId, NodeStatusReport]]

def findStatusReportsForDirective(directiveId: DirectiveId): IOResult[Map[NodeId, NodeStatusReport]]

/**
* find node status reports for user and system rules but in a separated couple (system is first element, user second)
*/
Expand Down Expand Up @@ -145,6 +152,25 @@ trait ReportingService {
}.toMap
}

def filterReportsByDirectives(
reports: Map[NodeId, NodeStatusReport],
directiveIds: Set[DirectiveId]
): Map[NodeId, NodeStatusReport] = {
if (directiveIds.isEmpty) {
reports
} else
{
val n1 = System.currentTimeMillis
val result = reports.view.mapValues {
case status =>
NodeStatusReport.filterByDirectives(status, directiveIds)
}.filter { case (_, v) => v.reports.nonEmpty || v.overrides.nonEmpty }
val n2 = System.currentTimeMillis
TimingDebugLogger.trace(s"Filter Node Status Reports on ${directiveIds.size} Directives in : ${n2 - n1}ms")
result
}.toMap
}

def complianceByRules(report: NodeStatusReport, ruleIds: Set[RuleId]): ComplianceLevel = {
if (ruleIds.isEmpty) {
report.compliance
Expand All @@ -158,4 +184,22 @@ trait ReportingService {
})
}
}

def complianceByRulesByDirectives(
report: NodeStatusReport,
ruleIds: Set[RuleId],
directiveIds: Set[DirectiveId]
): ComplianceLevel = {
if (ruleIds.isEmpty) {
report.compliance
} else {
// compute compliance only for the selected rules
// BE CAREFUL: reports is a SET - and it's likely that
// some compliance will be equals. So change to seq.
ComplianceLevel.sum(report.reports.toSeq.collect {
case report if (ruleIds.contains(report.ruleId)) =>
report.compliance
})
}
}
}
Loading

0 comments on commit 2efc650

Please sign in to comment.