Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15675: Leak in Cache of Node Compliance and NodeInfo and perfs improvement #2648

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
3af575c
Fixes #15675: Leak in Cache of Node Compliance and NodeInfo and perfs…
ncharles Dec 3, 2019
e6aeb1a
fixup! fixes #15675: fix memory usage
ncharles Dec 3, 2019
f80eee0
fixup! fixup! fixes #15675: fix memory usage
ncharles Dec 3, 2019
32ecee9
fixup
ncharles Dec 9, 2019
b5e4710
fixup
ncharles Dec 9, 2019
9b35e71
Try to prevent locking in yourkit
ncharles Dec 13, 2019
9598aaa
memory usage computation
ncharles Dec 13, 2019
38611b3
Revert "memory usage computation"
ncharles Dec 13, 2019
b530809
adding memory usage computation
ncharles Dec 14, 2019
f70ce20
allow instrumentaton
ncharles Dec 15, 2019
3ed9ec0
exclude jar to allow instrumentation to load
ncharles Dec 15, 2019
87c09cf
adding measure pour nodecontexts
ncharles Dec 16, 2019
78cdb29
originalVars is unused
ncharles Dec 16, 2019
74e664f
measuring policies
ncharles Dec 16, 2019
312da22
test wrapping inside for yield
ncharles Dec 18, 2019
1f92658
typo
ncharles Dec 18, 2019
16eafe2
remove unessary call to trim
ncharles Dec 18, 2019
18af296
more for yield, it's like violence
ncharles Dec 20, 2019
c56c638
lower number of created objects + fasten compliance computation
ncharles Dec 20, 2019
9bce1c0
try to help hotpot by adding all variables at once
ncharles Dec 22, 2019
9e2c74e
try to help hotpot by adding all variables at once - edge case
ncharles Dec 22, 2019
e916e3e
for yield, for yield for everyone, for you, for them
ncharles Dec 22, 2019
962c7a0
convert val to def to save memory and measure impact
ncharles Dec 30, 2019
78eab1e
randomize templates to write
ncharles Dec 31, 2019
36b0eea
adding timing info
ncharles Jan 1, 2020
e3001fa
aggregate timings
ncharles Jan 1, 2020
535e32e
improve timing
ncharles Jan 1, 2020
2001015
fix type
ncharles Jan 1, 2020
97c9c0b
fix type2
ncharles Jan 1, 2020
fbfcc9c
fix type2
ncharles Jan 1, 2020
355fbeb
fix compilation
ncharles Jan 4, 2020
9176763
remove some mapValues
ncharles Jan 6, 2020
7fe1bc6
Revert "remove some mapValues"
ncharles Jan 9, 2020
a0c1757
removing extra map of nodeconfiguration in the hope to save memory
ncharles Jan 9, 2020
5a9d3ea
attempt to lower pressure on memory usage
ncharles Jan 11, 2020
6225312
fix type
ncharles Jan 11, 2020
e680f67
fix type
ncharles Jan 11, 2020
33f4ca6
clean some usage
ncharles Jan 12, 2020
96ceafb
correct type
ncharles Jan 12, 2020
a93b67e
test to remove spike
ncharles Jan 12, 2020
236a035
try to lower memory pressur whil fetching reports
ncharles Jan 13, 2020
9ff671a
fix type
ncharles Jan 13, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion webapp/sources/rudder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ along with Rudder. If not, see <http://www.gnu.org/licenses/>.
<!-- fill-in rule templates -->
</modules>

<dependencies/>
<dependencies>
<dependency>
<groupId>com.github.jbellis</groupId>
<artifactId>jamm</artifactId>
<version>0.3.3</version>
<scope>provided</scope>
</dependency>
</dependencies>

<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ object ComplianceLevel {
private def pc_for(i:Int, total:Int) : Double = if(total == 0) 0 else (i * 100 / BigDecimal(total)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble


def compute(reports: Iterable[ReportType]): ComplianceLevel = {
def compute2(reports: Iterable[ReportType]): ComplianceLevel = {
import ReportType._
if(reports.isEmpty) { ComplianceLevel(notApplicable = 1)}
else reports.foldLeft(ComplianceLevel()) { case (compliance, report) =>
Expand All @@ -167,10 +167,122 @@ object ComplianceLevel {
}
}

def sum(compliances: Iterable[ComplianceLevel]): ComplianceLevel = {
def compute(reports: Iterable[ReportType]): ComplianceLevel = {
import ReportType._
if(reports.isEmpty) {
ComplianceLevel(notApplicable = 1)
} else {
var notApplicable = 0
var success = 0
var repaired = 0
var error = 0
var unexpected = 0
var missing = 0
var noAnswer = 0
var pending = 0
var reportsDisabled = 0
var compliant = 0
var auditNotApplicable = 0
var nonCompliant = 0
var auditError = 0
var badPolicyMode = 0

reports.foreach { report =>
report match {
case EnforceNotApplicable => notApplicable += 1
case EnforceSuccess => success += 1
case EnforceRepaired => repaired += 1
case EnforceError => error += 1
case Unexpected => unexpected += 1
case Missing => missing += 1
case NoAnswer => noAnswer += 1
case Pending => pending += 1
case Disabled => reportsDisabled += 1
case AuditCompliant => compliant += 1
case AuditNotApplicable => auditNotApplicable += 1
case AuditNonCompliant => nonCompliant += 1
case AuditError => auditError += 1
case BadPolicyMode => badPolicyMode += 1
}
}
ComplianceLevel(
pending = pending
, success = success
, repaired = repaired
, error =error
, unexpected =unexpected
, missing = missing
, noAnswer=noAnswer
, notApplicable=notApplicable
, reportsDisabled=reportsDisabled
, compliant = compliant
, auditNotApplicable= auditNotApplicable
, nonCompliant =nonCompliant
, auditError = auditError
, badPolicyMode =badPolicyMode
)
}
}

def sum2(compliances: Iterable[ComplianceLevel]): ComplianceLevel = {
if(compliances.isEmpty) ComplianceLevel()
else compliances.reduce( _ + _)
}

def sum(compliances: Iterable[ComplianceLevel]): ComplianceLevel = {
if (compliances.isEmpty) {
ComplianceLevel()
} else {
var pending: Int = 0
var success: Int = 0
var repaired: Int = 0
var error: Int = 0
var unexpected: Int = 0
var missing: Int = 0
var noAnswer: Int = 0
var notApplicable: Int = 0
var reportsDisabled: Int = 0
var compliant: Int = 0
var auditNotApplicable: Int = 0
var nonCompliant: Int = 0
var auditError: Int = 0
var badPolicyMode: Int = 0


compliances.foreach { compliance =>
pending += compliance.pending
success += compliance.success
repaired += compliance.repaired
error += compliance.error
unexpected += compliance.unexpected
missing += compliance.missing
noAnswer += compliance.noAnswer
notApplicable += compliance.notApplicable
reportsDisabled += compliance.reportsDisabled
compliant += compliance.compliant
auditNotApplicable += compliance.auditNotApplicable
nonCompliant += compliance.nonCompliant
auditError += compliance.auditError
badPolicyMode += compliance.badPolicyMode
}
ComplianceLevel(
pending = pending
, success = success
, repaired = repaired
, error = error
, unexpected = unexpected
, missing = missing
, noAnswer = noAnswer
, notApplicable = notApplicable
, reportsDisabled = reportsDisabled
, compliant = compliant
, auditNotApplicable = auditNotApplicable
, nonCompliant = nonCompliant
, auditError = auditError
, badPolicyMode = badPolicyMode
)
}
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ final class NodeStatusReport private (

object NodeStatusReport {
def apply(nodeId: NodeId, runInfo: RunAndConfigInfo, statusInfo: RunComplianceInfo, overrides : List[OverridenPolicy], reports: Iterable[RuleNodeStatusReport]) = {
new NodeStatusReport(nodeId, runInfo, statusInfo, overrides, AggregatedStatusReport(reports.toSet.filter( _.nodeId == nodeId)))
new NodeStatusReport(nodeId, runInfo, statusInfo, overrides, AggregatedStatusReport(reports.filter( _.nodeId == nodeId).toSet))
}

// To use when you are sure that all reports are indeed for the designated node.
Expand Down Expand Up @@ -200,11 +200,11 @@ final case class RuleNodeStatusReport(
| ${directives.values.toSeq.sortBy( _.directiveId.value ).map { x => s"${x}" }.mkString("\n ")}]
|""".stripMargin('|')


/*
def getValues(predicate: ComponentValueStatusReport => Boolean): Seq[(DirectiveId, String, ComponentValueStatusReport)] = {
directives.values.flatMap( _.getValues(predicate)).toSeq
}

*/
def withFilteredElements(
directive: DirectiveStatusReport => Boolean
, component: ComponentStatusReport => Boolean
Expand All @@ -226,7 +226,7 @@ object RuleNodeStatusReport {
val newDirectives = DirectiveStatusReport.merge(reports.flatMap( _.directives.values))

//the merge of two reports expire when the first one expire
val expire = new DateTime( reports.map( _.expirationDate.getMillis).min )
val expire = new DateTime(reports.minBy(_.expirationDate.getMillis).expirationDate.getMillis)
(id, RuleNodeStatusReport(id._1, id._2, id._3, id._4, newDirectives, expire))
}.toMap
}
Expand All @@ -239,7 +239,7 @@ final case class DirectiveStatusReport(
) extends StatusReport {
override lazy val compliance = ComplianceLevel.sum(components.map(_._2.compliance) )
def getValues(predicate: ComponentValueStatusReport => Boolean): Seq[(DirectiveId, String, ComponentValueStatusReport)] = {
components.values.flatMap( _.getValues(predicate) ).toSeq.map { case(s,v) => (directiveId,s,v) }
components.values.view.flatMap( _.getValues(predicate) ).map { case(s,v) => (directiveId,s,v) }.force.toSeq
}

override def toString() = s"""[${directiveId.value} =>
Expand Down Expand Up @@ -288,7 +288,7 @@ final case class ComponentStatusReport(
* Get all values matching the predicate
*/
def getValues(predicate: ComponentValueStatusReport => Boolean): Seq[(String, ComponentValueStatusReport)] = {
componentValues.values.filter(predicate(_)).toSeq.map(x => (componentName, x))
componentValues.values.view.filter(predicate(_)).map(x => (componentName, x)).force.toSeq
}

/*
Expand All @@ -307,7 +307,7 @@ object ComponentStatusReport extends Loggable {
components.groupBy( _.componentName).map { case (cptName, reports) =>
val newValues = ComponentValueStatusReport.merge(reports.flatMap( _.componentValues.values))
(cptName, ComponentStatusReport(cptName, newValues))
}.toMap
}
}
}

Expand Down Expand Up @@ -349,7 +349,7 @@ object ComponentValueStatusReport extends Loggable {
)

}
pairs.toMap
pairs
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class CachedReportsExecutionRepository(
cache = Map()
}

override def getNodesLastRun(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[AgentRunWithNodeConfig]]] = this.synchronized {
override def getNodesLastRun(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[AgentRunWithNodeConfig]]] = scala.concurrent.blocking { this.synchronized {
val n1 = System.currentTimeMillis
(for {
runs <- readBackend.getNodesLastRun(nodeIds.diff(cache.keySet))
Expand All @@ -127,7 +127,7 @@ class CachedReportsExecutionRepository(
cache = cache ++ runs
cache.filterKeys { x => nodeIds.contains(x) }
}) ?~! s"Error when trying to update the cache of Agent Runs informations"
}
} }

override def updateExecutions(executions : Seq[AgentRun]) : Seq[Box[AgentRun]] = this.synchronized {
logger.trace(s"Update runs for nodes [${executions.map( _.agentRunId.nodeId.value ).mkString(", ")}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ class RoReportsExecutionRepositoryImpl (
(run.agentRunId.nodeId, AgentRunWithNodeConfig(run.agentRunId, config, run.isCompleted, run.insertionId))
}).toMap
ids.map(id => (id, runsMap.get(id))).toMap

}

query.transact(xa).attempt.unsafeRunSync.box
}
}.map(_.flatten.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ final case class Policy(
// == map .values (keep order) ==> Iterator[List[Variable]]
// == .toList (keep order) ==> List[List[Variable]]
// == flatten (keep order) ==> List[Variable]
val expandedVars = Policy.mergeVars(policyVars.map( _.expandedVars.values).toList.flatten)
val originalVars = Policy.mergeVars(policyVars.map( _.originalVars.values).toList.flatten)
def expandedVars = Policy.mergeVars(policyVars.map( _.expandedVars.values).toList.flatten)
//val originalVars = Policy.mergeVars(policyVars.map( _.originalVars.values).toList.flatten)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one seems to have a lot of impact
Size deepof nodeConfigs is 2 668 075 072
Size deepof ruleVals is 376 459 952

vs
Size deepof nodeConfigs is 3 474 356 896
Size deepof ruleVals is 748 776 080

but we need to validate as there is a lot of variability in ruleval (from 748 487 288 to 2 123 635 544)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodeconfig is stable at about 2 668 075 048 with this line vs 3 474 356 896 without this line

val trackerVariable = policyVars.head.trackerVariable.spec.cloneSetMultivalued.toVariable(policyVars.map(_.trackerVariable.values).toList.flatten)
}

Expand All @@ -382,9 +382,9 @@ final object Policy {
path.replace(TAG_OF_RUDDER_MULTI_POLICY, p.id.getRudderUniqueId)
}

def withParams(p:Policy) : String = {
s"${p.technique.id.name.value}(${p.expandedVars.values.map(_.values.headOption.getOrElse(""))})"
}
//def withParams(p:Policy) : String = {
// s"${p.technique.id.name.value}(${p.expandedVars.values.map(_.values.headOption.getOrElse(""))})"
//}

/*
* merge an ordered seq of variables.
Expand Down
Loading