Fixes #15012: When receiving plenty of inventories at the same time, the web interface starts to parse them all at once #2367

Merged
@@ -33,6 +33,15 @@ ldif.tracelog.rootdir=/var/rudder/inventories/debug
#
waiting.inventory.queue.size=50

#
# You may want to limit the number of inventory files parsed in parallel.
# The goal is to avoid parsing hundreds of XML files in parallel when we prefer
# to fully parse some of them (and then the others) and send them to the backend.
# You can specify a positive integer or a string formatted "Nx" where
# "N" is a double and "x" means "number of available cores".
# A safe default is "0.5x".
inventory.parse.parallelization=0.5x

#
# Rudder 5.0.5 introduces a file watcher which reacts to new
# inventories put in ${inventories.root.directory}/incoming.
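For illustration, here is how the accepted formats would resolve on an 8-core machine; this is a worked example with assumed values, not part of the change itself:

// Hypothetical resolution of inventory.parse.parallelization on an 8-core machine:
val cores    = 8
val relative = Math.max(1, (0.5 * cores).toLong)   // "0.5x"  -> 4 inventories parsed concurrently
val absolute = Math.max(1, 3L)                     // "3"     -> 3, regardless of core count
val clamped  = Math.max(1, (0.05 * cores).toLong)  // "0.05x" -> 0.4, clamped to the minimum of 1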
@@ -108,19 +108,24 @@ class InventoryProcessor(
unmarshaller : ReportUnmarshaller
, reportSaver : ReportSaver[Seq[LDIFChangeRecord]]
, val maxQueueSize: Int
, val maxParallel : Long
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
, digestService : InventoryDigestServiceV1
, checkAliveLdap : () => IOResult[Unit]
, nodeInventoryDit: InventoryDit
) {

val logger = InventoryProcessingLogger
// logs are not available here, need "print"
println(s"INFO Configure inventory processing with parallelism of '${maxParallel}' and queue size of '${maxQueueSize}'")

// we want to limit the number of reports concurrently parsed
lazy val xmlParsingSemaphore = ZioRuntime.unsafeRun(Semaphore.make(maxParallel))

/*
* We manage inventories buffering with a bounded queue.
* In case of overflow, we reject new inventories.
*/
val queue = ZioRuntime.unsafeRun(Queue.dropping[InventoryReport](maxQueueSize))
lazy val queue = ZioRuntime.unsafeRun(Queue.dropping[InventoryReport](maxQueueSize))

def currentQueueSize: Int = ZioRuntime.unsafeRun(queue.size)

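The mechanism behind xmlParsingSemaphore is a standard ZIO semaphore: every parsing effect is wrapped in withPermit, so at most maxParallel inventories are parsed at the same time and the others wait for a permit. A minimal sketch of that pattern, with hypothetical names and ZIO 1.x style rather than code from this change:

import zio._

// At most `maxParallel` of the effects run concurrently; the others wait for a permit.
// UIO(s"parsed $f") stands in for the real XML parsing effect.
def parseAll(files: List[String], maxParallel: Long): UIO[List[String]] =
  for {
    semaphore <- Semaphore.make(maxParallel)
    parsed    <- ZIO.foreachPar(files)(f => semaphore.withPermit(UIO(s"parsed $f")))
  } yield parsed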
@@ -198,16 +203,19 @@ class InventoryProcessor(
val start = System.currentTimeMillis()

val res = for {
_ <- logger.debug(s"Start parsing inventory '${info.fileName}'")
report <- parseSafe(info.inventoryStream, info.fileName).chainError("Can't parse the input inventory, aborting")
report <- xmlParsingSemaphore.withPermit(
InventoryProcessingLogger.debug(s"Start parsing inventory '${info.fileName}'") *>
parseSafe(info.inventoryStream, info.fileName).chainError("Can't parse the input inventory, aborting") <*
InventoryProcessingLogger.trace(s"Parsing done for inventory '${info.fileName}'")
)
secPair <- digestService.getKey(report).chainError(s"Error when trying to check inventory key for Node '${report.node.main.id.value}'")
parsed <- digestService.parseSecurityToken(secPair._1)
_ <- parsed.subject match {
case None => UIO.unit
case Some(list) => SecurityToken.checkCertificateSubject(report.node.main.id, list)
}
afterParsing = System.currentTimeMillis()
_ = logger.debug(s"Inventory '${report.name}' parsed in ${printer.print(new Duration(afterParsing, System.currentTimeMillis).toPeriod)} ms, now saving")
_ = InventoryProcessingLogger.debug(s"Inventory '${report.name}' parsed in ${printer.print(new Duration(afterParsing, System.currentTimeMillis).toPeriod)} ms, now saving")
saved <- info.optSignatureStream match { // Do we have a signature ?
// Signature here, check it
case Some(sig) =>
@@ -217,7 +225,7 @@
case None =>
saveNoSignature(report, secPair._2).chainError(s"Error when trying to check inventory key status for Node '${report.node.main.id.value}'")
}
_ <- logger.debug(s"Inventory '${report.name}' for node '${report.node.main.id.value}' pre-processed in ${printer.print(new Duration(start, System.currentTimeMillis).toPeriod)} ms")
_ <- InventoryProcessingLogger.debug(s"Inventory '${report.name}' for node '${report.node.main.id.value}' pre-processed in ${printer.print(new Duration(start, System.currentTimeMillis).toPeriod)} ms")
} yield {
saved
}
@@ -226,10 +234,10 @@
res map { status =>
import com.normation.inventory.provisioning.endpoint.StatusLog.LogMessage
status match {
case InventoryProcessStatus.MissingSignature(_) => logger.error(status.msg)
case InventoryProcessStatus.SignatureInvalid(_) => logger.error(status.msg)
case InventoryProcessStatus.QueueFull(_) => logger.warn(status.msg)
case InventoryProcessStatus.Accepted(_) => logger.trace(status.msg)
case InventoryProcessStatus.MissingSignature(_) => InventoryProcessingLogger.error(status.msg)
case InventoryProcessStatus.SignatureInvalid(_) => InventoryProcessingLogger.error(status.msg)
case InventoryProcessStatus.QueueFull(_) => InventoryProcessingLogger.warn(status.msg)
case InventoryProcessStatus.Accepted(_) => InventoryProcessingLogger.trace(status.msg)
}
} catchAll { err =>
val fail = Chained(s"Error when trying to process inventory '${info.fileName}'", err)
@@ -275,16 +283,16 @@ class InventoryProcessor(
*/
def saveReport(report:InventoryReport): UIO[Unit] = {
for {
_ <- logger.trace(s"Start post processing of inventory '${report.name}' for node '${report.node.main.id.value}'")
_ <- InventoryProcessingLogger.trace(s"Start post processing of inventory '${report.name}' for node '${report.node.main.id.value}'")
start <- UIO(System.currentTimeMillis)
saved <- reportSaver.save(report).chainError("Can't merge inventory report in LDAP directory, aborting").either
_ <- saved match {
case Left(err) =>
logger.error(s"Error when trying to process report: ${err.fullMsg}")
InventoryProcessingLogger.error(s"Error when trying to process report: ${err.fullMsg}")
case Right(report) =>
logger.debug("Report saved.")
InventoryProcessingLogger.debug("Report saved.")
}
_ <- logger.info(s"Report '${report.name}' for node '${report.node.main.hostname}' [${report.node.main.id.value}] (signature:${report.node.main.keyStatus.value}) "+
_ <- InventoryProcessingLogger.info(s"Report '${report.name}' for node '${report.node.main.hostname}' [${report.node.main.id.value}] (signature:${report.node.main.keyStatus.value}) "+
s"processed in ${printer.print(new Duration(start, System.currentTimeMillis).toPeriod)} ms")
} yield ()
}
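A note on the sequencing operators used in the withPermit block above: a *> b runs a then b and keeps the result of b, while a <* b runs both and keeps the result of a. That is why the parsed report, and not one of the surrounding log effects, is the value produced by the semaphore-guarded section. A tiny illustration of the standard ZIO semantics, not code from this change:

import zio._

// `*>` keeps the right-hand result, `<*` keeps the left-hand one.
val guarded: UIO[Int] =
  UIO(println("start parsing")) *>  // run for its effect, result discarded
  UIO(40 + 2)                   <*  // this value is the overall result
  UIO(println("parsing done"))      // run for its effect, result discarded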
@@ -98,6 +98,14 @@ class AppConfig {
@Value("${waiting.inventory.queue.size}")
var WAITING_QUEUE_SIZE = 50

// The number of inventories parsed in parallel.
// It also limits the number of inventories processed in parallel,
// because you obviously need to parse an inventory before saving it.
// Minimum 1; "Nx" means N times the number of available cores (default "0.5x", i.e. half the cores).
@Value("${inventory.parse.parallelization:0.5x}")
var MAX_PARSE_PARALLEL = "0.5x"


@Value("${inventories.root.directory}")
var INVENTORY_ROOT_DIR = ""

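Note that the ":0.5x" suffix inside the @Value placeholder "${inventory.parse.parallelization:0.5x}" is Spring's default-value syntax: when the property is missing from the properties file, the field keeps "0.5x" instead of failing at context startup, so existing installations work unchanged without editing their configuration.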
@@ -283,15 +291,34 @@ class AppConfig {
}

@Bean
def inventoryProcessor() = new InventoryProcessor(
def inventoryProcessor() = {
val maxParallel = try {
val user = if(MAX_PARSE_PARALLEL.endsWith("x")) {
val xx = MAX_PARSE_PARALLEL.substring(0, MAX_PARSE_PARALLEL.size-1)
java.lang.Double.parseDouble(xx) * Runtime.getRuntime.availableProcessors()
} else {
java.lang.Double.parseDouble(MAX_PARSE_PARALLEL)
}
Math.max(1, user).toLong
} catch {
case ex: Exception =>
// logs are not available here
println(s"ERROR Error when parsing configuration properties for the parallelisation of inventory processing. " +
s"Expecting a positive integer or number of time the avaiblable processors. Default to '0.5x': " +
s"inventory.parse.parallelization=${MAX_PARSE_PARALLEL}")
Math.max(1, Math.ceil(Runtime.getRuntime.availableProcessors().toDouble/2).toLong)
}
new InventoryProcessor(
pipelinedReportUnmarshaller
, reportSaver
, WAITING_QUEUE_SIZE
, fullInventoryRepository
, new InventoryDigestServiceV1(fullInventoryRepository)
, checkLdapAlive
, pendingNodesDit
)
, reportSaver
, WAITING_QUEUE_SIZE
, maxParallel
, fullInventoryRepository
, new InventoryDigestServiceV1(fullInventoryRepository)
, checkLdapAlive
, pendingNodesDit
)
}

@Bean
def inventoryWatcher() = {
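When the configured value cannot be parsed at all, the catch branch above falls back to half the available cores, rounded up and never below one. A quick check of that fallback formula with a few assumed core counts, for illustration only:

// Same formula as the catch branch above, evaluated for a few core counts:
def fallback(cores: Int): Long = Math.max(1, Math.ceil(cores.toDouble / 2).toLong)
// fallback(8) == 4, fallback(3) == 2, fallback(1) == 1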
@@ -16,23 +16,23 @@
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<load-on-startup>1</load-on-startup>

<!-- Configure DispatcherServlet to use JavaConfigWebApplicationContext
instead of the default XmlWebApplicationContext -->
<init-param>
<param-name>contextClass</param-name>
<param-value>org.springframework.web.context.support.AnnotationConfigWebApplicationContext</param-value>
</init-param>

<!-- Again, config locations must consist of one or more comma- or space-delimited
and fully-qualified @Configuration classes -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>com.normation.inventory.provisioning.endpoint.config.AppConfig</param-value>
</init-param>

<!-- Configure DispatcherServlet to use JavaConfigWebApplicationContext
instead of the default XmlWebApplicationContext -->
<init-param>
<param-name>contextClass</param-name>
<param-value>org.springframework.web.context.support.AnnotationConfigWebApplicationContext</param-value>
</init-param>
</servlet>

<servlet-mapping>
<servlet-name>dispatcher</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
</web-app>
@@ -101,6 +101,7 @@ class TestCertificate extends Specification {
parser
, reportSaver
, 1000
, 2
, fullInventoryRepo
, new InventoryDigestServiceV1(fullInventoryRepo)
, () => UIO.unit
@@ -23,7 +23,6 @@ package com.normation.ldap.listener
import com.unboundid.ldap.listener.{InMemoryDirectoryServer, InMemoryDirectoryServerConfig}
import com.unboundid.ldap.sdk.schema.Schema
import com.normation.ldap.ldif._
import com.normation.ldap.sdk.LDAPIOResult.LDAPIOResult
import com.normation.ldap.sdk._
import com.normation.ldap.sdk.syntax.UnboundidLDAPConnection
import zio.blocking.Blocking