Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #16926: Port datasources to zio RC18-2 #249

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datasources/pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@
<version>${http4s-version}</version>
<scope>test</scope>
</dependency>

<!-- zio tests -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-test_${scala-binary-version}</artifactId>
<version>${dev-zio-version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- I hate maven. 100 lines for a F**** if/then/else. -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class DataSourcesPluginDef(override val status: PluginStatus) extends DefaultPlu

def init = {
// initialize datasources to start schedule
ZioRuntime.unsafeRun(DatasourcesConf.dataSourceRepository.initialize().fork)
ZioRuntime.unsafeRun(DatasourcesConf.dataSourceRepository.initialize().forkDaemon)
}

def oneTimeInit : Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import com.normation.utils.StringUuidGenerator
import doobie._
import doobie.implicits._
import org.joda.time.DateTime

import zio.duration._
import com.normation.plugins.PluginStatus
import com.normation.errors._
Expand All @@ -60,6 +59,7 @@ import zio._
import zio.clock.Clock
import zio.syntax._
import com.github.ghik.silencer.silent
import zio.console.Console
import zio.interop.catz._


Expand Down Expand Up @@ -132,7 +132,7 @@ trait NoopDataSourceCallbacks extends DataSourceUpdateCallbacks {
}

class MemoryDataSourceRepository extends DataSourceRepository {
val print = ZioRuntime.environment.console.putStrLn _
def print(s: String) = ZIO.accessM[Console](_.get.putStrLn(s)).provide(ZioRuntime.environment)

private[this] val sourcesRef = zio.Ref.make(Map[DataSourceId, DataSource]()).runNow

Expand Down Expand Up @@ -201,15 +201,15 @@ class DataSourceRepoImpl(
// Initialize data sources scheduler, with all sources present in backend
def initialize(): IOResult[Unit] = {
getAll.flatMap(sources =>
ZIO.traverse(sources) { case (_, source) =>
ZIO.foreach(sources) { case (_, source) =>
updateDataSourceScheduler(clock, source, Some(source.runParam.schedule.duration))
}.unit
).chainError("Error when initializing datasources")
}

// get datasource scheduler which match the condition
private[this] def foreachDatasourceScheduler(condition: DataSource => Boolean)(action: DataSourceScheduler => IOResult[Unit]): IOResult[Unit] = {
datasources.all.flatMap(m => ZIO.traverse(m.toIterable) { case (_, dss) =>
datasources.all.flatMap(m => ZIO.foreach(m.toIterable) { case (_, dss) =>
if(condition(dss.datasource)) {
action(dss)
} else {
Expand Down Expand Up @@ -371,7 +371,7 @@ class DataSourceRepoImpl(
}
}.toList.sortBy( _._1.toMillis ).zipWithIndex)

toStart.map(l => ZIO.traverse(l) { case ((period, dss), i) =>
toStart.map(l => ZIO.foreach(l) { case ((period, dss), i) =>
dss.startWithDelay((i+1).minutes)
}): @silent //suppress "a type was inferred to be `Any`"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DataSourceScheduler(
case Scheduled(d) =>
if(datasource.enabled) {
DataSourceLoggerPure.Scheduler.info(s"Datasource '${datasource.name.value}' (${datasource.id.value}) is enabled and scheduled every ${d.asScala.toMinutes.toString} minutes") *>
ZSchedule.spaced(d).succeed
Schedule.spaced(d).succeed
} else {
DataSourceLoggerPure.Scheduler.info(s"Datasource '${datasource.name.value}' (${datasource.id.value}) is disabled") *>
Schedule.never.succeed
Expand Down Expand Up @@ -126,7 +126,7 @@ class DataSourceScheduler(
*/
def startWithDelay(delay: Duration): IOResult[Unit] = {
// don't forget to fork is you don't want to block for "delay"!
restartScheduleTask().delay(delay).provide(clock).fork.unit
restartScheduleTask().delay(delay).provide(clock).forkDaemon.unit
}

/*
Expand All @@ -141,7 +141,7 @@ class DataSourceScheduler(
if(pluginStatus.isEnabled) {
for {
_ <- DataSourceLoggerPure.debug(s"Scheduling runs for data source with id '${datasource.id.value}'")
fiber <- source.fork
fiber <- source.forkDaemon
_ <- scheduledTask.set(Some(fiber))
} yield ()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
) : IOResult[Option[NodeProperty]] = {
//utility to expand both key and values of a map
def expandMap(expand: String => PureResult[String], map: Map[String, String]): IOResult[Map[String, String]] = {
(ZIO.traverse(map.toList) { case (key, value) =>
(ZIO.foreach(map.toList) { case (key, value) =>
(for {
newKey <- expand(key)
newValue <- expand(value)
Expand All @@ -117,7 +117,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
//actual logic

for {
parameters <- ZIO.traverse(parameters)(compiler.compileParameters(_).toIO).chainError("Error when transforming Rudder Parameter for variable interpolation")
parameters <- ZIO.foreach(parameters)(compiler.compileParameters(_).toIO).chainError("Error when transforming Rudder Parameter for variable interpolation")
expand = compiler.compileInput(node, policyServer, globalPolicyMode, parameters.toMap) _
url <- expand(datasource.url).chainError(s"Error when trying to parse URL ${datasource.url}").toIO
path <- expand(datasource.path).chainError(s"Error when trying to compile JSON path ${datasource.path}").toIO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,53 +91,56 @@ import zio.{IO => _, _}
import com.normation.errors._
import com.normation.zio._
import org.specs2.matcher.EqualityMatcher
import zio.test.environment.TestClock
import zio.test.environment._
import zio.duration._

object TheSpaced {

val makeTestClock = ZManaged.make(TestClock.make(TestClock.DefaultData))(_ => UIO.unit)

val prog = makeTestClock.use(testClock => for {
queue <- Queue.unbounded[Unit]
f <- (UIO(println("Hello!")) *> queue.offer(())).repeat(ZSchedule.fixed(5.minutes)).provide(testClock).fork
_ <- UIO(println("set to 0 min")) *> testClock.clock.adjust(0.nano) *> queue.take
_ <- UIO(println("set to 1 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 2 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 3 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 4 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 5 min")) *> testClock.clock.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 6 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 7 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 8 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 9 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 10 min")) *> testClock.clock.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 11 min")) *> testClock.clock.adjust(1.minute)
_ <- UIO(println("set to 25 min")) *> testClock.clock.adjust(10.minute)
_ <- f.join
} yield ())
val makeTestClock = TestClock.default.build

val prog = makeTestClock.use(testClock =>
for {
queue <- Queue.unbounded[Unit]
tc = testClock.get[TestClock.Service]
f <- (UIO(println("Hello!")) *> queue.offer(())).repeat(Schedule.fixed(5.minutes)).provide(testClock).forkDaemon
_ <- UIO(println("set to 0 min")) *> tc.adjust(0.nano) *> queue.take
_ <- UIO(println("set to 1 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 2 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 3 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 4 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 5 min")) *> tc.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 6 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 7 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 8 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 9 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 10 min")) *> tc.adjust(1.minute) *> queue.take
_ <- UIO(println("set to 11 min")) *> tc.adjust(1.minute)
_ <- UIO(println("set to 25 min")) *> tc.adjust(10.minute)
_ <- f.join
} yield ()
).provideLayer(testEnvironment)

val prog2 = makeTestClock.use(testClock => for {
q <- Queue.unbounded[Unit]
_ <- (q.offer(()).delay(60.minutes)).forever.provide(testClock).fork
_ <- (q.offer(()).delay(60.minutes)).forever.provide(testClock).forkDaemon
a <- q.poll.map(_.isEmpty)
_ <- testClock.clock.adjust(60.minutes)
_ <- testClock.get[TestClock.Service].adjust(60.minutes)
x <- q.poll.map(_.nonEmpty)
b <- q.take.as(true)
c <- q.poll.map(_.isEmpty)
_ <- testClock.clock.adjust(60.minutes)
_ <- testClock.get[TestClock.Service].adjust(60.minutes)
d <- q.take.as(true)
e <- q.poll.map(_.isEmpty)
} yield a && b && c && d && e && x)
} yield a && b && c && d && e && x).provideLayer(testEnvironment)

def main(args: Array[String]): Unit = {
println(ZioRuntime.unsafeRun(prog2))
println(ZioRuntime.unsafeRun(prog))
}
}

@RunWith(classOf[JUnitRunner])
class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Loggable with AfterAll {
val makeTestClock = ZManaged.make(TestClock.make(TestClock.DefaultData))(_ => UIO.unit)
val makeTestClock = TestClock.default.build

implicit val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
implicit val cs: ContextShift[IO] = IO.contextShift(blockingExecutionContext)
Expand All @@ -153,8 +156,8 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
compactRender(parse(json))
}

implicit class RunNowTimeout[A](effect: IOResult[A]) {
def runTimeout(d: Duration) = effect.timeout(d).notOptional(s"The test timed-out after ${d}").provide(ZioRuntime.environment).runNow
implicit class RunNowTimeout[A](effect: ZIO[Live, RudderError, A]) {
def runTimeout(d: Duration) = effect.timeout(d).notOptional(s"The test timed-out after ${d}").provideLayer(testEnvironment).runNow
}

// a timer
Expand Down Expand Up @@ -492,7 +495,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
total_0 = ce_0 + cs_0
_ <- dss.restartScheduleTask()
//then, event after days, nothing is done
_ <- testClock.clock.adjust(1 day)
_ <- testClock.get[TestClock.Service].adjust(1 day)
ce_1d <- NodeDataset.counterError.get
cs_1d <- NodeDataset.counterSuccess.get
} yield {
Expand Down Expand Up @@ -528,14 +531,14 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
total_0 = ce_0 + cs_0
_ <- dss.restartScheduleTask()
//then, event after days, nothing is done
_ <- testClock.clock.adjust(1 day)
_ <- testClock.get[TestClock.Service].adjust(1 day)
_ <- queue.failIfNonEmpty
ce_1 <- NodeDataset.counterError.get
cs_1 <- NodeDataset.counterSuccess.get
total_1 = ce_1 + cs_1
//but asking for a direct update do the queries immediatly - task need at least 1ms to notice it should run
_ <- dss.doActionAndSchedule(action(UpdateCause(ModificationId("plop"), RudderEventActor, None)))
_ <- testClock.clock.adjust(1 millis)
_ <- testClock.get[TestClock.Service].adjust(1 millis)
_ <- queue.failIfNonEmpty
ce_2 <- NodeDataset.counterError.get
cs_2 <- NodeDataset.counterSuccess.get
Expand Down Expand Up @@ -574,31 +577,31 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
_ <- dss.restartScheduleTask()
//then just after, we have the first exec - it still need at least a ms to tick
//still nothing here
_ <- testClock.clock.setTime(1 millis)
_ <- testClock.get[TestClock.Service].setTime(1 millis)
//here we have results
_ <- queue.take
ce_0s <- NodeDataset.counterError.get
cs_0s <- NodeDataset.counterSuccess.get
total_0s = ce_0s + cs_0s
//then nothing happens before 5 minutes
_ <- testClock.clock.setTime(1 second)
_ <- testClock.get[TestClock.Service].setTime(1 second)
_ <- queue.failIfNonEmpty
ce_1s <- NodeDataset.counterError.get
cs_1s <- NodeDataset.counterSuccess.get
total_1s = ce_1s + cs_1s
_ <- testClock.clock.setTime(4 minutes)
_ <- testClock.get[TestClock.Service].setTime(4 minutes)
_ <- queue.failIfNonEmpty
ce_4m <- NodeDataset.counterError.get
cs_4m <- NodeDataset.counterSuccess.get
total_4m = ce_4m + cs_4m
//then all the nodes gets their info
_ <- testClock.clock.setTime(5 minutes)
_ <- testClock.get[TestClock.Service].setTime(5 minutes)
_ <- queue.take
ce_5m <- NodeDataset.counterError.get
cs_5m <- NodeDataset.counterSuccess.get
total_5m = ce_5m + cs_5m
//then nothing happen anymore
_ <- testClock.clock.setTime(8 minutes)
_ <- testClock.get[TestClock.Service].setTime(8 minutes)
_ <- queue.failIfNonEmpty
ce_8m <- NodeDataset.counterError.get
cs_8m <- NodeDataset.counterSuccess.get
Expand Down
7 changes: 4 additions & 3 deletions pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@
<dependency><groupId>commons-fileupload</groupId><artifactId>commons-fileupload</artifactId><scope>provided</scope></dependency>
<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><scope>provided</scope></dependency>
<dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>izumi-reflect-thirdparty-boopickle-shaded_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to adjust dependencies on plugins-common ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they changed some dependencies when switching to zio RC18-2.
Maven don't manage the provided one correctly if we don't tell him like that, so it may put them in the rpkg (because it doesn't see that they are provided), which lead to bigger rpkg (and horror stories in case of version mismatch)

<dependency><groupId>dev.zio</groupId><artifactId>izumi-reflect_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-interop-cats_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-stacktracer_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-streams_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio-test_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>dev.zio</groupId><artifactId>zio_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><scope>provided</scope></dependency>
<dependency><groupId>jline</groupId><artifactId>jline</artifactId><scope>provided</scope></dependency>
Expand All @@ -230,9 +230,10 @@
<dependency><groupId>org.mindrot</groupId><artifactId>jbcrypt</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.mozilla</groupId><artifactId>rhino</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.ow2.asm</groupId><artifactId>asm</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.portable-scala</groupId><artifactId>portable-scala-reflect_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.reflections</groupId><artifactId>reflections</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-collection-compat_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parallel-collections_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-xml_${scala-binary-version}</artifactId><scope>provided</scope></dependency>
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><scope>provided</scope></dependency>
Expand Down