Skip to content

Commit

Permalink
Fixes #16926: Port datasources to zio RC18-2
Browse files Browse the repository at this point in the history
  • Loading branch information
fanf authored and Jenkins CI committed Mar 18, 2020
1 parent 9c451ef commit 7484faa
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 51 deletions.
8 changes: 8 additions & 0 deletions datasources/pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@
<version>${http4s-version}</version>
<scope>test</scope>
</dependency>

<!-- zio tests -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-test_${scala-binary-version}</artifactId>
<version>${dev-zio-version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- I hate maven. 100 lines for a F**** if/then/else. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class DataSourcesPluginDef(override val status: PluginStatus) extends DefaultPlu

def init = {
// initialize datasources to start schedule
ZioRuntime.unsafeRun(DatasourcesConf.dataSourceRepository.initialize().fork)
ZioRuntime.unsafeRun(DatasourcesConf.dataSourceRepository.initialize().forkDaemon)
}

def oneTimeInit : Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import com.normation.utils.StringUuidGenerator
import doobie._
import doobie.implicits._
import org.joda.time.DateTime

import zio.duration._
import com.normation.plugins.PluginStatus
import com.normation.errors._
Expand All @@ -60,6 +59,7 @@ import zio._
import zio.clock.Clock
import zio.syntax._
import com.github.ghik.silencer.silent
import zio.console.Console
import zio.interop.catz._


Expand Down Expand Up @@ -132,7 +132,7 @@ trait NoopDataSourceCallbacks extends DataSourceUpdateCallbacks {
}

class MemoryDataSourceRepository extends DataSourceRepository {
val print = ZioRuntime.environment.console.putStrLn _
def print(s: String) = ZIO.accessM[Console](_.get.putStrLn(s)).provide(ZioRuntime.environment)

private[this] val sourcesRef = zio.Ref.make(Map[DataSourceId, DataSource]()).runNow

Expand Down Expand Up @@ -201,15 +201,15 @@ class DataSourceRepoImpl(
// Initialize data sources scheduler, with all sources present in backend
def initialize(): IOResult[Unit] = {
getAll.flatMap(sources =>
ZIO.traverse(sources) { case (_, source) =>
ZIO.foreach(sources) { case (_, source) =>
updateDataSourceScheduler(clock, source, Some(source.runParam.schedule.duration))
}.unit
).chainError("Error when initializing datasources")
}

// get datasource scheduler which match the condition
private[this] def foreachDatasourceScheduler(condition: DataSource => Boolean)(action: DataSourceScheduler => IOResult[Unit]): IOResult[Unit] = {
datasources.all.flatMap(m => ZIO.traverse(m.toIterable) { case (_, dss) =>
datasources.all.flatMap(m => ZIO.foreach(m.toIterable) { case (_, dss) =>
if(condition(dss.datasource)) {
action(dss)
} else {
Expand Down Expand Up @@ -371,7 +371,7 @@ class DataSourceRepoImpl(
}
}.toList.sortBy( _._1.toMillis ).zipWithIndex)

toStart.map(l => ZIO.traverse(l) { case ((period, dss), i) =>
toStart.map(l => ZIO.foreach(l) { case ((period, dss), i) =>
dss.startWithDelay((i+1).minutes)
}): @silent //suppress "a type was inferred to be `Any`"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DataSourceScheduler(
case Scheduled(d) =>
if(datasource.enabled) {
DataSourceLoggerPure.Scheduler.info(s"Datasource '${datasource.name.value}' (${datasource.id.value}) is enabled and scheduled every ${d.asScala.toMinutes.toString} minutes") *>
ZSchedule.spaced(d).succeed
Schedule.spaced(d).succeed
} else {
DataSourceLoggerPure.Scheduler.info(s"Datasource '${datasource.name.value}' (${datasource.id.value}) is disabled") *>
Schedule.never.succeed
Expand Down Expand Up @@ -126,7 +126,7 @@ class DataSourceScheduler(
*/
def startWithDelay(delay: Duration): IOResult[Unit] = {
// don't forget to fork is you don't want to block for "delay"!
restartScheduleTask().delay(delay).provide(clock).fork.unit
restartScheduleTask().delay(delay).provide(clock).forkDaemon.unit
}

/*
Expand All @@ -141,7 +141,7 @@ class DataSourceScheduler(
if(pluginStatus.isEnabled) {
for {
_ <- DataSourceLoggerPure.debug(s"Scheduling runs for data source with id '${datasource.id.value}'")
fiber <- source.fork
fiber <- source.forkDaemon
_ <- scheduledTask.set(Some(fiber))
} yield ()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
) : IOResult[Option[NodeProperty]] = {
//utility to expand both key and values of a map
def expandMap(expand: String => PureResult[String], map: Map[String, String]): IOResult[Map[String, String]] = {
(ZIO.traverse(map.toList) { case (key, value) =>
(ZIO.foreach(map.toList) { case (key, value) =>
(for {
newKey <- expand(key)
newValue <- expand(value)
Expand All @@ -117,7 +117,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
//actual logic

for {
parameters <- ZIO.traverse(parameters)(compiler.compileParameters(_).toIO).chainError("Error when transforming Rudder Parameter for variable interpolation")
parameters <- ZIO.foreach(parameters)(compiler.compileParameters(_).toIO).chainError("Error when transforming Rudder Parameter for variable interpolation")
expand = compiler.compileInput(node, policyServer, globalPolicyMode, parameters.toMap) _
url <- expand(datasource.url).chainError(s"Error when trying to parse URL ${datasource.url}").toIO
path <- expand(datasource.path).chainError(s"Error when trying to compile JSON path ${datasource.path}").toIO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,53 +91,56 @@ import zio.{IO => _, _}
import com.normation.errors._
import com.normation.zio._
import org.specs2.matcher.EqualityMatcher
import zio.test.environment.TestClock
import zio.test.environment._
import zio.duration._

object TheSpaced {

val makeTestClock = ZManaged.make(TestClock.make(TestClock.DefaultData))(_ => UIO.unit)

val prog = makeTestClock.use(testClock => for {
queue <- Queue.unbounded[Unit]
f <- (UIO(println("Hello!")) *> queue.offer(())).repeat(ZSchedule.fixed(5.minutes)).provide(testClock).fork
_ <- UIO(println("set to 0 min")) *> testClock.clock.adjust(0.nano) *> queue.take
_ <- UIO(println("set to 1 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 2 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 3 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 4 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 5 min")) *> testClock.clock.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 6 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 7 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 8 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 9 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 10 min")) *> testClock.clock.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 11 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 25 min")) *> testClock.clock.adjust(10.minute)
_ <- f.join
} yield ())
val makeTestClock = TestClock.default.build

val prog = makeTestClock.use(testClock =>
for {
queue <- Queue.unbounded[Unit]
tc = testClock.get[TestClock.Service]
f <- (UIO(println("Hello!")) *> queue.offer(())).repeat(Schedule.fixed(5.minutes)).provide(testClock).forkDaemon
_ <- UIO(println("set to 0 min")) *> tc.adjust(0.nano) *> queue.take
_ <- UIO(println("set to 1 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 2 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 3 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 4 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 5 min")) *> tc.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 6 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 7 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 8 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 9 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 10 min")) *> tc.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 11 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 25 min")) *> tc.adjust(10.minute)
_ <- f.join
} yield ()
).provideLayer(testEnvironment)

val prog2 = makeTestClock.use(testClock => for {
q <- Queue.unbounded[Unit]
_ <- (q.offer(()).delay(60.minutes)).forever.provide(testClock).fork
_ <- (q.offer(()).delay(60.minutes)).forever.provide(testClock).forkDaemon
a <- q.poll.map(_.isEmpty)
_ <- testClock.clock.adjust(60.minutes)
_ <- testClock.get[TestClock.Service].adjust(60.minutes)
x <- q.poll.map(_.nonEmpty)
b <- q.take.as(true)
c <- q.poll.map(_.isEmpty)
_ <- testClock.clock.adjust(60.minutes)
_ <- testClock.get[TestClock.Service].adjust(60.minutes)
d <- q.take.as(true)
e <- q.poll.map(_.isEmpty)
} yield a && b && c && d && e && x)
} yield a && b && c && d && e && x).provideLayer(testEnvironment)

def main(args: Array[String]): Unit = {
println(ZioRuntime.unsafeRun(prog2))
println(ZioRuntime.unsafeRun(prog))
}
}

@RunWith(classOf[JUnitRunner])
class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Loggable with AfterAll {
val makeTestClock = ZManaged.make(TestClock.make(TestClock.DefaultData))(_ => UIO.unit)
val makeTestClock = TestClock.default.build

implicit val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
implicit val cs: ContextShift[IO] = IO.contextShift(blockingExecutionContext)
Expand All @@ -153,8 +156,8 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
compactRender(parse(json))
}

implicit class RunNowTimeout[A](effect: IOResult[A]) {
def runTimeout(d: Duration) = effect.timeout(d).notOptional(s"The test timed-out after ${d}").provide(ZioRuntime.environment).runNow
implicit class RunNowTimeout[A](effect: ZIO[Live, RudderError, A]) {
def runTimeout(d: Duration) = effect.timeout(d).notOptional(s"The test timed-out after ${d}").provideLayer(testEnvironment).runNow
}

// a timer
Expand Down Expand Up @@ -492,7 +495,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
total_0 = ce_0 + cs_0
_ <- dss.restartScheduleTask()
//then, event after days, nothing is done
_ <- testClock.clock.adjust(1 day)
_ <- testClock.get[TestClock.Service].adjust(1 day)
ce_1d <- NodeDataset.counterError.get
cs_1d <- NodeDataset.counterSuccess.get
} yield {
Expand Down Expand Up @@ -528,14 +531,14 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
total_0 = ce_0 + cs_0
_ <- dss.restartScheduleTask()
//then, event after days, nothing is done
_ <- testClock.clock.adjust(1 day)
_ <- testClock.get[TestClock.Service].adjust(1 day)
_ <- queue.failIfNonEmpty
ce_1 <- NodeDataset.counterError.get
cs_1 <- NodeDataset.counterSuccess.get
total_1 = ce_1 + cs_1
//but asking for a direct update do the queries immediatly - task need at least 1ms to notice it should run
_ <- dss.doActionAndSchedule(action(UpdateCause(ModificationId("plop"), RudderEventActor, None)))
_ <- testClock.clock.adjust(1 millis)
_ <- testClock.get[TestClock.Service].adjust(1 millis)
_ <- queue.failIfNonEmpty
ce_2 <- NodeDataset.counterError.get
cs_2 <- NodeDataset.counterSuccess.get
Expand Down Expand Up @@ -574,31 +577,31 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
_ <- dss.restartScheduleTask()
//then just after, we have the first exec - it still need at least a ms to tick
//still nothing here
_ <- testClock.clock.setTime(1 millis)
_ <- testClock.get[TestClock.Service].setTime(1 millis)
//here we have results
_ <- queue.take
ce_0s <- NodeDataset.counterError.get
cs_0s <- NodeDataset.counterSuccess.get
total_0s = ce_0s + cs_0s
//then nothing happens before 5 minutes
_ <- testClock.clock.setTime(1 second)
_ <- testClock.get[TestClock.Service].setTime(1 second)
_ <- queue.failIfNonEmpty
ce_1s <- NodeDataset.counterError.get
cs_1s <- NodeDataset.counterSuccess.get
total_1s = ce_1s + cs_1s
_ <- testClock.clock.setTime(4 minutes)
_ <- testClock.get[TestClock.Service].setTime(4 minutes)
_ <- queue.failIfNonEmpty
ce_4m <- NodeDataset.counterError.get
cs_4m <- NodeDataset.counterSuccess.get
total_4m = ce_4m + cs_4m
//then all the nodes gets their info
_ <- testClock.clock.setTime(5 minutes)
_ <- testClock.get[TestClock.Service].setTime(5 minutes)
_ <- queue.take
ce_5m <- NodeDataset.counterError.get
cs_5m <- NodeDataset.counterSuccess.get
total_5m = ce_5m + cs_5m
//then nothing happen anymore
_ <- testClock.clock.setTime(8 minutes)
_ <- testClock.get[TestClock.Service].setTime(8 minutes)
_ <- queue.failIfNonEmpty
ce_8m <- NodeDataset.counterError.get
cs_8m <- NodeDataset.counterSuccess.get
Expand Down
7 changes: 4 additions & 3 deletions pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@
<dependency><groupId>commons-fileupload</groupId><artifactId>commons-fileupload</artifactId><scope>provided</scope></dependency>
<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><scope>provided</scope></dependency>
<dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>izumi-reflect-thirdparty-boopickle-shaded_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>izumi-reflect_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-interop-cats_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-stacktracer_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-streams_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-test_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><scope>provided</scope></dependency>
<dependency><groupId>jline</groupId><artifactId>jline</artifactId><scope>provided</scope></dependency>
Expand All @@ -230,9 +230,10 @@
<dependency><groupId>org.mindrot</groupId><artifactId>jbcrypt</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.mozilla</groupId><artifactId>rhino</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.ow2.asm</groupId><artifactId>asm</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.portable-scala</groupId><artifactId>portable-scala-reflect_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.reflections</groupId><artifactId>reflections</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-collection-compat_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parallel-collections_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-xml_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><scope>provided</scope></dependency>
Expand Down

0 comments on commit 7484faa

Please sign in to comment.