Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #15768: Make inventory processing less memory consuming #2474

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -121,10 +121,10 @@ class FusionReportEndpoint(
case Full(status) =>
import com.normation.inventory.provisioning.endpoint.StatusLog.LogMessage
status match {
case InventoryProcessStatus.MissingSignature(_) => new ResponseEntity(status.msg, HttpStatus.UNAUTHORIZED)
case InventoryProcessStatus.SignatureInvalid(_) => new ResponseEntity(status.msg, HttpStatus.UNAUTHORIZED)
case InventoryProcessStatus.QueueFull(_) => new ResponseEntity(status.msg, HttpStatus.SERVICE_UNAVAILABLE)
case InventoryProcessStatus.Accepted(_) => new ResponseEntity(status.msg, HttpStatus.ACCEPTED)
case InventoryProcessStatus.MissingSignature(_,_) => new ResponseEntity(status.msg, HttpStatus.UNAUTHORIZED)
case InventoryProcessStatus.SignatureInvalid(_,_) => new ResponseEntity(status.msg, HttpStatus.UNAUTHORIZED)
case InventoryProcessStatus.QueueFull(_,_) => new ResponseEntity(status.msg, HttpStatus.SERVICE_UNAVAILABLE)
case InventoryProcessStatus.Accepted(_,_) => new ResponseEntity(status.msg, HttpStatus.ACCEPTED)
}
case eb: EmptyBox =>
val fail = eb ?~! s"Error when trying to process inventory '${inventoryFile.getOriginalFilename}'"
Expand Down
Expand Up @@ -275,7 +275,7 @@ class ProcessFile(
}
}
inventoryProcessor.saveInventory(() => inventory.newInputStream, inventory.name, signature.map(s => () => s.newInputStream)) match {
case Full(InventoryProcessStatus.Accepted(report)) =>
case Full(InventoryProcessStatus.Accepted(_,_)) =>
//move to received dir
safeMove(signature.map(s => s.moveTo(received / s.name, overwrite = true)))
safeMove(inventory.moveTo(received / inventory.name, overwrite = true))
Expand Down
Expand Up @@ -39,10 +39,7 @@ package com.normation.inventory.provisioning.endpoint

import java.security.{PublicKey => JavaSecPubKey}

import com.normation.inventory.domain.CertifiedKey
import com.normation.inventory.domain.InventoryReport
import com.normation.inventory.domain.KeyStatus
import com.normation.inventory.domain.UndefinedKey
import com.normation.inventory.domain.{CertifiedKey, InventoryReport, KeyStatus, NodeId, UndefinedKey}
import com.normation.inventory.ldap.core.InventoryDit
import com.normation.inventory.provisioning.endpoint.FusionReportEndpoint._
import com.normation.inventory.services.core.FullInventoryRepository
Expand Down Expand Up @@ -72,36 +69,40 @@ object InventoryLogger extends Logger {
override protected def _logger = LoggerFactory.getLogger("inventory-processing")
}

sealed trait InventoryProcessStatus { def report: InventoryReport }
sealed trait InventoryProcessStatus {
def reportName: String
def nodeId: NodeId

}
final object InventoryProcessStatus {
final case class Accepted (report: InventoryReport) extends InventoryProcessStatus
final case class QueueFull (report: InventoryReport) extends InventoryProcessStatus
final case class SignatureInvalid(report: InventoryReport) extends InventoryProcessStatus
final case class MissingSignature(report: InventoryReport) extends InventoryProcessStatus
final case class Accepted (reportName: String, nodeId: NodeId) extends InventoryProcessStatus
final case class QueueFull (reportName: String, nodeId: NodeId) extends InventoryProcessStatus
final case class SignatureInvalid(reportName: String, nodeId: NodeId) extends InventoryProcessStatus
final case class MissingSignature(reportName: String, nodeId: NodeId) extends InventoryProcessStatus
}


object StatusLog {
implicit class LogMessage(status: InventoryProcessStatus) {
def msg: String = status match {
case InventoryProcessStatus.MissingSignature(report) =>
s"Rejecting Inventory '${report.name}' for Node '${report.node.main.id.value}' because its signature is missing. " +
case InventoryProcessStatus.MissingSignature(reportName, nodeId) =>
s"Rejecting Inventory '${reportName}' for Node '${nodeId.value}' because its signature is missing. " +
s"You can go back to unsigned state by running the following command on the Rudder Server: " +
s"'/opt/rudder/bin/rudder-keys reset-status ${report.node.main.id.value}'"
s"'/opt/rudder/bin/rudder-keys reset-status ${nodeId.value}'"

case InventoryProcessStatus.SignatureInvalid(report) =>
s"Rejecting Inventory '${report.name}' for Node '${report.node.main.id.value}' because the Inventory signature is " +
case InventoryProcessStatus.SignatureInvalid(reportName, nodeId) =>
s"Rejecting Inventory '${reportName}' for Node '${nodeId.value}' because the Inventory signature is " +
s"not valid: the Inventory was not signed with the same agent key as the one saved within Rudder for that Node. If " +
s"you updated the agent key on this node, you can update the key stored within Rudder with the following command on " +
s"the Rudder Server: '/opt/rudder/bin/rudder-keys change-key ${report.node.main.id.value} <your new public key>'. " +
s"the Rudder Server: '/opt/rudder/bin/rudder-keys change-key ${nodeId.value} <your new public key>'. " +
s"If you did not change the key, please ensure that the node sending that inventory is actually the node registered " +
s"within Rudder"

case InventoryProcessStatus.QueueFull(report) =>
s"Rejecting Inventory '${report.name}' for Node '${report.node.main.id.value}' because processing queue is full."
case InventoryProcessStatus.QueueFull(reportName, nodeId) =>
s"Rejecting Inventory '${reportName}' for Node '${nodeId.value}' because processing queue is full."

case InventoryProcessStatus.Accepted(report) =>
s"Inventory '${report.name}' for Node '${report.node.main.id.value}' added to processing queue."
case InventoryProcessStatus.Accepted(reportName, nodeId) =>
s"Inventory '${reportName}' for Node '${nodeId.value}' added to processing queue."
}
}
}
Expand Down Expand Up @@ -153,7 +154,7 @@ class InventoryProcessor(
checkQueueAndSave(certifiedReport)
} else {
// Signature is not valid, reject inventory
Full(InventoryProcessStatus.SignatureInvalid(report))
Full(InventoryProcessStatus.SignatureInvalid(report.name, report.node.main.id))
}
} yield {
saved
Expand All @@ -169,7 +170,7 @@ class InventoryProcessor(
// Status is undefined => We accept unsigned inventory
case UndefinedKey => checkQueueAndSave(report)
// We are in certified state, refuse inventory with no signature
case CertifiedKey => Full(InventoryProcessStatus.MissingSignature(report))
case CertifiedKey => Full(InventoryProcessStatus.MissingSignature(report.name, report.node.main.id))
}
}

Expand Down Expand Up @@ -220,18 +221,20 @@ class InventoryProcessor(
parsed <- digestService.parseSecurityToken(tk)
_ <- checkCertificateSubject(report, parsed.subject)
afterParsing = System.currentTimeMillis()
_ = logger.debug(s"Inventory '${report.name}' parsed in ${printer.print(new Duration(afterParsing, System.currentTimeMillis).toPeriod)} ms, now saving")
reportName = report.name
nodeId = report.node.main.id
_ = logger.debug(s"Inventory '${reportName}' parsed in ${printer.print(new Duration(afterParsing, System.currentTimeMillis).toPeriod)} ms, now saving")
saved <- optNewSignatureStream match { // Do we have a signature ?
// Signature here, check it
case Some(sig) =>
saveWithSignature(report, parsed.publicKey, sig) ?~! "Error when trying to check inventory signature"

// There is no Signature
case None =>
saveNoSignature(report, status) ?~! s"Error when trying to check inventory key status for Node '${report.node.main.id.value}'"
saveNoSignature(report, status) ?~! s"Error when trying to check inventory key status for Node '${nodeId.value}'"
}
} yield {
logger.debug(s"Inventory '${report.name}' for node '${report.node.main.id.value}' pre-processed in ${printer.print(new Duration(start, System.currentTimeMillis).toPeriod)} ms")
logger.debug(s"Inventory '${reportName}' for node '${nodeId.value}' pre-processed in ${printer.print(new Duration(start, System.currentTimeMillis).toPeriod)} ms")
saved
}

Expand All @@ -240,10 +243,10 @@ class InventoryProcessor(
case Full(status) =>
import com.normation.inventory.provisioning.endpoint.StatusLog.LogMessage
status match {
case InventoryProcessStatus.MissingSignature(_) => logger.error(status.msg)
case InventoryProcessStatus.SignatureInvalid(_) => logger.error(status.msg)
case InventoryProcessStatus.QueueFull(_) => logger.warn(status.msg)
case InventoryProcessStatus.Accepted(_) => logger.trace(status.msg)
case InventoryProcessStatus.MissingSignature(_,_) => logger.error(status.msg)
case InventoryProcessStatus.SignatureInvalid(_,_) => logger.error(status.msg)
case InventoryProcessStatus.QueueFull(_,_) => logger.warn(status.msg)
case InventoryProcessStatus.Accepted(_,_) => logger.trace(status.msg)
}
case eb: EmptyBox =>
val fail = eb ?~! s"Error when trying to process inventory '${inventoryFileName}'"
Expand Down Expand Up @@ -277,9 +280,9 @@ class InventoryProcessor(
if(canDo) {
//queue the inventory processing
reportProcess.onNext(report)
Full(InventoryProcessStatus.Accepted(report))
Full(InventoryProcessStatus.Accepted(report.name, report.node.main.id))
} else {
Full(InventoryProcessStatus.QueueFull(report))
Full(InventoryProcessStatus.QueueFull(report.name, report.node.main.id))
}

case eb: EmptyBox =>
Expand Down