Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #22971: Spurious test error on the killing of old fiber on datasource save/delete #575

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class DataSourceScheduler(
private[this] val semaphore = Semaphore.make(1).runNow

// for that datasource, this is the timer
private[datasources] val source: UIO[Unit] = {
private[datasources] val source: UIO[Unit] = {
val never = Schedule.stop

val schedule = datasource.runParam.schedule match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ import zio.syntax._
import zio.test.Annotations
import zio.test.TestClock


/**
* This is just an example test server to run by hand and see how things work.
*/
Expand Down Expand Up @@ -805,13 +804,13 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
}

"create a new schedule from data source information" in {
val (total_0, total_0s, total_1s, total_4m, total_5m, total_8m) = ZIO.scoped(makeTestClock.flatMap {
testClock =>
val (total_0, total_0s, total_1s, total_4m, total_5m, total_8m) = ZIO
.scoped(makeTestClock.flatMap { testClock =>
// testClock needs to know which fibers are doing something, and it seems to be done easily with a queue.
val queue = Queue.unbounded[Unit].runNow

val dss = new DataSourceScheduler(
datasource.copy(name = DataSourceName("create a new schedule")),
datasource.copy(name = DataSourceName("create a new schedule")),
Enabled,
() => ModificationId(MyDatasource.uuidGen.newUuid),
testAction(queue)
Expand Down Expand Up @@ -862,54 +861,57 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
.runTimeout(1.minute)

val size = NodeConfigData.allNodesInfo.size
(total_0, total_0s, total_1s, total_4m, total_5m, total_8m) must beEqualTo((0, size, size, size, size * 2, size * 2))// and
//(f1 must beEqualTo(None)) and (r1 === Fiber.Status.Running(interrupting = false)) and (r2 === Fiber.Status.Done)
(total_0, total_0s, total_1s, total_4m, total_5m, total_8m) must beEqualTo((0, size, size, size, size * 2, size * 2)) // and
// (f1 must beEqualTo(None)) and (r1 === Fiber.Status.Running(interrupting = false)) and (r2 === Fiber.Status.Done)
}
}

"operation from repository" should {

"saving rom repos should kill the old fiber" in {
val id = DataSourceId("test-repos-save")
"When we update a datasource with repo operation, its live instance must be reloaded (old fiber killed)" >> {
val infos = new TestNodeRepoInfo(NodeConfigData.allNodesInfo)
val repos = new DataSourceRepoImpl(
new MemoryDataSourceRepository(),
new HttpQueryDataSourceService(
infos,
parameterRepo,
infos,
interpolation,
noPostHook,
() => alwaysEnforce.succeed
),
MyDatasource.uuidGen,
AlwaysEnabledPluginStatus
)

val datasource = NewDataSource(
name = id.value,
url = s"${REST_SERVER_URL}/$${rudder.node.id}",
path = "$.hostname",
schedule = Scheduled(5.minute)
)
val id = DataSourceId("test-repos-save")

val infos = new TestNodeRepoInfo(NodeConfigData.allNodesInfo)
val repos = new DataSourceRepoImpl(
new MemoryDataSourceRepository(),
new HttpQueryDataSourceService(
infos,
parameterRepo,
infos,
interpolation,
noPostHook,
() => alwaysEnforce.succeed
),
MyDatasource.uuidGen,
AlwaysEnabledPluginStatus
)
val datasource = NewDataSource(
name = id.value,
url = s"${REST_SERVER_URL}/$${rudder.node.id}",
path = "$.hostname",
schedule = Scheduled(5.minute)
)

val (r11, r12) = RunNowTimeout(
for {
_ <- repos.save(datasource)
f1 <- repos.datasources.all().flatMap(_(id).scheduledTask.get).notOptional("error in test: f1 is none")
// here, it should be Suspended because it won't run before 5 minutes
r11 <- f1.fold(_.status, _ => Unexpected("Datasource scheduler fiber should not be synthetic").fail)
_ <- repos.save(datasource.copy(name = DataSourceName("updated name")))
_ <- repos.datasources.all().flatMap(_(id).scheduledTask.get).notOptional("error in test: f2 is none")
r12 <- f1.fold(_.status, _ => Unexpected("Datasource scheduler fiber should not be synthetic").fail)
} yield (r11, r12)
).runTimeout(1.minute)

(r11 must beLike {
case Fiber.Status.Suspended( _, _, _) => ok
}) and (r12 === Fiber.Status.Done)
val (r11, r12, r21) = RunNowTimeout(
for {
_ <- repos.save(datasource) // init
f1 <- repos.datasources.all().flatMap(_(id).scheduledTask.get).notOptional("error in test: f1 is none")
// here, it can be either Running (if the init takes some time) or Suspended (if init ended and won't run before 5 minutes)
r11 <- f1.fold(_.status, _ => Unexpected("Datasource scheduler fiber should not be synthetic").fail)
_ <- repos.save(datasource.copy(name = DataSourceName("updated name")))
f2 <- repos.datasources.all().flatMap(_(id).scheduledTask.get).notOptional("error in test: f2 is none")
// here, since we updated repos, f1 was terminated
r12 <- f1.fold(_.status, _ => Unexpected("Datasource scheduler fiber should not be synthetic").fail)
// and f2 is running or suspended (if waiting for next schedule)
r21 <- f2.fold(_.status, _ => Unexpected("Datasource scheduler fiber should not be synthetic").fail)
} yield (r11, r12, r21)
).runTimeout(1.minute)

def fiberRunning(fs: Fiber.Status) = fs must beLike {
case Fiber.Status.Running(_, _) => ok
case Fiber.Status.Suspended(_, _, _) => ok
}

fiberRunning(r11) and (r12 must beEqualTo(Fiber.Status.Done)) and fiberRunning(r21)
}

"querying a lot of nodes" should {
Expand Down