Skip to content

Commit

Permalink
The scheduling works and actually tested
Browse files Browse the repository at this point in the history
  • Loading branch information
fanf committed Dec 20, 2016
1 parent 05bf310 commit 002dd5f
Show file tree
Hide file tree
Showing 5 changed files with 360 additions and 130 deletions.
Expand Up @@ -39,6 +39,8 @@ package com.normation.rudder.datasources

import org.joda.time.DateTime
import org.joda.time.Seconds
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

sealed trait DataSourceType {
def name : String
Expand All @@ -57,15 +59,19 @@ case class AllNodesSourceType(

case class DataSourceName(value : String) extends AnyVal

case class DataSource (
final case class DataSource (
name : DataSourceName
, updateTimeout: Duration
, description: String
, sourceType : DataSourceType
, url : String
, headers : Map[String,String]
, httpMethod : String
, path : String
, frequency : Seconds
, httpTimeout: Duration
, frequency : Option[FiniteDuration]
, runOnNewNode: Boolean
, runOnGeneration: Boolean
, lastUpdate : Option[DateTime]
, enabled : Boolean
// do we check SSL or not? Typically, we must not
Expand Down
@@ -0,0 +1,180 @@
/*
*************************************************************************************
* Copyright 2016 Normation SAS
*************************************************************************************
*
* This file is part of Rudder.
*
* Rudder is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In accordance with the terms of section 7 (7. Additional Terms.) of
* the GNU General Public License version 3, the copyright holders add
* the following Additional permissions:
* Notwithstanding to the terms of section 5 (5. Conveying Modified Source
* Versions) and 6 (6. Conveying Non-Source Forms.) of the GNU General
* Public License version 3, when you create a Related Module, this
* Related Module is not considered as a part of the work and may be
* distributed under the license agreement of your choice.
* A "Related Module" means a set of sources files including their
* documentation that, without modification of the Source Code, enables
* supplementary functions or services in addition to those offered by
* the Software.
*
* Rudder is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rudder. If not, see <http://www.gnu.org/licenses/>.
*
*************************************************************************************
*/

package com.normation.rudder.datasources

import com.normation.inventory.domain.NodeId
import com.normation.rudder.repository.RoParameterRepository

import com.normation.rudder.domain.eventlog._

import net.liftweb.common.Box
import com.normation.rudder.services.nodes.NodeInfoService
import com.normation.rudder.repository.WoNodeRepository
import com.normation.rudder.services.policies.InterpolatedValueCompiler
import com.normation.utils.Control
import net.liftweb.common.Failure
import com.normation.rudder.domain.parameters.Parameter
import net.liftweb.common.Full
import com.normation.rudder.domain.nodes.CompareProperties
import com.normation.eventlog.EventActor
import com.normation.eventlog.ModificationId
import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.Await
import com.normation.rudder.domain.nodes.NodeInfo
import net.liftweb.common.Empty
import monix.execution.Cancelable
import scala.util.control.NonFatal
import org.joda.time.DateTime
import monix.reactive.Observable
import net.liftweb.common.Loggable


final case class UpdateCause(modId: ModificationId, actor:EventActor, reason:Option[String])

/**
* This object represent a statefull scheduler for fetching (or whatever action)
* data from datasource.
* Its contract is that:
* - data source is immutable for that scheduler
* - action is call periodically accordingly to data source period
* - the scheduler is initially STOPPED. It can be start with the start() method.
* - the scheduler can be stopped (when already stopped, it's a noop) with the cancel() method.
* - there is callback that should be call each time a node is added / a generation is
* started - the data source configuration will decide is something is to done or not.
*/
class DatasourceScheduler(
datasource: DataSource
, implicit val scheduler : Scheduler
, newUuid : () => ModificationId
, action : UpdateCause => Unit
) extends Loggable {

/**
* So, the idea is to build an observable that tick every period (if period defined)
* We start it when the datasource is initialized, and stop it/start it around
* each user-triggered event like "on new node".
*
* At each tick, we fetch data.
*/

//for that datasource, this is the timer
private[this] val source = (datasource.frequency match {
case Some(d) =>
Observable.interval(d)
case None => //in that case, our source does produce anything
Observable.empty[Long]
}
//and now, map the actual behavior to produce at each tick
).mapAsync { tick =>
Task(action(UpdateCause(newUuid(), RudderEventActor
, Some(s"Automatically fetching data for data source ${datasource.name}")
)))
}
// we add an auto restart in case a getData lead to an error
.onErrorRestart(5)

// here is the place where we will store the currently
// running time, so that we are able the stop it and restart
// it on user action.
private[this] var scheduledTask = Option.empty[Cancelable]


/*
* This is the main interesting method, seting
* things up for schedule
*/
def restartScheduleTask(): Unit = {
// clean existing
cancel()
// actually start the scheduler by subscribing to it
scheduledTask = Some(source.subscribe())
}

// the cancel method just stop the current time if
// exists, and clean things up
def cancel() : Unit = {
scheduledTask.foreach( _.cancel() )
scheduledTask = None
}

/*
* alias for restartScheduleTask
*/
def start() = restartScheduleTask

/**
* Callback when a new node is accepted
*/
def onNewNode(node: NodeInfo): Unit = {
if(datasource.runOnNewNode && datasource.enabled) {
doActionAndSchedule(UpdateCause(newUuid(), RudderEventActor
, Some(s"Getting data for source ${datasource.name.value} for newly accepted node ${node.hostname} [${node.id.value}]")
))
}
}

/**
* Callback when a generation is started
*/
def onNewNode(generationTimeStamp: DateTime): Unit = {
if(datasource.runOnGeneration && datasource.enabled) {
doActionAndSchedule(UpdateCause(newUuid(), RudderEventActor
, Some(s"Getting data for source ${datasource.name.value} for policy generation started at ${generationTimeStamp.toString()}")
))
}
}

/**
* This is the method that actually do a fetch data and manage
* the scheduler restart.
* We must avoid exceptions.
*/
private[this] def doActionAndSchedule(c: UpdateCause): Unit = {
cancel()
try {
action(c)
} catch {
case NonFatal(ex) => logger.error(s"Error when fetching data", ex)
} finally {
restartScheduleTask()
}
}

}

Expand Up @@ -66,7 +66,7 @@ import net.liftweb.common.Empty
* - (event log are generated because we are just changing node properties,
* so same behaviour)
*/
trait QueryDatasourceService {
trait QueryDataSourceService {
/**
* Here, we query the provided datasource and update
* all the node with the correct logic.
Expand All @@ -80,12 +80,12 @@ trait QueryDatasourceService {



class HttpQueryDatasourceService(
class HttpQueryDataSourceService(
nodeInfo : NodeInfoService
, parameterRepo : RoParameterRepository
, nodeRepository : WoNodeRepository
, interpolCompiler: InterpolatedValueCompiler
) extends QueryDatasourceService {
) extends QueryDataSourceService {

val getHttp = new GetDataset(interpolCompiler)

Expand Down Expand Up @@ -116,7 +116,7 @@ class HttpQueryDatasourceService(
case Some(p) => Full(p)
})
//connection timeout: 5s ; getdata timeout: freq ?
property <- getHttp.getNode(datasource, nodeInfo, policyServer, parameters, 5000, datasource.frequency.getSeconds*1000)
property <- getHttp.getNode(datasource, nodeInfo, policyServer, parameters, datasource.httpTimeout, datasource.httpTimeout)
newNode = nodeInfo.node.copy(properties = CompareProperties.updateProperties(nodeInfo.properties, Some(Seq(property))))
nodeUpdated <- nodeRepository.updateNode(newNode, modId, actor, reason)
} yield {
Expand All @@ -130,7 +130,7 @@ class HttpQueryDatasourceService(
}

// give a timeout for the whole tasks sufficiently large, but that won't overlap too much on following runs
val timeout = (datasource.frequency.getSeconds*2).seconds
val timeout = datasource.httpTimeout

for {
nodes <- nodeInfo.getAll()
Expand Down
Expand Up @@ -72,6 +72,7 @@ import scalaz._
import net.minidev.json.JSONObject
import scala.util.control.NonFatal
import net.liftweb.common.Empty
import scala.concurrent.duration.Duration

/*
* This file contain the logic to update dataset from an
Expand All @@ -95,7 +96,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {

val compiler = new InterpolateNode(valueCompiler)

def getNode(datasource: DataSource, node: NodeInfo, policyServer: NodeInfo, parameters: Set[Parameter], connectionTimeout: Int, readTimeOut: Int): Box[NodeProperty] = {
def getNode(datasource: DataSource, node: NodeInfo, policyServer: NodeInfo, parameters: Set[Parameter], connectionTimeout: Duration, readTimeOut: Duration): Box[NodeProperty] = {
for {
p <- sequence(parameters.toSeq)(compiler.compileParameters)
parameters = p.toMap
Expand Down Expand Up @@ -131,10 +132,10 @@ object QueryHttp {
* Simple synchronous http get, return the response
* body as a string.
*/
def GET(url: String, headers: Map[String, String], checkSsl: Boolean, connectionTimeout: Int, readTimeOut: Int): Box[String] = {
def GET(url: String, headers: Map[String, String], checkSsl: Boolean, connectionTimeout: Duration, readTimeOut: Duration): Box[String] = {
val options = (
HttpOptions.connTimeout(connectionTimeout)
:: HttpOptions.readTimeout(readTimeOut)
HttpOptions.connTimeout(connectionTimeout.toMillis.toInt)
:: HttpOptions.readTimeout(readTimeOut.toMillis.toInt)
:: (if(checkSsl) {
Nil
} else {
Expand Down

0 comments on commit 002dd5f

Please sign in to comment.