Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
fanf committed Mar 29, 2018
1 parent fd5b98f commit f12ddd4
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class UpdateDynamicGroups(
dynGroupService.getAllDynGroups match {
case Full(groupIds) =>
updateId = updateId + 1
LAUpdateDyngroup ! StartDynamicUpdate(updateId, ModificationId(uuidGen.newUuid), DateTime.now, GroupsToUpdate(groupIds))
LAUpdateDyngroup ! StartDynamicUpdate(updateId, ModificationId(uuidGen.newUuid), DateTime.now, GroupsToUpdate(groupIds.map(_.id)))
case e:EmptyBox =>
val error = (e?~! "Error when trying to get the list of dynamic group to update")
logger.error( error.messageChain )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,18 @@ object ScheduledJobLogger extends Logger {
override protected def _logger = LoggerFactory.getLogger("scheduledJob")
}


/**
* A logger for new nodes informations
*/
object NodeLogger extends Logger {
override protected def _logger = LoggerFactory.getLogger("nodes")
object PendingNode extends Logger {
// the logger for information about pending nodes (accept/refuse)
override protected def _logger = LoggerFactory.getLogger("nodes.pending")
// the logger for info about what policies will be applied to the new node
object Policies extends Logger {
override protected def _logger = LoggerFactory.getLogger("nodes.pending.policies")
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@
package com.normation.rudder.services.queries

import com.normation.inventory.domain.NodeId
import com.normation.rudder.domain.nodes.{NodeGroup,NodeGroupId}
import com.normation.rudder.domain.nodes.{NodeGroup, NodeGroupId}
import com.normation.ldap.sdk._
import BuildFilter._
import com.normation.rudder.domain.{RudderDit,RudderLDAPConstants}
import com.normation.rudder.domain.{RudderDit, RudderLDAPConstants}
import RudderLDAPConstants._
import com.normation.utils.Control.sequence
import com.normation.inventory.ldap.core.LDAPConstants
import com.normation.rudder.domain.logger.NodeLogger
import com.normation.rudder.domain.queries.CriterionLine
import com.normation.rudder.domain.queries.Equals
import com.normation.rudder.domain.queries.Query
import com.normation.rudder.repository.ldap.LDAPEntityMapper
import net.liftweb.common._

Expand All @@ -58,25 +62,14 @@ trait DynGroupService {
/**
* Retrieve the list of all dynamic groups.
*/
def getAllDynGroups() : Box[Seq[NodeGroupId]]

/**
* For each node in the list, find
* the list of dynamic group they belongs to.
*
* A node ID which does not belong to any dyn group
* won't be in the resulting map.
*/
def findDynGroups(nodeIds:Seq[NodeId]) : Box[Map[NodeId,Seq[NodeGroupId]]]

def getAllDynGroups(): Box[Seq[NodeGroup]]
}

class DynGroupServiceImpl(
rudderDit: RudderDit,
ldap:LDAPConnectionProvider[RoLDAPConnection],
mapper:LDAPEntityMapper,
queryChecker: PendingNodesLDAPQueryChecker
) extends DynGroupService with Loggable {
) extends DynGroupService {

/**
* Get all dyn groups
Expand All @@ -94,7 +87,7 @@ class DynGroupServiceImpl(
*/
private[this] def dynGroupAttrs = (LDAPConstants.OC(OC_RUDDER_NODE_GROUP).attributes - LDAPConstants.A_NODE_UUID).toSeq

override def getAllDynGroups() : Box[Seq[NodeGroupId]] = {
override def getAllDynGroups() : Box[Seq[NodeGroup]] = {
for {
con <- ldap
dyngroupIds <- sequence(con.searchSub(rudderDit.GROUP.dn, dynGroupFilter, dynGroupAttrs:_*)) { entry =>
Expand All @@ -105,57 +98,187 @@ class DynGroupServiceImpl(
// This does not treat all cases (what happens when you have a group depending on a group which also depends on another group content)
// We will sort by number of group queries we have in our group (the more group we depend on, the more we want to update it last)
def numberOfQuery(group : NodeGroup) = group.query.map( _.criteria.filter(_.objectType.objectType == "group").size).getOrElse(0)
dyngroupIds.sortBy(numberOfQuery).map(_.id)
dyngroupIds.sortBy(numberOfQuery)
}
}
}

/**
* A service that check if nodes are in dynamique groups
*/
class CheckPendingNodeInDynGroups(
queryChecker: QueryChecker
) {
/**
* Default algorithm, not expected to be performant:
* - get all dynamic groups
* - for each query, test if the groups match or not.
* For that, use the LdapQueryProcessor on "pending"
* branch, limiting queries to the argument list of nodes
* For each node in the list, find
* the list of dynamic group they belongs to from the parameter.
*
* If any error is encountered during the sequence of tests,
* the whole process is in error.
* A node ID which does not belong to any dyn group
* won't be in the resulting map.
*/
override def findDynGroups(nodeIds:Seq[NodeId]) : Box[Map[NodeId,Seq[NodeGroupId]]] = {
def findDynGroups(nodeIds: Set[NodeId], groups: List[NodeGroup]): Box[Map[NodeId, Seq[NodeGroupId]]] = {

for {
con <- ldap
dyngroups <- sequence(con.searchSub(rudderDit.GROUP.dn, dynGroupFilter, dynGroupAttrs:_*)) { entry =>
mapper.entry2NodeGroup(entry) ?~! "Can not map entry to a node group: %s".format(entry)
mapGroupAndNodes <- processDynGroups(groups, nodeIds) ?~! s"Can not find dynamic groups for nodes: '${nodeIds.map(_.value).mkString("','")}'"
} yield {
swapMap(mapGroupAndNodes)
}
}

/**
* Given a list of dynamique groups and a list of nodes, find which groups contains which
* nodes.
* This method proceed recursively, starting by dealing with nodes without any other group
* dependency (ie: group of groups are set aside), and then continuing until everything
* is done. See https://www.rudder-project.org/redmine/issues/12060#note-6 for
* algo detail in image.
*
* Raw behavior: 3 queues: TODO, BLOCKED, DONE
* All dyn groups start in TODO with the list of nodeIds and their query splitted in two parts:
* - a list of depandent groups
* - simple query criterions
*
* Init: all groups with at least one dependency go to BLOCKED
* Then iterativelly proceed groups in TODO (until TODO empty) so that for each group G:
* - compute nodes in G
* - put G in DONE
* - find all node in BLOCKED with a dependency toward G, and for each of these H group, do:
* - union or intersect G nodes with H groups (depending of the H composition kind)
* - remove G as dependency from H
* - if H has no more dependencies, put it back at the end of TODO
* When TODO is empty, do:
* - for each remaining groups in BLOCKED, act as if there dependencies have 0 nodes (and => empty, or => keep
* only simple query part)
* - put empty group into DONE, and non-empty one into TODO
* - process TODO
*
* And done ! :)
*/
def processDynGroups(groups: List[NodeGroup], nodeIds: Set[NodeId]): Box[List[(NodeGroupId, Set[NodeId])]] = {
// a data structure to keep a group ID, set of nodes, dependencies, query and composition/
// the query does not contain group anymore.

final case class DynGroup(id: NodeGroupId, dependencies: Set[NodeGroupId], testNodes: Set[NodeId], query: Query, includeNodes: Set[NodeId])

NodeLogger.PendingNode.Policies.debug(s"Checking dyn-groups belonging for nodes [${nodeIds.map(_.value).mkString(", ")}]:${groups.map(g => s"${g.id.value}: ${g.name}").sorted.mkString("{", "}{", "}")}")

// for debuging message
implicit class DynGroupsToString(gs: List[(NodeGroupId, Set[NodeId])]) {
def show: String = gs.map { case (id, nodes) =>
id.value + ":" + nodes.map(_.value).mkString(",")
}.mkString("[", "][", "]")
}
implicit class ResToString(gs: List[DynGroup]) {
def show: String = gs.map { case DynGroup(id, dep, nodes, q, inc) =>
id.value + ":" + nodes.size + "{"+dep.map(_.value).mkString(",")+"}"
}.mkString("[", "][", "]")
}

/*
* one step of the algo
*/
def recProcess(todo: List[DynGroup], blocked: List[DynGroup], done: List[(NodeGroupId, Set[NodeId])]): Box[List[(NodeGroupId, Set[NodeId])]] = {
import com.normation.rudder.domain.queries.{ And => CAnd}

NodeLogger.PendingNode.Policies.trace("TODO :" + todo.show )
NodeLogger.PendingNode.Policies.trace("BLOCKED:" + blocked.show)
NodeLogger.PendingNode.Policies.trace("DONE :" + done.show )

(todo, blocked, done) match {

case (Nil, Nil, res) => // termination condition
NodeLogger.PendingNode.Policies.trace("==> end")
Full(res)

case (Nil, b, res) => // end of main phase: zero-ïze group dependencies in b and put them back in the other two queues
NodeLogger.PendingNode.Policies.trace("==> unblock things")
val (newTodo, newRes) = ( (List.empty[DynGroup], res) /: b) { case ( (t, r), next ) =>
if(next.query.composition == CAnd) { // the group has zero node b/c intersect with 0 => new result
NodeLogger.PendingNode.Policies.trace(" -> evicting " + next.id.value)
(t, (next.id, Set.empty[NodeId])::r )
} else { // we can just ignore the dependencies and proceed the remaining group as a normal dyn group
NodeLogger.PendingNode.Policies.trace(" -> process back" + next.id.value)
(next :: t, r)
}
}
// start back the process with empty BLOCKED
recProcess(newTodo, Nil, newRes)


case (h::tail, b, res) => // standard step: takes the group and deals with it
NodeLogger.PendingNode.Policies.trace("==> process " + h.id.value)
(queryChecker.check(h.query, h.testNodes.toSeq).flatMap { nIds =>
// node matching that group - also include the one from "include" coming from "or" dep
val setNodeIds = nIds.toSet ++ h.includeNodes
// get blocked group with h as a dep
val (withDep, stillBlocked) = b.partition( _.dependencies.contains(h.id) )
// for each node with that dep: intersect or union nodeids, remove dep
val newTodos = withDep.map { case DynGroup(id, dependencies, nodes, query, inc) =>
val (newNodeIds, newInc) = if(query.composition == CAnd) {
(nodes.intersect(setNodeIds), inc)
} else {
(nodes, setNodeIds ++ inc)
}
val newDep = dependencies - h.id
DynGroup(id, newDep, newNodeIds, query, newInc)
}
NodeLogger.PendingNode.Policies.trace(" -> unblock: " + newTodos.map(_.id.value).mkString(", "))
recProcess(tail ::: newTodos, stillBlocked, (h.id, setNodeIds) :: res)
}) ?~! s"Error when trying to find what nodes belong to dynamic group ${h.id}"
}
//now, for each test the query
mapGroupAndNodes <- sequence(dyngroups) { g =>
(for {
matchedIds <- g match {
case NodeGroup(id, _, _, Some(query), true, _, _, _) =>
queryChecker.check(query,nodeIds)
case g => { //what ?
logger.error("Found a group without a query or not dynamic: %s".format(g))
Full(Seq())
}

// init: transform NodeGroups into DynGroup.
// group without query are filtered out (they will have 0 node so that does not change
// anything if a group had a dep towards one of them)
val dynGroups = groups.flatMap { g =>
if(g._isEnabled) {
g.query.map { query =>
// partition group query into Subgroup / simple criterion
val (dep, criteria) = ( (Set.empty[NodeGroupId], List.empty[CriterionLine])/: query.criteria) { case ( (g, q), next) =>
if(next.objectType.objectType == "group") { // it's a dependency
// we only know how to process the comparator "exact string match" for group
next.comparator match {
case Equals =>
( g + NodeGroupId(next.value), q)
case _ =>
NodeLogger.PendingNode.Policies.warn("Warning: group criteria use something else than exact string match comparator: " + next)
(g, q)
}
} else { // keep that criterium
(g, next :: q)
}
}
} yield {
(g.id, matchedIds)
}) ?~! "Error when trying to find what nodes belong to dynamic group %s".format(g)
DynGroup(g.id, dep, nodeIds, query.copy(criteria = criteria), Set())
}
} else {
None
}
} yield {
swapMap(mapGroupAndNodes)
}


// partition nodes with dependencies / node without:
val (nodep, withdep) = dynGroups.partition( _.dependencies.isEmpty )

// start the process ! End at the end, transform the result into a map.
val res = recProcess(nodep, withdep, Nil)
// end result
res.foreach(r => NodeLogger.PendingNode.Policies.debug("Result: " + r.show) )
res
}


/**
* Transform the map of (groupid => seq(nodeids) into a map of
* (nodeid => seq(groupids)
*/
private[this] def swapMap(source:Seq[(NodeGroupId,Seq[NodeId])]) : Map[NodeId,Seq[NodeGroupId]] = {
private[this] def swapMap(source:Seq[(NodeGroupId, Set[NodeId])]) : Map[NodeId, Seq[NodeGroupId]] = {
val dest = scala.collection.mutable.Map[NodeId,List[NodeGroupId]]()
for {
(gid, seqNodeIds) <- source
nodeId <- seqNodeIds
nodeId <- seqNodeIds
} {
dest(nodeId) = gid :: dest.getOrElse(nodeId,Nil)
dest(nodeId) = gid :: dest.getOrElse(nodeId, Nil)
}
dest.toMap
}
Expand Down
Loading

0 comments on commit f12ddd4

Please sign in to comment.