Fixes #19151: add caching for NodeExpectedReports #3728

@ncharles
Member

Commented Jul 26, 2021

https://issues.rudder.io/issues/19151

This PR adds a cache for NodeExpectedReports. The cache is a Map[NodeId, Option[NodeExpectedReports]]. It is initialized with every node, with None as the value for each.
When current expected reports are requested, the cache is looked up first; any expected reports that have to be fetched from the database are then added to the cache.
When a policy generation happens, it also updates the current expected reports of the modified nodes.

The rationale is that fetching these from the database is really expensive, and keeping them in the cache improves performance.
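
The life cycle described above can be sketched as follows. This is only an illustration under stated assumptions: `NodeId` and `NodeExpectedReports` are reduced stand-ins for the real Rudder types, and `ExpectedReportsCacheSketch`, `fetchFromDb`, `get` and `update` are hypothetical names, not the PR's actual API. Plain `synchronized` is used here in place of the effect-based semaphore discussed later in the review.

```scala
object CacheSketch {
  // Simplified stand-ins for the real types (assumption: the real ones carry much more data).
  final case class NodeId(value: String)
  final case class NodeExpectedReports(nodeId: NodeId, configId: String)

  type Cache = Map[NodeId, Option[NodeExpectedReports]]

  // Hypothetical cache; fetchFromDb stands in for the expensive database query.
  final class ExpectedReportsCacheSketch(allNodes: Set[NodeId], fetchFromDb: Set[NodeId] => Cache) {
    // Initialized with every known node, and None for each NodeExpectedReports.
    private var cache: Cache =
      allNodes.map(id => id -> Option.empty[NodeExpectedReports]).toMap

    // Look at the cache first, then query the database only for entries still at None.
    def get(nodeIds: Set[NodeId]): Cache = synchronized {
      val known   = cache.filter { case (id, _) => nodeIds.contains(id) }
      val missing = known.collect { case (id, None) => id }.toSet
      val fetched: Cache =
        if (missing.isEmpty) Map.empty
        else fetchFromDb(missing).filter { case (id, _) => missing.contains(id) }
      cache = cache ++ fetched
      known ++ fetched
    }

    // Called after a policy generation, once per modified node.
    def update(reports: NodeExpectedReports): Unit = synchronized {
      cache = cache + (reports.nodeId -> Some(reports))
    }
  }
}
```

With this shape, a second `get` for the same node is served from memory, and nodes unknown to the cache are simply absent from the result.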

@ncharles
Member Author

PR updated with a new commit

*/
def getCurrentExpectedReports(nodeIds: Set[NodeId]): Box[Map[NodeId, Option[NodeExpectedReports]]] = {
// First, get all nodes from cache (even the none)
val dataFromCache = cache.filter{ case (nodeId, _) => nodeIds.contains(nodeId) }
Member Author

I'm unsure whether this should be inside a semaphore or not.

@@ -96,6 +97,7 @@ object CacheComplianceQueueAction {
final case class RemoveNodeInCache (nodeId: NodeId ) extends CacheComplianceQueueAction
final case class InitializeCompliance (nodeId: NodeId, nodeCompliance: Option[NodeStatusReport]) extends CacheComplianceQueueAction // do we need this?
final case class UpdateCompliance (nodeId: NodeId, nodeCompliance: NodeStatusReport ) extends CacheComplianceQueueAction
final case class InitNodeConfiguration (nodeId: NodeId, nodeConfiguration: NodeExpectedReports ) // unsure about keeping this one
Member Author

It's unused; I'm pretty sure I should not keep it.

Member

Clean it up: it's cognitively more costly to have something there and wonder why than to add it back if it's needed later on.

, val batchSize : Int
, val complianceRepository : ComplianceRepository
) extends ReportingService with RuleOrNodeReportingServiceImpl with CachedFindRuleNodeStatusReports {
val confExpectedRepo = defaultFindRuleNodeStatusReports.confExpectedRepo
val directivesRepo = defaultFindRuleNodeStatusReports.directivesRepo
val rulesRepo = defaultFindRuleNodeStatusReports.rulesRepo

val nodeConfigrationService = nodeConfigService
Member Author

THIS is bothering me. I need to register the nodeConfigrationService because it is declared in CachedFindRuleNodeStatusReports.

Member

Perhaps you need to create a facade/new trait on top of FindExpectedReportRepository and NodeConfigurationService that only exposes what is needed?

Member Author

Good idea!

Member Author

Actually, it's never used, so I removed it.

@ncharles
Member Author

PR updated with a new commit

@ncharles ncharles force-pushed the arch_19151/add_caching_for_nodeexpectedreports branch from 2adedf5 to 6122d3b Compare July 26, 2021 20:03
*/
def findNodeExpectedReports(
nodeConfigIds: Set[NodeAndConfigId]
): Box[Map[NodeAndConfigId, Option[NodeExpectedReports]]]
Member

Can you return IOResult in place of Box? We are trying to get rid of Box, and IOResult composes better.

* Get all nodes id
*/

def getAllNodeIds(): Box[Set[NodeId]]
Member

Prefer IOResult; Box is deprecated in Rudder.

cached.init().runNow
cached
}

/*
Member

Is it possible to completely hide findExpectedRepo from public use with a facade that delegates to either it or the cache? That way we would be sure never to call the wrong service.

Member Author

It is a private[this]. I'm unsure how to do better than that (short of putting a big comment around it saying not to use it).

Member Author

Or, since it is used twice (once per cache), I could instantiate one per cache; the overhead should be minimal.
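
One way to read the facade suggestion in this thread is sketched below. It is not the PR's code: `ExpectedReportsFacade`, `UncachedRepo`, `load` and `cached` are hypothetical names, the types are simplified stand-ins, and the caching policy is deliberately naive (it caches whatever the repository returns). The point is only that callers receive the facade and never hold a reference to the uncached repository.

```scala
object FacadeSketch {
  // Simplified stand-ins for the real types.
  final case class NodeId(value: String)
  final case class NodeExpectedReports(nodeId: NodeId, configId: String)

  type Reports = Map[NodeId, Option[NodeExpectedReports]]

  // The only interface exposed to callers (hypothetical name).
  trait ExpectedReportsFacade {
    def getCurrentExpectedReports(nodeIds: Set[NodeId]): Reports
  }

  // Stand-in for the expensive, database-backed repository.
  trait UncachedRepo {
    def load(nodeIds: Set[NodeId]): Reports
  }

  object ExpectedReportsFacade {
    // The repository is only captured by the closure; nothing else can reach it through the facade.
    def cached(repo: UncachedRepo): ExpectedReportsFacade = new ExpectedReportsFacade {
      private var cache: Reports = Map.empty

      def getCurrentExpectedReports(nodeIds: Set[NodeId]): Reports = synchronized {
        val missing = nodeIds -- cache.keySet
        if (missing.nonEmpty) cache = cache ++ repo.load(missing)
        cache.filter { case (id, _) => nodeIds.contains(id) }
      }
    }
  }
}
```

If each cache builds its own repository instance inside such a factory, as suggested just above, no shared uncached service needs to be visible at the configuration level.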

Member

@fanf left a comment

I think the general logic is OK. Please limit the use of Box to compatibility code (and, if possible, remove some more of them). I don't understand all the usages well enough to be sure it's sound. If you can, you should hide the non-cached version of the feature from public use (i.e., in RudderConfig), so that people (in plugins, for example) don't miss out on the cache's benefits.

@ncharles
Member Author

PR updated with a new commit

case delete: RemoveNodeInCache => IOResult.effectNonBlocking { cache = cache.removed(delete.nodeId) }
case update: UpdateNodeConfiguration => IOResult.effectNonBlocking { cache = cache + (update.nodeId -> Some(update.nodeConfiguration)) }
case something =>
Inconsistency(s"NodeConfiguration service cache received unknown command : ${something}").fail
Member

I'm going to try to make a subtype for the actions, to be sure that this property holds.

Member

It works (with some boilerplate), so we can make it checkable: ncharles#10
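
The idea of a dedicated action subtype can be illustrated with a sealed hierarchy. This is a sketch, not the content of ncharles#10: the case class names are taken from the snippets quoted in this review, but the types are simplified, and `performAction` is written here as a pure function on a map rather than in IOResult.

```scala
object ExhaustiveActionsSketch {
  // Simplified stand-ins for the real types.
  final case class NodeId(value: String)
  final case class NodeExpectedReports(nodeId: NodeId, configId: String)

  // Sealed: all subtypes live in this file, so the compiler can check matches for exhaustiveness.
  sealed trait CacheExpectedReportAction { def nodeId: NodeId }
  final case class RemoveNodeInCache(nodeId: NodeId) extends CacheExpectedReportAction
  final case class UpdateNodeConfiguration(nodeId: NodeId, nodeConfiguration: NodeExpectedReports)
      extends CacheExpectedReportAction

  type Cache = Map[NodeId, Option[NodeExpectedReports]]

  // No catch-all "unknown command" case: a new action subtype that is not handled here
  // is reported by the compiler instead of failing at run time.
  def performAction(cache: Cache, action: CacheExpectedReportAction): Cache = action match {
    case RemoveNodeInCache(id)             => cache.removed(id)
    case UpdateNodeConfiguration(id, conf) => cache + (id -> Some(conf))
  }
}
```

Because the cache only accepts `CacheExpectedReportAction`, the `case something => Inconsistency(...)` branch quoted above has nothing left to catch.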

// * if the nodeId is in cache, and value is None, augment cache
// * if the nodeid is in cache, and value is not None, pick the most recent value to augment cache
// Returns the values merged with cache
private[this] def mergeDataFromDBWithCache(fromDb: Map[NodeId, Option[NodeExpectedReports]]):IOResult[ Map[NodeId, Option[NodeExpectedReports]] ] = {
Member

I'm not sure that this function should be split apart from the "getCurrentExpectedReports" part. It makes it harder to see what needs to be atomic, and it hides the fact that the nodes in "fromDb" should all have been uninitialized in the cache (since it's called for that reason).
Let me try to merge them.

Member Author

I think it needs to be separate. It will probably be used in other methods.

Member

But for now it is not, and it adds substantial complexity.

semaphore.withPermit(
IOResult.effect({
val mergedEntries = fromDb.map { case (nodeId, expFromDb) => cache.get(nodeId) match {
case None => None // nodeid not in cache, dropping
Member

For example, here I don't understand how the nodeId could be missing from the cache, since we restricted the call to fromDb to nodes that are in the cache but not initialized.

Member Author

Adding/removing nodes can happen at the same time.
Also, a node may exist without compliance (ignored, or policy not yet generated).

Member

For the case without compliance, it's OK: it was None, and it's still None afterwards.
For the case "adding/removing nodes can happen at the same time", I don't understand: here we are just augmenting the cache's known entries, we don't remove entries. And there's a semaphore to avoid side effects from performAction.

case Some(entry) => // now we must compare with the one from DB
expFromDb match {
case None => Some((nodeId -> value))
case Some(db) if db.beginDate.isAfter(entry.beginDate) => Some((nodeId -> expFromDb)) // data fom db is newer
Member

And here, only nodes with None in the cache were requested from the DB, so that should not happen.

Member Author

The query can take several seconds; the cache might have been updated in between with a fresh node configuration pushed by policy generation.

Member

I have an alternative proposal: ncharles#10
The cache should not be updated in between: it doesn't make sense to look for empty nodeIds and then consider that this data is not the truth, or else checking for it in the first place was not useful. I can't see how the DB query can be left outside the semaphore if you need consistency (for example, if you don't want to miss the fact that a cache reset was done in between).
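
For reference, the merge rules debated in this thread (drop nodes that are not in the cache, fill entries that are still None, otherwise keep whichever value has the most recent beginDate) can be written as a pure function. This is a sketch under assumptions: the types are simplified, `java.time.Instant` replaces whatever date type the real `beginDate` uses, and `merge` is a hypothetical name. It shows the merge only; it says nothing about whether the DB query should run inside the semaphore, which is the actual point of disagreement.

```scala
import java.time.Instant

object MergeSketch {
  // Simplified stand-ins; the real beginDate type is an assumption here.
  final case class NodeId(value: String)
  final case class NodeExpectedReports(nodeId: NodeId, beginDate: Instant)

  type Cache = Map[NodeId, Option[NodeExpectedReports]]

  def merge(cache: Cache, fromDb: Cache): Cache =
    fromDb.foldLeft(cache) { case (acc, (id, db)) =>
      acc.get(id) match {
        case None               => acc               // nodeId not in cache: drop it
        case Some(None)         => acc + (id -> db)  // not initialized yet: take the DB value
        case Some(Some(cached)) =>
          db match {
            // data from the DB is newer than the cached entry
            case Some(d) if d.beginDate.isAfter(cached.beginDate) => acc + (id -> db)
            // cached entry is at least as recent (or the DB has nothing): keep it
            case _ => acc
          }
      }
    }
}
```

If the DB query and the merge both run while the semaphore is held, as the alternative proposal argues, the last two branches can never be reached by this call path, which is the reviewer's argument for merging the two functions.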

@ncharles
Member Author

PR updated with a new commit

@ncharles ncharles force-pushed the arch_19151/add_caching_for_nodeexpectedreports branch from f33a730 to 07043a1 Compare July 29, 2021 20:02
// In a semaphore, nothing should change the cache
semaphore.withPermit(for {
timeInSemaphore <- currentTimeMillis
_ = logger.trace(s"Entered the semaphore after ${timeInSemaphore - before_semaphoreTime} ms")
Member

It needs to be <- logger ...
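
The difference between `=` and `<-` in a for-comprehension matters when the right-hand side is an effect description rather than an immediate side effect, which is presumably the case for the logger used here. The sketch below uses a tiny hand-written `Effect` type as a stand-in (an assumption made to keep the example self-contained; it is not IOResult or the real logger): `_ = trace(...)` only builds a value and discards it, while `_ <- trace(...)` sequences it into the program.

```scala
object EffectBindingSketch {
  // Minimal stand-in for an effect type: a description of a computation, run only on demand.
  final class Effect[A](val unsafeRun: () => A) {
    def map[B](f: A => B): Effect[B] = new Effect[B](() => f(unsafeRun()))
    def flatMap[B](f: A => Effect[B]): Effect[B] = new Effect[B](() => f(unsafeRun()).unsafeRun())
  }

  // Records what was actually logged when an effect is run.
  val log = scala.collection.mutable.ListBuffer.empty[String]

  def trace(msg: String): Effect[Unit] = new Effect[Unit](() => { log += msg; () })
  val currentTimeMillis: Effect[Long]  = new Effect[Long](() => System.currentTimeMillis())

  // `=` binds the effect *value* and throws it away: the message is never logged.
  val assigned: Effect[Long] = for {
    t <- currentTimeMillis
    _  = trace("assigned")
  } yield t

  // `<-` sequences the effect, so it runs as part of the whole program.
  val bound: Effect[Long] = for {
    t <- currentTimeMillis
    _ <- trace("bound")
  } yield t
}
```

Running `assigned` leaves `log` empty, whereas running `bound` appends the message.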

@ncharles
Member Author

PR updated with a new commit

@ncharles
Member Author

PR updated with a new commit

@Normation-Quality-Assistant
Contributor

OK, squash merging this PR

fixup! Fixes #19151: add caching for NodeExpectedReports

Fixes #19151: add caching for NodeExpectedReports

fixup! fixup! Fixes #19151: add caching for NodeExpectedReports

Fixes #19151: add caching for NodeExpectedReports

fixup! fixup! fixup! Fixes #19151: add caching for NodeExpectedReports

Fixes #19151: add caching for NodeExpectedReports

Proposal for update in nodeconfigcache

split cache action to allows exhaustive checks

fixup! Merge pull request #10 from fanf/pr/3728

Fixes #19151: add caching for NodeExpectedReports
@Normation-Quality-Assistant Normation-Quality-Assistant force-pushed the arch_19151/add_caching_for_nodeexpectedreports branch from 7fb6313 to ea691a4 Compare July 29, 2021 20:56
@Normation-Quality-Assistant Normation-Quality-Assistant merged commit ea691a4 into Normation:master Jul 29, 2021
@@ -180,19 +182,19 @@ class RuleGrid(
val start = System.currentTimeMillis

( for {
rules <- roRuleRepository.getAll(false).toBox.map { allRules => onlyRules match {
Member Author

override def invalidateComplianceCache(actions: Seq[(NodeId, CacheComplianceQueueAction)]): Unit = {
complianceCache.invalidateWithAction(actions).runNow
override def invalidateComplianceCache(actions: Seq[(NodeId, CacheExpectedReportAction)]): IOResult[Unit] = {
cachedNodeConfigurationService.invalidateWithAction(actions) *>
Member Author

This is probably not doing what we want.
