Skip to content

Commit

Permalink
distage-testkit: add config for parallelism level (#1078)
Browse files Browse the repository at this point in the history
* add parallel level to testRunnner, add  to DIEffect

* update configs, add tests

* remove unused Functor instance from DIEffectAsync, fix distage-docker test

* namings

* fix build
  • Loading branch information
Caparow committed May 24, 2020
1 parent dbe2a00 commit 4ba0a77
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory}

import cats.Parallel
import cats.effect.Timer
import izumi.distage.model.effect.LowPriorityDIEffectAsyncInstances.{_Parallel, _Timer}
import cats.effect.{Concurrent, Timer}
import izumi.distage.model.effect.LowPriorityDIEffectAsyncInstances.{_Concurrent, _Parallel, _Timer}
import izumi.functional.bio.{BIOTemporal, F}
import izumi.fundamentals.platform.functional.Identity

Expand All @@ -15,6 +15,8 @@ import scala.concurrent.{Await, ExecutionContext, Future}
trait DIEffectAsync[F[_]] {
def parTraverse_[A](l: Iterable[A])(f: A => F[Unit]): F[Unit]
def parTraverse[A, B](l: Iterable[A])(f: A => F[B]): F[List[B]]
def parTraverseN[A, B](n: Int)(l: Iterable[A])(f: A => F[B]): F[List[B]]
def parTraverseN_[A, B](n: Int)(l: Iterable[A])(f: A => F[Unit]): F[Unit]
def sleep(duration: FiniteDuration): F[Unit]
}

Expand All @@ -23,10 +25,9 @@ object DIEffectAsync extends LowPriorityDIEffectAsyncInstances {

implicit val diEffectParIdentity: DIEffectAsync[Identity] = {
new DIEffectAsync[Identity] {
final val DIEffectAsyncIdentityThreadFactory = new NamedThreadFactory("dieffect-cached-pool", daemon = true)
final val DIEffectAsyncIdentityPool = ExecutionContext.fromExecutorService {
Executors.newCachedThreadPool(new NamedThreadFactory(
"dieffect-cached-pool", daemon = true
))
Executors.newCachedThreadPool(DIEffectAsyncIdentityThreadFactory)
}

override def parTraverse_[A](l: Iterable[A])(f: A => Unit): Unit = {
Expand All @@ -40,6 +41,18 @@ object DIEffectAsync extends LowPriorityDIEffectAsyncInstances {
override def parTraverse[A, B](l: Iterable[A])(f: A => Identity[B]): Identity[List[B]] = {
parTraverseIdentity(DIEffectAsyncIdentityPool)(l)(f)
}

override def parTraverseN[A, B](n: Int)(l: Iterable[A])(f: A => Identity[B]): Identity[List[B]] = {
val limitedAsyncPool = ExecutionContext.fromExecutorService {
Executors.newFixedThreadPool(n, DIEffectAsyncIdentityThreadFactory)
}
parTraverseIdentity(limitedAsyncPool)(l)(f)
}

override def parTraverseN_[A, B](n: Int)(l: Iterable[A])(f: A => Identity[Unit]): Identity[Unit] = {
parTraverseN(n)(l)(f)
()
}
}
}

Expand All @@ -54,6 +67,12 @@ object DIEffectAsync extends LowPriorityDIEffectAsyncInstances {
override def parTraverse[A, B](l: Iterable[A])(f: A => F[Throwable, B]): F[Throwable, List[B]] = {
F.parTraverse(l)(f)
}
override def parTraverseN[A, B](n: Int)(l: Iterable[A])(f: A => F[Throwable, B]): F[Throwable, List[B]] = {
F.parTraverseN(n)(l)(f)
}
override def parTraverseN_[A, B](n: Int)(l: Iterable[A])(f: A => F[Throwable, Unit]): F[Throwable, Unit] = {
F.parTraverseN_(n)(l)(f)
}
}
}

Expand All @@ -70,7 +89,7 @@ object DIEffectAsync extends LowPriorityDIEffectAsyncInstances {

private val threadGroup = new ThreadGroup(parentGroup, name)
private val threadCount = new AtomicInteger(1)
private val threadHash = Integer.toUnsignedString(this.hashCode())
private val threadHash = Integer.toUnsignedString(this.hashCode())

override def newThread(r: Runnable): Thread = {
val newThreadNumber = threadCount.getAndIncrement()
Expand All @@ -93,7 +112,7 @@ private[effect] sealed trait LowPriorityDIEffectAsyncInstances {
*
* Optional instance via https://blog.7mind.io/no-more-orphans.html
*/
implicit final def fromParallelTimer[F[_], P[_[_]]: _Parallel, T[_[_]]: _Timer](implicit P: P[F], T: T[F]): DIEffectAsync[F] = {
implicit final def fromCats[F[_], P[_[_]]: _Parallel, T[_[_]]: _Timer, C[_[_]]: _Concurrent](implicit P: P[F], T: T[F], C: C[F]): DIEffectAsync[F] = {
new DIEffectAsync[F] {
override def parTraverse_[A](l: Iterable[A])(f: A => F[Unit]): F[Unit] = {
Parallel.parTraverse_(l.toList)(f)(cats.instances.list.catsStdInstancesForList, P.asInstanceOf[Parallel[F]])
Expand All @@ -104,6 +123,12 @@ private[effect] sealed trait LowPriorityDIEffectAsyncInstances {
override def parTraverse[A, B](l: Iterable[A])(f: A => F[B]): F[List[B]] = {
Parallel.parTraverse(l.toList)(f)(cats.instances.list.catsStdInstancesForList, P.asInstanceOf[Parallel[F]])
}
override def parTraverseN[A, B](n: Int)(l: Iterable[A])(f: A => F[B]): F[List[B]] = {
Concurrent.parTraverseN(n.toLong)(l.toList)(f)(cats.instances.list.catsStdInstancesForList, C.asInstanceOf[Concurrent[F]], P.asInstanceOf[Parallel[F]])
}
override def parTraverseN_[A, B](n: Int)(l: Iterable[A])(f: A => F[Unit]): F[Unit] = {
C.asInstanceOf[Concurrent[F]].void(parTraverseN(n)(l)(f))
}
}
}
}
Expand All @@ -118,4 +143,9 @@ private object LowPriorityDIEffectAsyncInstances {
object _Timer {
@inline implicit final def get: _Timer[cats.effect.Timer] = null
}

sealed trait _Concurrent[K[_[_]]]
object _Concurrent {
@inline implicit final def get: _Concurrent[cats.effect.Concurrent] = null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class KeyMinimizer(allKeys: Set[DIKey]) {

@silent("Unused import")
private[this] val index: Map[String, Int] = {

import scala.collection.compat._
allKeys
.toSeq
.flatMap(extract)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import izumi.distage.model.definition.ModuleDef
import izumi.distage.testkit.TestConfig
import izumi.distage.testkit.docker.fixtures.PgSvcExample
import izumi.distage.testkit.scalatest.DistageBIOSpecScalatest
import izumi.distage.testkit.services.dstest.TestEnvironment.ParallelLevel
import izumi.fundamentals.platform.properties.EnvVarsCI
import izumi.logstage.api.Log
import zio.IO
Expand Down Expand Up @@ -35,8 +36,8 @@ abstract class DistageTestDockerBIO extends DistageBIOSpecScalatest[IO] {
override protected def config: TestConfig = super
.config.copy(
memoizationRoots = Set(DIKey.get[PgSvcExample]),
parallelTests = true,
parallelEnvs = true,
parallelTests = ParallelLevel.Unlimited,
parallelEnvs = ParallelLevel.Unlimited,
moduleOverrides = super.config.moduleOverrides overridenBy new ModuleDef { make[UUID].fromValue(UUID.randomUUID()) },
logLevel = Log.Level.Info,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ trait CatsDIEffectModule extends ModuleDef {
addImplicit[DIEffect[IO]]

make[DIEffectAsync[IO]].from {
(P0: Parallel[IO], T0: Timer[IO]) =>
(P0: Parallel[IO], T0: Timer[IO], C0: Concurrent[IO]) =>
implicit val P: Parallel[IO] = P0
implicit val T: Timer[IO] = T0
implicit val C: Concurrent[IO] = C0
DIEffectAsync[IO]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import distage.config.AppConfig
import distage.{Activation, BootstrapModule, DIKey, Module, StandardAxis}
import izumi.distage.framework.config.PlanningOptions
import izumi.distage.plugins.PluginConfig
import izumi.distage.testkit.services.dstest.TestEnvironment.ParallelLevel
import izumi.logstage.api.Log

/**
Expand Down Expand Up @@ -49,13 +50,13 @@ import izumi.logstage.api.Log
* Parallelism options:
*
*
* @param parallelEnvs Whether to run distinct memoization environments in parallel, default: `true`.
* @param parallelEnvs [[ParallelLevel]] of distinct memoization environments run, default: [[ParallelLevel.Unlimited]].
* Sequential envs will run in sequence after the parallel ones.
*
* @param parallelSuites Whether to run test suites in parallel, default: `true`.
* @param parallelSuites [[ParallelLevel]] of test suites run, default: [[ParallelLevel.Unlimited]].
* Sequential suites will run in sequence after the parallel ones.
*
* @param parallelTests Whether to run test cases in parallel, default: `true`.
* @param parallelTests [[ParallelLevel]] of test cases run, default: [[ParallelLevel.Unlimited]].
* Sequential tests will run in sequence after the parallel ones.
*
*
Expand Down Expand Up @@ -86,9 +87,9 @@ final case class TestConfig(
memoizationRoots: Set[_ <: DIKey] = Set.empty,
forcedRoots: Set[_ <: DIKey] = Set.empty,
// parallelism options
parallelEnvs: Boolean = true,
parallelSuites: Boolean = true,
parallelTests: Boolean = true,
parallelEnvs: ParallelLevel = ParallelLevel.Unlimited,
parallelSuites: ParallelLevel = ParallelLevel.Unlimited,
parallelTests: ParallelLevel = ParallelLevel.Unlimited,
// other options
configBaseName: String,
configOverrides: Option[AppConfig] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import izumi.distage.model.providers.ProviderMagnet
import izumi.distage.roles.services.EarlyLoggers
import izumi.distage.testkit.DebugProperties
import izumi.distage.testkit.services.dstest.DistageTestRunner._
import izumi.distage.testkit.services.dstest.TestEnvironment.{EnvExecutionParams, MemoizationEnvWithPlan, PreparedTest}
import izumi.distage.testkit.services.dstest.TestEnvironment.{EnvExecutionParams, MemoizationEnvWithPlan, ParallelLevel, PreparedTest}
import izumi.fundamentals.platform.cli.model.raw.RawAppArgs
import izumi.fundamentals.platform.functional.Identity
import izumi.fundamentals.platform.integration.ResourceCheck
Expand All @@ -39,9 +39,9 @@ class DistageTestRunner[F[_]: TagK](
val envs = groupTests(tests)
logEnvironmentsInfo(envs)
try {
val (parallelEnvs, sequentialEnvs) = envs.partition(_._1.envExec.parallelEnvs)
proceedEnvs(parallel = true)(parallelEnvs)
proceedEnvs(parallel = false)(sequentialEnvs)
groupAndSortByParallelLevel(envs)(_._1.envExec.parallelEnvs).foreach {
case (level, parallelEnvs) => proceedEnvs(level)(parallelEnvs)
}
} finally reporter.endAll()
}

Expand Down Expand Up @@ -171,7 +171,7 @@ class DistageTestRunner[F[_]: TagK](
}
}

def proceedEnvs(parallel: Boolean)(envs: Map[MemoizationEnvWithPlan, Iterable[PreparedTest[F]]]): Unit = {
def proceedEnvs(parallel: ParallelLevel)(envs: Iterable[(MemoizationEnvWithPlan, Iterable[PreparedTest[F]])]): Unit = {
configuredForeach(parallel)(envs) {
case (MemoizationEnvWithPlan(envExec, integrationLogger, memoizationPlan, runtimePlan, memoizationInjector, _), tests) =>
val allEnvTests = tests.map(_.test)
Expand Down Expand Up @@ -454,36 +454,38 @@ class DistageTestRunner[F[_]: TagK](

protected def groupedConfiguredTraverse_[A](
l: Iterable[A]
)(getParallelismGroup: A => Boolean
)(getParallelismGroup: A => ParallelLevel
)(f: A => F[Unit]
)(implicit
F: DIEffect[F],
P: DIEffectAsync[F],
): F[Unit] = {
val (parallelEnvs, sequentialEnvs) = l.partition(getParallelismGroup)
if (sequentialEnvs.isEmpty) {
configuredTraverse_(parallel = true)(parallelEnvs)(f)
} else {
configuredTraverse_(parallel = true)(parallelEnvs)(f).flatMap {
_ =>
configuredTraverse_(parallel = false)(sequentialEnvs)(f)
}
F.traverse_(groupAndSortByParallelLevel(l)(getParallelismGroup)) {
case (level, l) => configuredTraverse_(level)(l)(f)
}
}

protected def configuredTraverse_[A](parallel: Boolean)(l: Iterable[A])(f: A => F[Unit])(implicit F: DIEffect[F], P: DIEffectAsync[F]): F[Unit] = {
if (parallel) {
P.parTraverse_(l)(f)
} else {
F.traverse_(l)(f)
private[this] def groupAndSortByParallelLevel[A](l: Iterable[A])(getParallelismGroup: A => ParallelLevel): List[(ParallelLevel, Iterable[A])] = {
l.groupBy(getParallelismGroup).toList.sortBy {
case (ParallelLevel.Unlimited, _) => 1
case (ParallelLevel.Fixed(_), _) => 2
case (ParallelLevel.Sequential, _) => 3
}
}

protected def configuredForeach[A](parallelEnvs: Boolean)(environments: Iterable[A])(f: A => Unit): Unit = {
if (parallelEnvs && environments.size > 1) {
DIEffectAsync.diEffectParIdentity.parTraverse_(environments)(f)
} else {
environments.foreach(f)
protected def configuredTraverse_[A](parallel: ParallelLevel)(l: Iterable[A])(f: A => F[Unit])(implicit F: DIEffect[F], P: DIEffectAsync[F]): F[Unit] = {
parallel match {
case ParallelLevel.Fixed(n) if l.size > 1 => P.parTraverseN_(n)(l)(f)
case ParallelLevel.Unlimited if l.size > 1 => P.parTraverse_(l)(f)
case _ => F.traverse_(l)(f)
}
}

protected def configuredForeach[A](parallel: ParallelLevel)(environments: Iterable[A])(f: A => Unit): Unit = {
parallel match {
case ParallelLevel.Fixed(n) if environments.size > 1 => DIEffectAsync.diEffectParIdentity.parTraverseN_(n)(environments)(f)
case ParallelLevel.Unlimited if environments.size > 1 => DIEffectAsync.diEffectParIdentity.parTraverse_(environments)(f)
case _ => environments.foreach(f)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import izumi.distage.model.definition.Activation
import izumi.distage.model.plan.{OrderedPlan, TriSplittedPlan}
import izumi.distage.roles.model.meta.RolesInfo
import izumi.distage.testkit.services.dstest.DistageTestRunner.DistageTest
import izumi.distage.testkit.services.dstest.TestEnvironment.EnvExecutionParams
import izumi.distage.testkit.services.dstest.TestEnvironment.{EnvExecutionParams, ParallelLevel}
import izumi.logstage.api.{IzLogger, Log}

final case class TestEnvironment(
Expand All @@ -19,16 +19,16 @@ final case class TestEnvironment(
activation: Activation,
memoizationRoots: Set[DIKey],
forcedRoots: Set[DIKey],
parallelEnvs: Boolean,
parallelEnvs: ParallelLevel,
bootstrapFactory: BootstrapFactory,
configBaseName: String,
configOverrides: Option[AppConfig],
planningOptions: PlanningOptions,
logLevel: Log.Level,
)(// exclude from `equals` test-runner only parameters that do not affect the memoization plan and
// that are not used in [[DistageTestRunner.groupEnvs]] grouping to allow merging more envs
val parallelSuites: Boolean,
val parallelTests: Boolean,
val parallelSuites: ParallelLevel,
val parallelTests: ParallelLevel,
val debugOutput: Boolean,
) {
def getExecParams: EnvExecutionParams = {
Expand All @@ -41,9 +41,15 @@ final case class TestEnvironment(
}

object TestEnvironment {
sealed trait ParallelLevel
object ParallelLevel {
final case class Fixed(n: Int) extends ParallelLevel
case object Unlimited extends ParallelLevel
case object Sequential extends ParallelLevel
}

final case class EnvExecutionParams(
parallelEnvs: Boolean,
parallelEnvs: ParallelLevel,
planningOptions: PlanningOptions,
logLevel: Log.Level,
)
Expand Down
Loading

0 comments on commit 4ba0a77

Please sign in to comment.