diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2492b0b..c82e925 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,9 @@ jobs: distribution: 'oracle' cache: 'sbt' + - name: Check code formatting + run: sbt check + - name: build ${{ matrix.scala }} run: sbt ++${{ matrix.scala }} clean coverage test diff --git a/.scalafix.conf b/.scalafix.conf new file mode 100644 index 0000000..3d1bd2d --- /dev/null +++ b/.scalafix.conf @@ -0,0 +1,6 @@ +rules = [OrganizeImports] + +OrganizeImports { + preset = INTELLIJ_2020_3 + targetDialect = Auto +} \ No newline at end of file diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..0671409 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,19 @@ +version = 3.8.1 + +maxColumn = 120 + +preset = default +align.preset = none + +trailingCommas = always + +continuationIndent { + callSite = 2 + defnSite = 2 +} + +rewrite.rules = [ + RedundantBraces, RedundantParens, SortModifiers, prefercurlyfors +] + +runner.dialect = Scala213Source3 diff --git a/build.sbt b/build.sbt index a02674b..7718f1e 100644 --- a/build.sbt +++ b/build.sbt @@ -8,13 +8,21 @@ organizationHomepage := Some(url("https://evolution.com")) homepage := Some(url("https://github.com/evolution-gaming/resource-pool")) startYear := Some(2023) -crossScalaVersions := Seq("2.13.14") -scalaVersion := crossScalaVersions.value.head -scalacOptions := Seq( - "-release:17", - "-Xsource:3", - "-deprecation", +inThisBuild( + List( + crossScalaVersions := Seq("2.13.14"), + scalaVersion := crossScalaVersions.value.head, + scalacOptions := Seq( + "-release:17", + "-Xsource:3", + "-deprecation", + "-Wunused:imports", + ), + semanticdbEnabled := true, + semanticdbVersion := scalafixSemanticdb.revision, + ), ) + releaseCrossBuild := true autoAPIMappings := true versionScheme := Some("early-semver") @@ -29,3 +37,7 @@ libraryDependencies ++= Seq( licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))) addCommandAlias("build", "all compile test") + +// https://github.com/scalacenter/scalafix/issues/1488 +addCommandAlias("check", "scalafixAll --check; all scalafmtCheckAll scalafmtSbtCheck") +addCommandAlias("fix", "scalafixAll; all scalafmtAll scalafmtSbt") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 95da5ac..15c2edf 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,7 @@ import sbt._ object Dependencies { - val scalatest = "org.scalatest" %% "scalatest" % "3.2.18" % Test - val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.3" - val `cats-effect` = "org.typelevel" %% "cats-effect" % "3.5.4" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.18" % Test + val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.3" + val `cats-effect` = "org.typelevel" %% "cats-effect" % "3.5.4" } diff --git a/project/plugins.sbt b/project/plugins.sbt index 1957cc0..f3488a3 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,4 +6,8 @@ addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0") addSbtPlugin("com.evolution" % "sbt-scalac-opts-plugin" % "0.0.9") -addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2") \ No newline at end of file +addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2") + +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") + +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") diff --git a/src/main/scala/com/evolution/resourcepool/IntHelper.scala b/src/main/scala/com/evolution/resourcepool/IntHelper.scala index 9b672fc..7d23db5 100644 --- a/src/main/scala/com/evolution/resourcepool/IntHelper.scala +++ b/src/main/scala/com/evolution/resourcepool/IntHelper.scala @@ -6,7 +6,7 @@ private[resourcepool] object IntHelper { implicit class IntOpsIntHelper(val self: Int) extends AnyVal { - def divide(value: Int): List[Int] = { + def divide(value: Int): List[Int] = if (value <= 0 || self <= 0) { List.empty } else if (value >= self) { @@ -14,21 +14,17 @@ private[resourcepool] object IntHelper { } else if (value == 1) { List(self) } else { - val quotient = (self.toDouble / value) - .round - .toInt + val quotient = (self.toDouble / value).round.toInt @tailrec - def loop(self: Int, value: Int, result: List[Int]): List[Int] = { + def loop(self: Int, value: Int, result: List[Int]): List[Int] = if (self > quotient && value > 1) { loop(self - quotient, value - 1, quotient :: result) } else { (self :: result).reverse } - } loop(self, value, List.empty) } - } } } diff --git a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala index a71145c..84fcd7d 100644 --- a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala +++ b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala @@ -1,8 +1,8 @@ package com.evolution.resourcepool import cats.Functor +import cats.effect.* import cats.effect.kernel.Resource.ExitCase -import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Ref, Resource, Sync, Temporal} import cats.effect.syntax.all.* import cats.syntax.all.* import com.evolution.resourcepool.IntHelper.* @@ -16,14 +16,12 @@ trait ResourcePool[F[_], A] { /** Returns the acquired resource, and a handle releasing it back to the pool. * - * Calling the handle will not release resource itself, but just make it - * available to be returned again, though resource may expire and be released - * if it stays unused for long enough. + * Calling the handle will not release resource itself, but just make it available to be returned again, though + * resource may expire and be released if it stays unused for long enough. * - * The resource leak may occur if the release handle is never called. - * Therefore it is recommended to use - * [[ResourcePool.ResourcePoolOps#resource]] method instead, which will - * return [[cats.effect.Resource]] calling the handle on release. + * The resource leak may occur if the release handle is never called. Therefore it is recommended to use + * [[ResourcePool.ResourcePoolOps#resource]] method instead, which will return [[cats.effect.Resource]] calling the + * handle on release. */ def get: F[(A, Release[F])] } @@ -34,30 +32,27 @@ object ResourcePool { type Id = String - /** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but number of partitions is - * determined automatically by taking into account the number of available - * processors and expected pool size. + /** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but number of partitions is determined automatically by taking + * into account the number of available processors and expected pool size. */ def of[F[_]: Async, A]( maxSize: Int, expireAfter: FiniteDuration, discardTasksOnRelease: Boolean, - resource: Id => Resource[F, A] + resource: Id => Resource[F, A], ): Resource[F, ResourcePool[F, A]] = { - def apply(maxSize: Int) = { + def apply(maxSize: Int) = for { - cpus <- Sync[F] - .delay { Runtime.getRuntime.availableProcessors() } - .toResource - result <- of( + cpus <- Sync[F].delay(Runtime.getRuntime.availableProcessors()).toResource + result <- of( maxSize = maxSize, partitions = (maxSize / 100).min(cpus), expireAfter, discardTasksOnRelease, - resource) + resource, + ) } yield result - } apply(maxSize.max(1)) } @@ -67,82 +62,70 @@ object ResourcePool { * @param maxSize * Maximum size of the whole pool. * @param partitions - * Number of partitions to be used. This number determines the count of the - * threads, that could access the pool in parallel, and also number of - * background processes removing the expiring entries. + * Number of partitions to be used. This number determines the count of the threads, that could access the pool in + * parallel, and also number of background processes removing the expiring entries. * @param expireAfter * Duration after which the resource should be removed if unused. * @param resource - * Factory for creating the new resources. `Id` is a unique identifier of a - * resource that could be used, for example, for logging purposes. + * Factory for creating the new resources. `Id` is a unique identifier of a resource that could be used, for + * example, for logging purposes. */ def of[F[_]: Async, A]( maxSize: Int, partitions: Int, expireAfter: FiniteDuration, discardTasksOnRelease: Boolean, - resource: Id => Resource[F, A] + resource: Id => Resource[F, A], ): Resource[F, ResourcePool[F, A]] = { def apply(maxSize: Int, partitions: Int) = { - def of(maxSize: Int)(resource: Id => Resource[F, A]) = { - of0( - maxSize, - expireAfter, - discardTasksOnRelease, - resource) - } + def of(maxSize: Int)(resource: Id => Resource[F, A]) = + of0(maxSize, expireAfter, discardTasksOnRelease, resource) if (partitions <= 1) { of(maxSize)(resource) } else { for { - ref <- Ref[F].of(0).toResource + ref <- Ref[F].of(0).toResource values <- maxSize .divide(partitions) .zipWithIndex - .parTraverse { case (maxSize, idx) => of(maxSize) { id => resource(s"$idx-$id") } } - values <- values - .toVector + .parTraverse { case (maxSize, idx) => of(maxSize)(id => resource(s"$idx-$id")) } + values <- values.toVector .pure[Resource[F, *]] - length = values.length - } yield { - new ResourcePool[F, A] { - def get = { - MonadCancel[F].uncancelable { poll => - ref - .modify { a => - val b = a + 1 - ( - if (b < length) b else 0, - a - ) - } - .flatMap { partition => - poll { - values - .apply(partition) - .get - } + length = values.length + } yield new ResourcePool[F, A] { + def get = + MonadCancel[F].uncancelable { poll => + ref + .modify { a => + val b = a + 1 + ( + if (b < length) b else 0, + a, + ) + } + .flatMap { partition => + poll { + values + .apply(partition) + .get } - } + } } - } } } } - apply( - maxSize = maxSize.max(1), - partitions = partitions.max(1)) + apply(maxSize = maxSize.max(1), partitions = partitions.max(1)) } private def of0[F[_]: Async, A]( maxSize: Int, expireAfter: FiniteDuration, discardTasksOnRelease: Boolean, - resource: Id => Resource[F, A] + resource: Id => Resource[F, A], ): Resource[F, ResourcePool[F, A]] = { type Id = Long @@ -155,32 +138,26 @@ object ResourcePool { def now = Temporal[F].realTime final case class Entry(value: A, release: F[Unit], timestamp: FiniteDuration) { - def renew: F[Entry] = now.map { now => copy(timestamp = now) } + def renew: F[Entry] = now.map(now => copy(timestamp = now)) } sealed trait State object State { - def empty: State = { - Allocated( - id = 0L, - entries = Map.empty, - stage = Allocated.Stage.free(List.empty), - releasing = Set.empty) - } + def empty: State = + Allocated(id = 0L, entries = Map.empty, stage = Allocated.Stage.free(List.empty), releasing = Set.empty) /** Resource pool is allocated. * * @param id - * Sequence number of a last resource allocated (used to generate an - * identifier for a next resource). + * Sequence number of a last resource allocated (used to generate an identifier for a next resource). * @param entries - * Allocated or allocating resources. `Some` means that resource is - * allocated, and `None` means allocating is in progress. + * Allocated or allocating resources. `Some` means that resource is allocated, and `None` means allocating is + * in progress. * @param stage - * Represents a state of a pool, i.e. if it is fully busy, if there are - * free resources, and the tasks waiting for resources to be freed. + * Represents a state of a pool, i.e. if it is fully busy, if there are free resources, and the tasks waiting + * for resources to be freed. * @param releasing * List of ids being released because these have expired. */ @@ -188,7 +165,7 @@ object ResourcePool { id: Long, entries: Map[Id, Option[Entry]], stage: Allocated.Stage, - releasing: Set[Id] + releasing: Set[Id], ) extends State object Allocated { @@ -204,9 +181,8 @@ object ResourcePool { /** There are free resources to use. * * @param ids - * List of ids from [[Allocated#Entries]] that are free to use. It - * could be equal to `Nil` if all resources are busy, but there - * are no tasks waiting in queue. + * List of ids from [[Allocated#Entries]] that are free to use. It could be equal to `Nil` if all resources + * are busy, but there are no tasks waiting in queue. */ final case class Free(ids: Ids) extends Stage @@ -224,41 +200,36 @@ object ResourcePool { * @param allocated * Allocated resources. * @param releasing - * List of ids being released (either because pool is releasing or - * because these expired earlier). + * List of ids being released (either because pool is releasing or because these expired earlier). * @param tasks - * The list of tasks left to be completed before the pool could be - * released. + * The list of tasks left to be completed before the pool could be released. * @param released - * `Deferred`, which will be completed, when all the tasks are - * completed and all resources are released. + * `Deferred`, which will be completed, when all the tasks are completed and all resources are released. */ final case class Released( allocated: Set[Id], releasing: Set[Id], tasks: Tasks, - released: Deferred[F, Either[Throwable, Unit]] + released: Deferred[F, Either[Throwable, Unit]], ) extends State } for { ref <- Resource.make { - Ref[F].of { State.empty } + Ref[F].of(State.empty) } { ref => 0.tailRecM { count => - ref - .access + ref.access .flatMap { case (state: State.Allocated, set) => Deferred .apply[F, Either[Throwable, Unit]] .flatMap { released => - - def apply(allocated: Set[Id], releasing: Set[Id], tasks: Tasks)(effect: => F[Unit]) = { + def apply(allocated: Set[Id], releasing: Set[Id], tasks: Tasks)(effect: => F[Unit]) = set - .apply { State.Released(allocated = allocated, releasing = releasing, tasks, released) } + .apply(State.Released(allocated = allocated, releasing = releasing, tasks, released)) .flatMap { - case true => + case true => for { result <- { if (allocated.isEmpty && releasing.isEmpty) { @@ -270,28 +241,22 @@ object ResourcePool { // the pool will be released elsewhere when all resources in `allocated` or // `releasing` get released effect.productR { - released - .get - .rethrow + released.get.rethrow } } } - } yield { - result.asRight[Int] - } + } yield result.asRight[Int] case false => (count + 1) .asLeft[Unit] .pure[F] } .uncancelable - } state.stage match { case stage: State.Allocated.Stage.Free => // glue `release` functions of all free resources together - val (entries, releasing, release) = stage - .ids + val (entries, releasing, release) = stage.ids .foldLeft((state.entries, state.releasing, ().pure[F])) { case ((entries, releasing, release), id) => entries @@ -301,7 +266,7 @@ object ResourcePool { } { case Some(entry) => (entries - id, releasing + id, release.productR(entry.release)) - case None => + case None => (entries, releasing, release) } } @@ -309,7 +274,7 @@ object ResourcePool { apply( allocated = entries.keySet, releasing = releasing, - Queue.empty + Queue.empty, ) { release } @@ -319,10 +284,9 @@ object ResourcePool { apply( allocated = state.entries.keySet, releasing = state.releasing, - Queue.empty + Queue.empty, ) { - stage - .tasks + stage.tasks .foldMapM { task => task .complete(ReleasedError.asLeft) @@ -333,7 +297,7 @@ object ResourcePool { apply( allocated = state.entries.keySet, releasing = state.releasing, - stage.tasks + stage.tasks, ) { ().pure[F] } @@ -342,31 +306,26 @@ object ResourcePool { } case (state: State.Released, _) => - state - .released - .get - .rethrow - .map { _.asRight[Int] } + state.released.get.rethrow + .map(_.asRight[Int]) } } } - _ <- Async[F].background { + _ <- Async[F].background { val interval = expireAfter / 10 for { _ <- Temporal[F].sleep(expireAfter) a <- Async[F].foreverM[Unit, Unit] { for { - now <- now - threshold = now - expireAfter - result <- 0.tailRecM { count => - ref - .access + now <- now + threshold = now - expireAfter + result <- 0.tailRecM { count => + ref.access .flatMap { case (state: State.Allocated, set) => state.stage match { case stage: State.Allocated.Stage.Free => - val (ids, entries, releasing, release) = stage - .ids + val (ids, entries, releasing, release) = stage.ids .foldLeft((List.empty[Id], state.entries, state.releasing, ().pure[F])) { case ((ids, entries, releasing, release), id) => entries @@ -380,7 +339,7 @@ object ResourcePool { } else { (id :: ids, entries, releasing, release) } - case None => + case None => (ids, entries, releasing, release) } } @@ -388,13 +347,14 @@ object ResourcePool { set .apply { state.copy( - entries = entries, - stage = stage.copy(ids = ids.reverse), - releasing = releasing) + entries = entries, + stage = stage.copy(ids = ids.reverse), + releasing = releasing, + ) } .flatMap { - case true => - release.map { _.asRight[Int] } + case true => + release.map(_.asRight[Int]) case false => (count + 1) .asLeft[Unit] @@ -423,252 +383,216 @@ object ResourcePool { new ResourcePool[F, A] { def get = { - def entryAdd(id: Id, entry: Entry) = { + def entryAdd(id: Id, entry: Entry) = 0.tailRecM { count => - ref - .access + ref.access .flatMap { case (state: State.Allocated, set) => set - .apply { state.copy(entries = state.entries.updated(id, entry.some)) } + .apply(state.copy(entries = state.entries.updated(id, entry.some))) .map { - case true => ().asRight[Int] + case true => ().asRight[Int] case false => (count + 1).asLeft[Unit] } - case (_: State.Released, _) => + case (_: State.Released, _) => () .asRight[Int] .pure[F] } } - } - - def entryRemove(id: Id, error: Throwable) = { - ref - .modify { - case state: State.Allocated => - - val entries = state.entries - id - - def stateOf(stage: State.Allocated.Stage) = { - state.copy( - entries = entries, - stage = stage) - } - if (entries.isEmpty) { - state.stage match { - case stage: State.Allocated.Stage.Free => - ( - stateOf(stage), - ().pure[F] - ) - case stage: State.Allocated.Stage.Busy => - ( - stateOf(State.Allocated.Stage.free(List.empty)), - stage - .tasks - .foldMapM { task => - task - .complete(error.asLeft) - .void - } - ) - } - } else { - ( - stateOf(state.stage), - ().pure[F] - ) - } - - case state: State.Released => - - val allocated = state.allocated - id - - def stateOf(tasks: Tasks) = { - state.copy( - allocated = allocated, - tasks = tasks) + def entryRemove(id: Id, error: Throwable) = + ref.modify { + case state: State.Allocated => + val entries = state.entries - id + + def stateOf(stage: State.Allocated.Stage) = + state.copy(entries = entries, stage = stage) + + if (entries.isEmpty) { + state.stage match { + case stage: State.Allocated.Stage.Free => + ( + stateOf(stage), + ().pure[F], + ) + case stage: State.Allocated.Stage.Busy => + ( + stateOf(State.Allocated.Stage.free(List.empty)), + stage.tasks + .foldMapM { task => + task + .complete(error.asLeft) + .void + }, + ) } + } else { + ( + stateOf(state.stage), + ().pure[F], + ) + } - if (allocated.isEmpty) { - ( - stateOf(Queue.empty), - state - .tasks - .foldMapM { task => - task - .complete(error.asLeft) + case state: State.Released => + val allocated = state.allocated - id + + def stateOf(tasks: Tasks) = + state.copy(allocated = allocated, tasks = tasks) + + if (allocated.isEmpty) { + ( + stateOf(Queue.empty), + state.tasks + .foldMapM { task => + task + .complete(error.asLeft) + .void + } + .productR { + if (state.releasing.isEmpty) { + state.released + .complete(().asRight) .void + } else { + ().pure[F] } - .productR { - if (state.releasing.isEmpty) { - state - .released - .complete(().asRight) - .void - } else { - ().pure[F] - } - } - ) - } else { - ( - stateOf(state.tasks), - ().pure[F] - ) - } - } - .flatten - } + }, + ) + } else { + ( + stateOf(state.tasks), + ().pure[F], + ) + } + }.flatten - def entryRelease(id: Id, release: Release) = { + def entryRelease(id: Id, release: Release) = for { result <- release.attempt - result <- ref - .modify { - case state: State.Allocated => - ( - state.copy(releasing = state.releasing - id), - ().pure[F] - ) + result <- ref.modify { + case state: State.Allocated => + ( + state.copy(releasing = state.releasing - id), + ().pure[F], + ) - case state: State.Released => - val releasing = state.releasing - id - ( - state.copy(releasing = releasing), - result match { - case Right(a) => - if (releasing.isEmpty && state.allocated.isEmpty) { - // this was the last resource in a pool, - // we can release the pool itself now - state - .released - .complete(a.asRight) - .void - } else { - ().pure[F] - } - case Left(error) => - state - .released - .complete(error.asLeft) + case state: State.Released => + val releasing = state.releasing - id + ( + state.copy(releasing = releasing), + result match { + case Right(a) => + if (releasing.isEmpty && state.allocated.isEmpty) { + // this was the last resource in a pool, + // we can release the pool itself now + state.released + .complete(a.asRight) .void - } - ) - } - .flatten + } else { + ().pure[F] + } + case Left(error) => + state.released + .complete(error.asLeft) + .void + }, + ) + }.flatten } yield result - } - def releaseOf(id: Id, entry: Entry): Release = { + def releaseOf(id: Id, entry: Entry): Release = for { - entry <- entry.renew + entry <- entry.renew result <- ref .modify { case state: State.Allocated => + def stateOf(stage: State.Allocated.Stage) = + state.copy(entries = state.entries.updated(id, entry.some), stage = stage) - def stateOf(stage: State.Allocated.Stage) = { - state.copy( - entries = state.entries.updated(id, entry.some), - stage = stage) - } - - state - .stage match { + state.stage match { case stage: State.Allocated.Stage.Free => ( stateOf(stage.copy(ids = id :: stage.ids)), - ().pure[F] + ().pure[F], ) case stage: State.Allocated.Stage.Busy => - stage - .tasks - .dequeueOption + stage.tasks.dequeueOption .fold { ( stateOf(State.Allocated.Stage.free(List(id))), - ().pure[F] + ().pure[F], ) } { case (task, tasks) => ( stateOf(stage.copy(tasks = tasks)), task .complete((id, entry).asRight) - .void + .void, ) } } case state: State.Released => - state - .tasks - .dequeueOption + state.tasks.dequeueOption .fold { ( - state.copy( - allocated = state.allocated - id, - releasing = state.releasing + id), - entry.release + state.copy(allocated = state.allocated - id, releasing = state.releasing + id), + entry.release, ) } { case (task, tasks) => ( state.copy(tasks = tasks), task .complete((id, entry).asRight) - .void + .void, ) } } .flatten .uncancelable } yield result - } - def removeTask(task: Task) = { + def removeTask(task: Task) = ref.update { case state: State.Allocated => state.stage match { - case _: State.Allocated.Stage.Free => + case _: State.Allocated.Stage.Free => state case stage: State.Allocated.Stage.Busy => state.copy( stage = stage.copy( - tasks = stage - .tasks - .filter { _ ne task })) + tasks = stage.tasks + .filter(_ ne task), + ), + ) } case state: State.Released => state.copy(tasks = - state - .tasks - .filter { _ ne task }) + state.tasks + .filter(_ ne task), + ) } - } MonadCancel[F].uncancelable { poll => 0.tailRecM { count => - ref - .access + ref.access .flatMap { case (state: State.Allocated, set) => - - def apply[X](state: State.Allocated)(effect: => F[X]) = { + def apply[X](state: State.Allocated)(effect: => F[X]) = set .apply(state) .flatMap { - case true => - effect.map { _.asRight[Int] } + case true => + effect.map(_.asRight[Int]) case false => (count + 1) .asLeft[X] .pure[F] } - } - def enqueue(tasks: Tasks) = { + def enqueue(tasks: Tasks) = Deferred .apply[F, Either[Throwable, (Id, Entry)]] .flatMap { task => @@ -676,37 +600,37 @@ object ResourcePool { state.copy(stage = State.Allocated.Stage.busy(tasks.enqueue(task))) } { poll - .apply { task.get } - .onCancel { removeTask(task) } + .apply(task.get) + .onCancel(removeTask(task)) .rethrow .map { case (id, entry) => (entry.value, releaseOf(id, entry)) } } } - } state.stage match { case stage: State.Allocated.Stage.Free => stage.ids match { // there are free resources to use case id :: ids => - state - .entries + state.entries .get(id) .fold { - IllegalStateError(s"entry is not found, id: $id").raiseError[F, Either[Int, (A, Release)]] + IllegalStateError(s"entry is not found, id: $id") + .raiseError[F, Either[Int, (A, Release)]] } { entry => entry.fold { - IllegalStateError(s"entry is not defined, id: $id").raiseError[F, Either[Int, (A, Release)]] + IllegalStateError(s"entry is not defined, id: $id") + .raiseError[F, Either[Int, (A, Release)]] } { entry => - entry - .renew + entry.renew .flatMap { entry => apply { state.copy( stage = stage.copy(ids), - entries = state.entries.updated(id, entry.some)) + entries = state.entries.updated(id, entry.some), + ) } { (entry.value, releaseOf(id, entry)).pure[F] } @@ -721,9 +645,7 @@ object ResourcePool { // pool is not full, create a new resource val id = state.id apply { - state.copy( - id = id + 1, - entries = state.entries.updated(id, none)) + state.copy(id = id + 1, entries = state.entries.updated(id, none)) } { poll .apply { @@ -731,26 +653,23 @@ object ResourcePool { .apply(id.toString) .allocated } - .onCancel { entryRemove(id, CancelledError) } + .onCancel(entryRemove(id, CancelledError)) .attempt .flatMap { case Right((value, release)) => // resource was allocated for { - now <- now - entry = Entry( - value = value, - release = entryRelease(id, release) - .start - .void, - timestamp = now) - _ <- entryAdd(id, entry) - } yield { - (value, releaseOf(id, entry)) - } - case Left(a) => + now <- now + entry = Entry( + value = value, + release = entryRelease(id, release).start.void, + timestamp = now, + ) + _ <- entryAdd(id, entry) + } yield (value, releaseOf(id, entry)) + case Left(a) => // resource failed to allocate - entryRemove(id, a).productR { a.raiseError[F, Result] } + entryRemove(id, a).productR(a.raiseError[F, Result]) } } } else { @@ -773,9 +692,8 @@ object ResourcePool { } } - def const[F[_]: MonadCancelThrow, A](value: Resource[F, A]): ResourcePool[F, A] = { + def const[F[_]: MonadCancelThrow, A](value: Resource[F, A]): ResourcePool[F, A] = const(value.allocated) - } def const[F[_], A](value: F[(A, Release[F])]): ResourcePool[F, A] = { class Const @@ -790,41 +708,36 @@ object ResourcePool { final case class IllegalStateError(msg: String) extends RuntimeException(msg) with NoStackTrace - implicit class ResourcePoolOps[F[_], A](val self: ResourcePool[F, A]) extends AnyVal { - /** Returns a `Resource`, which, when allocated, will take a resource from a - * pool. + /** Returns a `Resource`, which, when allocated, will take a resource from a pool. * - * When the `Resource` is released then the underlying resource is released - * back to the pool. + * When the `Resource` is released then the underlying resource is released back to the pool. */ - def resource(implicit F: Functor[F]): Resource[F, A] = { + def resource(implicit F: Functor[F]): Resource[F, A] = Resource.applyFull { poll => poll - .apply { self.get } + .apply(self.get) .map { case (a, release) => (a, (_: ExitCase) => release) } } - } } object implicits { implicit class ResourceOpsResourcePool[F[_], A](val self: Resource[F, A]) extends AnyVal { - /** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a - * shorter syntax to create a pool out of existing resource. + /** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a shorter syntax to create a pool out of + * existing resource. */ def toResourcePool( maxSize: Int, expireAfter: FiniteDuration, )(implicit F: Async[F], - ): Resource[F, ResourcePool[F, A]] = { + ): Resource[F, ResourcePool[F, A]] = toResourcePool(maxSize, expireAfter, discardTasksOnRelease = false) - } - /** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a - * shorter syntax to create a pool out of existing resource. + /** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a shorter syntax to create a pool out of + * existing resource. */ def toResourcePool( maxSize: Int, @@ -832,16 +745,11 @@ object ResourcePool { discardTasksOnRelease: Boolean, )(implicit F: Async[F], - ): Resource[F, ResourcePool[F, A]] = { - ResourcePool.of( - maxSize, - expireAfter, - discardTasksOnRelease, - _ => self) - } + ): Resource[F, ResourcePool[F, A]] = + ResourcePool.of(maxSize, expireAfter, discardTasksOnRelease, _ => self) - /** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but provides a - * shorter syntax to create a pool out of existing resource. + /** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but provides a shorter syntax to create a pool out of + * existing resource. */ def toResourcePool( maxSize: Int, @@ -850,14 +758,8 @@ object ResourcePool { discardTasksOnRelease: Boolean, )(implicit F: Async[F], - ): Resource[F, ResourcePool[F, A]] = { - ResourcePool.of( - maxSize, - partitions, - expireAfter, - discardTasksOnRelease, - _ => self) - } + ): Resource[F, ResourcePool[F, A]] = + ResourcePool.of(maxSize, partitions, expireAfter, discardTasksOnRelease, _ => self) } } } diff --git a/src/test/scala/com/evolution/resourcepool/IntHelperTest.scala b/src/test/scala/com/evolution/resourcepool/IntHelperTest.scala index 4634002..8c49d49 100644 --- a/src/test/scala/com/evolution/resourcepool/IntHelperTest.scala +++ b/src/test/scala/com/evolution/resourcepool/IntHelperTest.scala @@ -1,7 +1,7 @@ package com.evolution.resourcepool -import org.scalatest.funsuite.AnyFunSuite import com.evolution.resourcepool.IntHelper.* +import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers class IntHelperTest extends AnyFunSuite with Matchers { @@ -15,12 +15,12 @@ class IntHelperTest extends AnyFunSuite with Matchers { (3, 2, List(2, 1)), (5, 3, List(2, 2, 1)), (5, 2, List(3, 2)), - (5, 4, List(1, 1, 1, 2))) - } { + (5, 4, List(1, 1, 1, 2)), + ) + } test(s"value: $value, divisor: $divisor, result: $result") { value.divide(divisor) shouldEqual result result.sum shouldEqual value result.size shouldEqual divisor.min(value) } - } } diff --git a/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala b/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala index ab2f0eb..eb40260 100644 --- a/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala +++ b/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala @@ -1,10 +1,9 @@ package com.evolution.resourcepool -import cats.effect.syntax.all.* -import cats.effect.{Concurrent, Deferred, IO, Ref, Resource, Temporal} +import cats.effect.* import cats.syntax.all.* -import com.evolution.resourcepool.util.IOSuite.* import com.evolution.resourcepool.ResourcePool.implicits.* +import com.evolution.resourcepool.util.IOSuite.* import org.scalatest.funsuite.AsyncFunSuite import org.scalatest.matchers.should.Matchers @@ -22,9 +21,8 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { expireAfter = 1.day, ) .use { pool => - pool - .resource - .use { _.pure[IO] } + pool.resource + .use(_.pure[IO]) } .run() } @@ -39,35 +37,36 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { } val result = for { - ref <- Ref[IO].of(List.empty[Action]) - add = (a: Action) => ref.update { a :: _ } - result <- ResourcePool + ref <- Ref[IO].of(List.empty[Action]) + add = (a: Action) => ref.update(a :: _) + result <- ResourcePool .of( maxSize = 1, expireAfter = 1.day, discardTasksOnRelease = false, - resource = _ => Resource.make { - add(Action.Acquire) - } { _ => - add(Action.Release) - } + resource = _ => + Resource.make { + add(Action.Acquire) + } { _ => + add(Action.Release) + }, ) .allocated (pool, release) = result actions <- ref.get - _ <- IO { actions.reverse shouldEqual List.empty } - _ <- pool.resource.use { _ => + _ <- IO(actions.reverse shouldEqual List.empty) + _ <- pool.resource.use { _ => for { actions <- ref.get - result <- IO { actions.reverse shouldEqual List(Action.Acquire) } + result <- IO(actions.reverse shouldEqual List(Action.Acquire)) } yield result } - _ <- pool.resource.use { _.pure[IO] } + _ <- pool.resource.use(_.pure[IO]) actions <- ref.get - _ <- IO { actions.reverse shouldEqual List(Action.Acquire) } - _ <- release + _ <- IO(actions.reverse shouldEqual List(Action.Acquire)) + _ <- release actions <- ref.get - _ <- IO { actions.reverse shouldEqual List(Action.Acquire, Action.Release) } + _ <- IO(actions.reverse shouldEqual List(Action.Acquire, Action.Release)) } yield {} result.run() @@ -78,33 +77,28 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { deferred0 <- Deferred[IO, Unit].toResource deferred1 <- Deferred[IO, Unit].toResource deferred2 <- Deferred[IO, Unit].toResource - deferreds = List(deferred0, deferred1) - ref <- Ref[IO].of(deferreds).toResource - pool <- { + deferreds = List(deferred0, deferred1) + ref <- Ref[IO].of(deferreds).toResource + pool <- { val result = for { result <- ref.modify { case a :: as => (as, a.some) - case as => (as, none) + case as => (as, none) } - result <- result.foldMapM { _.complete(()).void } - _ <- deferred2.get + result <- result.foldMapM(_.complete(()).void) + _ <- deferred2.get } yield result - result - .toResource - .toResourcePool( - maxSize = 2, - expireAfter = 1.day) + result.toResource + .toResourcePool(maxSize = 2, expireAfter = 1.day) } - } yield { - for { - fiber0 <- pool.resource.use { _.pure[IO] }.start - fiber1 <- pool.resource.use { _.pure[IO] }.start - _ <- deferreds.foldMapM { _.get } - _ <- deferred2.complete(()) - _ <- fiber0.join - _ <- fiber1.join - } yield {} - } + } yield for { + fiber0 <- pool.resource.use(_.pure[IO]).start + fiber1 <- pool.resource.use(_.pure[IO]).start + _ <- deferreds.foldMapM(_.get) + _ <- deferred2.complete(()) + _ <- fiber0.join + _ <- fiber1.join + } yield {} resource .use(identity) @@ -113,71 +107,60 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("fail after being released") { val result = for { - result <- () + result <- () .pure[Resource[IO, *]] - .toResourcePool( - maxSize = 2, - expireAfter = 1.day) + .toResourcePool(maxSize = 2, expireAfter = 1.day) .allocated - (pool, release) = result - _ <- release - result <- pool.get.attempt - _ <- IO { result shouldEqual ResourcePool.ReleasedError.asLeft } + (pool, release) = result + _ <- release + result <- pool.get.attempt + _ <- IO(result shouldEqual ResourcePool.ReleasedError.asLeft) } yield {} result.run() } test("release gracefully") { val result = for { - ref <- Ref[IO].of(0) - result <- Resource - .release { ref.update { _ + 1 } } - .toResourcePool( - maxSize = 2, - expireAfter = 1.day) + ref <- Ref[IO].of(0) + result <- Resource + .release(ref.update(_ + 1)) + .toResourcePool(maxSize = 2, expireAfter = 1.day) .allocated - (pool, release0) = result - result <- pool.get - (_, release1) = result - result <- pool.get - (_, release2) = result - fiber0 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber0 - .join + (pool, release0) = result + result <- pool.get + (_, release1) = result + result <- pool.get + (_, release2) = result + fiber0 <- pool.resource.use(_.pure[IO]).start + result <- fiber0.join .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- release1 - _ <- fiber0.joinWithNever - fiber1 <- release0.start - result <- fiber1 - .join + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- release1 + _ <- fiber0.joinWithNever + fiber1 <- release0.start + result <- fiber1.join .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- release2 - _ <- fiber1.joinWithNever - result <- ref.get - _ <- IO { result shouldEqual 2 } + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- release2 + _ <- fiber1.joinWithNever + result <- ref.get + _ <- IO(result shouldEqual 2) } yield {} result.run() } test("release empty pool") { val result = for { - ref <- Ref[IO].of(0) - _ <- ref - .update { _ + 1 } + ref <- Ref[IO].of(0) + _ <- ref + .update(_ + 1) .toResource - .toResourcePool( - maxSize = 2, - expireAfter = 1.day) - .use { _ => ().pure[IO] } + .toResourcePool(maxSize = 2, expireAfter = 1.day) + .use(_ => ().pure[IO]) result <- ref.get - _ <- IO { result shouldEqual 0 } + _ <- IO(result shouldEqual 0) } yield {} result.run() } @@ -185,74 +168,65 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("propagate release errors") { val error = new RuntimeException("error") with NoStackTrace val result = for { - deferred <- Deferred[IO, Unit] - ref <- Ref[IO].of(List(error.raiseError[IO, Unit], deferred.complete(()).void)) - result <- Resource + deferred <- Deferred[IO, Unit] + ref <- Ref[IO].of(List(error.raiseError[IO, Unit], deferred.complete(()).void)) + result <- Resource .release { - ref - .modify { - case a :: as => (as, a) - case as => (as, ().pure[IO]) - } - .flatten + ref.modify { + case a :: as => (as, a) + case as => (as, ().pure[IO]) + }.flatten } - .toResourcePool( - maxSize = 2, - expireAfter = 1.day) + .toResourcePool(maxSize = 2, expireAfter = 1.day) .allocated - (pool, release0) = result - result <- pool.get - (_, release1) = result - _ <- pool.resource.use { _.pure[IO] } - _ <- release1 - result <- release0.attempt - _ <- IO { result shouldEqual error.asLeft } - _ <- deferred.get - result <- ref.get - _ <- IO { result shouldEqual List.empty } + (pool, release0) = result + result <- pool.get + (_, release1) = result + _ <- pool.resource.use(_.pure[IO]) + _ <- release1 + result <- release0.attempt + _ <- IO(result shouldEqual error.asLeft) + _ <- deferred.get + result <- ref.get + _ <- IO(result shouldEqual List.empty) } yield {} result.run() } test("expire after use") { val result = for { - ref0 <- Ref[IO].of(0).toResource - ref1 <- Ref[IO].of(0).toResource + ref0 <- Ref[IO].of(0).toResource + ref1 <- Ref[IO].of(0).toResource deferred0 <- Deferred[IO, Unit].toResource deferred1 <- Deferred[IO, Unit].toResource - pool <- Resource + pool <- Resource .make { for { - a <- ref0.update { _ + 1 } + a <- ref0.update(_ + 1) _ <- deferred0.complete(()) } yield a } { _ => for { - a <- ref1.update { _ + 1 } + a <- ref1.update(_ + 1) _ <- deferred1.complete(()) } yield a } - .toResourcePool( - maxSize = 5, - expireAfter = 10.millis) - _ <- Concurrent[IO].background { - pool - .resource - .use { _ => IO.sleep(10.millis) } + .toResourcePool(maxSize = 5, expireAfter = 10.millis) + _ <- Concurrent[IO].background { + pool.resource + .use(_ => IO.sleep(10.millis)) .foreverM .void } - } yield { - for { - _ <- deferred0.get - _ <- pool.resource.use { _.pure[IO] } - _ <- deferred1.get - a <- ref0.get - _ <- IO { a shouldEqual 2 } - a <- ref1.get - _ <- IO { a shouldEqual 1 } - } yield {} - } + } yield for { + _ <- deferred0.get + _ <- pool.resource.use(_.pure[IO]) + _ <- deferred1.get + a <- ref0.get + _ <- IO(a shouldEqual 2) + a <- ref1.get + _ <- IO(a shouldEqual 1) + } yield {} result .use(identity) .run() @@ -261,24 +235,19 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("not exceed `maxSize`") { val maxSize = 2 val resource = for { - ref <- Ref[IO].of(0).toResource + ref <- Ref[IO].of(0).toResource pool <- ref - .update { _ + 1 } + .update(_ + 1) .toResource - .toResourcePool( - maxSize = maxSize, - expireAfter = 1.day) - } yield { - for { - _ <- pool - .resource - .use { _ => Temporal[IO].sleep(1.millis) } - .parReplicateA(maxSize * 100) - .map { _.combineAll } - result <- ref.get - _ <- IO { result shouldEqual maxSize } - } yield {} - } + .toResourcePool(maxSize = maxSize, expireAfter = 1.day) + } yield for { + _ <- pool.resource + .use(_ => Temporal[IO].sleep(1.millis)) + .parReplicateA(maxSize * 100) + .map(_.combineAll) + result <- ref.get + _ <- IO(result shouldEqual maxSize) + } yield {} resource .use(identity) @@ -288,26 +257,23 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("not exceed `maxSize` with partitioned pool") { val maxSize = 10 val resource = for { - ref <- Ref[IO].of(List.empty[String]).toResource + ref <- Ref[IO].of(List.empty[String]).toResource pool <- ResourcePool.of( maxSize = maxSize, partitions = 3, expireAfter = 1.day, discardTasksOnRelease = false, - resource = id => ref.update { id :: _ }.toResource + resource = id => ref.update(id :: _).toResource, ) - } yield { - for { - _ <- pool - .resource - .use { _ => Temporal[IO].sleep(1.millis) } - .parReplicateA(maxSize * 100) - .map { _.combineAll } - result <- ref.get - _ <- IO { result.size shouldEqual maxSize } - _ <- IO { result.toSet shouldEqual Set("2-3", "1-2", "1-0", "0-2", "1-1", "0-0", "2-2", "0-1", "2-0", "2-1") } - } yield {} - } + } yield for { + _ <- pool.resource + .use(_ => Temporal[IO].sleep(1.millis)) + .parReplicateA(maxSize * 100) + .map(_.combineAll) + result <- ref.get + _ <- IO(result.size shouldEqual maxSize) + _ <- IO(result.toSet shouldEqual Set("2-3", "1-2", "1-0", "0-2", "1-1", "0-0", "2-2", "0-1", "2-0", "2-1")) + } yield {} resource .use(identity) @@ -318,44 +284,33 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val error = new RuntimeException("error") with NoStackTrace val result = for { deferred <- Deferred[IO, Throwable].toResource - pool <- ResourcePool.of( + pool <- ResourcePool.of( maxSize = 1, expireAfter = 1.day, discardTasksOnRelease = false, - resource = _ => { + resource = _ => for { a <- deferred.get.toResource a <- a.raiseError[IO, Unit].toResource - } yield a - } + } yield a, ) - } yield { - for { - fiber0 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber0 - .joinWithNever - .timeout(10.millis) - .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - fiber1 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber1 - .joinWithNever - .timeout(10.millis) - .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- deferred.complete(error) - result <- fiber0.joinWithNever.attempt - _ <- IO { result shouldEqual error.asLeft } - result <- fiber1.joinWithNever.attempt - _ <- IO { result shouldEqual error.asLeft } - } yield {} - } + } yield for { + fiber0 <- pool.resource.use(_.pure[IO]).start + result <- fiber0.joinWithNever + .timeout(10.millis) + .attempt + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + fiber1 <- pool.resource.use(_.pure[IO]).start + result <- fiber1.joinWithNever + .timeout(10.millis) + .attempt + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- deferred.complete(error) + result <- fiber0.joinWithNever.attempt + _ <- IO(result shouldEqual error.asLeft) + result <- fiber1.joinWithNever.attempt + _ <- IO(result shouldEqual error.asLeft) + } yield {} result .use(identity) .run() @@ -371,24 +326,15 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { ) .use { pool => for { - fiber0 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber0 - .joinWithNever + fiber0 <- pool.resource.use(_.pure[IO]).start + result <- fiber0.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - fiber1 <- pool - .resource - .use { _.pure[IO] } - .start - _ <- fiber0.cancel - result <- fiber1 - .joinWithNever - .attempt - _ <- IO { result shouldEqual ResourcePool.CancelledError.asLeft } + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + fiber1 <- pool.resource.use(_.pure[IO]).start + _ <- fiber0.cancel + result <- fiber1.joinWithNever.attempt + _ <- IO(result shouldEqual ResourcePool.CancelledError.asLeft) } yield {} } .run() @@ -396,34 +342,27 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("cancel `get` while allocating resource, maxSize = 2") { val result = for { - ref <- Ref[IO] + ref <- Ref[IO] .of(IO.never[Unit].some) .toResource pool <- ref .getAndSet(none) - .flatMap { _.foldA } + .flatMap(_.foldA) .toResource .toResourcePool( maxSize = 2, expireAfter = 1.day, ) - } yield { - for { - fiber0 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber0 - .joinWithNever - .timeout(10.millis) - .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- pool - .resource - .use { _.pure[IO] } - _ <- fiber0.cancel - } yield {} - } + } yield for { + fiber0 <- pool.resource.use(_.pure[IO]).start + result <- fiber0.joinWithNever + .timeout(10.millis) + .attempt + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- pool.resource + .use(_.pure[IO]) + _ <- fiber0.cancel + } yield {} result .use(identity) .run() @@ -432,34 +371,24 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("cancel `get` while waiting in queue") { val result = for { deferred0 <- Deferred[IO, Unit].toResource - pool <- () + pool <- () .pure[Resource[IO, *]] - .toResourcePool( - maxSize = 1, - expireAfter = 1.day) - } yield { - for { - fiber0 <- pool - .resource - .use { _ => - deferred0 - .complete(()) - .productR { IO.never[Unit] } - } - .start - _ <- deferred0.get - fiber1 <- pool - .get - .start - result <- fiber1 - .joinWithNever - .timeout(10.millis) - .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- fiber1.cancel - _ <- fiber0.cancel - } yield {} - } + .toResourcePool(maxSize = 1, expireAfter = 1.day) + } yield for { + fiber0 <- pool.resource.use { _ => + deferred0 + .complete(()) + .productR(IO.never[Unit]) + }.start + _ <- deferred0.get + fiber1 <- pool.get.start + result <- fiber1.joinWithNever + .timeout(10.millis) + .attempt + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- fiber1.cancel + _ <- fiber0.cancel + } yield {} result .use(identity) .run() @@ -468,35 +397,24 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("cancel `resource` while waiting in queue") { val result = for { deferred0 <- Deferred[IO, Unit].toResource - pool <- () + pool <- () .pure[Resource[IO, *]] - .toResourcePool( - maxSize = 1, - expireAfter = 1.day) - } yield { - for { - fiber0 <- pool - .resource - .use { _ => - deferred0 - .complete(()) - .productR { IO.never[Unit] } - } - .start - _ <- deferred0.get - fiber1 <- pool - .resource - .use { _ => IO.never[Unit] } - .start - result <- fiber1 - .joinWithNever - .timeout(10.millis) - .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- fiber1.cancel - _ <- fiber0.cancel - } yield {} - } + .toResourcePool(maxSize = 1, expireAfter = 1.day) + } yield for { + fiber0 <- pool.resource.use { _ => + deferred0 + .complete(()) + .productR(IO.never[Unit]) + }.start + _ <- deferred0.get + fiber1 <- pool.resource.use(_ => IO.never[Unit]).start + result <- fiber1.joinWithNever + .timeout(10.millis) + .attempt + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- fiber1.cancel + _ <- fiber0.cancel + } yield {} result .use(identity) .run() @@ -506,29 +424,23 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val result = for { deferred0 <- Deferred[IO, Unit] deferred1 <- Deferred[IO, Unit] - result <- deferred0 + result <- deferred0 .complete(()) - .productR { deferred1.get } + .productR(deferred1.get) .toResource - .toResourcePool( - maxSize = 1, - expireAfter = 1.day) + .toResourcePool(maxSize = 1, expireAfter = 1.day) .allocated (pool, release) = result - fiber0 <- pool - .resource - .use { _ => IO.never } - .start - result <- fiber0 - .joinWithNever + fiber0 <- pool.resource.use(_ => IO.never).start + result <- fiber0.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- deferred0.get - fiber1 <- fiber0.cancel.start - _ <- deferred1.complete(()) - _ <- fiber1.join - _ <- release + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- deferred0.get + fiber1 <- fiber0.cancel.start + _ <- deferred1.complete(()) + _ <- fiber1.join + _ <- release } yield {} result.run() } @@ -536,33 +448,24 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("release before resource allocation is completed") { val result = for { deferred <- Deferred[IO, Unit] - result <- deferred - .get - .toResource - .toResourcePool( - maxSize = 1, - expireAfter = 1.day) + result <- deferred.get.toResource + .toResourcePool(maxSize = 1, expireAfter = 1.day) .allocated (pool, release) = result - fiber0 <- pool - .resource - .use { _.pure[IO] } - .start - result <- fiber0 - .joinWithNever + fiber0 <- pool.resource.use(_.pure[IO]).start + result <- fiber0.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) - fiber1 <- release.start - result <- fiber1 - .joinWithNever + fiber1 <- release.start + result <- fiber1.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - _ <- deferred.complete(()) - _ <- fiber0.joinWithNever - _ <- fiber1.joinWithNever + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + _ <- deferred.complete(()) + _ <- fiber0.joinWithNever + _ <- fiber1.joinWithNever } yield {} result.run() } @@ -577,52 +480,46 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { } val result = for { - ref <- Ref[IO].of(List.empty[Action]).toResource - add = (action: Action) => ref.update { action :: _ } + ref <- Ref[IO].of(List.empty[Action]).toResource + add = (action: Action) => ref.update(action :: _) pool <- Resource .make { add(Action.Allocate) } { _ => add(Action.Release) } - .toResourcePool( - maxSize = 5, - expireAfter = 10.millis) + .toResourcePool(maxSize = 5, expireAfter = 10.millis) } yield { - val job = pool - .resource - .use { _ => IO.sleep(1.millis) } + val job = pool.resource + .use(_ => IO.sleep(1.millis)) .foreverM .void - def actionsOf(size: Int) = { + def actionsOf(size: Int) = 0.tailRecM { count => - ref - .get + ref.get .flatMap { actions => if (actions.size >= size || count >= 10) { - actions - .reverse + actions.reverse .asRight[Int] .pure[IO] } else { IO .sleep(10.millis) - .as { (count + 1).asLeft[Unit] } + .as((count + 1).asLeft[Unit]) } } } - } for { - fiber0 <- job.start - fiber1 <- job.start + fiber0 <- job.start + fiber1 <- job.start actions <- actionsOf(2) - _ <- IO { actions shouldEqual List(Action.Allocate, Action.Allocate) } - _ <- fiber1.cancel + _ <- IO(actions shouldEqual List(Action.Allocate, Action.Allocate)) + _ <- fiber1.cancel actions <- actionsOf(3) - _ <- IO { actions shouldEqual List(Action.Allocate, Action.Allocate, Action.Release) } - _ <- fiber0.cancel + _ <- IO(actions shouldEqual List(Action.Allocate, Action.Allocate, Action.Release)) + _ <- fiber0.cancel } yield {} } result @@ -632,37 +529,27 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("discard tasks on release") { val result = for { - result <- () + result <- () .pure[Resource[IO, *]] - .toResourcePool( - maxSize = 1, - expireAfter = 1.day, - discardTasksOnRelease = true) + .toResourcePool(maxSize = 1, expireAfter = 1.day, discardTasksOnRelease = true) .allocated (pool, release) = result - fiber = pool - .resource - .use { _ => IO.never[Unit] } - .start - fiber0 <- fiber - result <- fiber0 - .joinWithNever + fiber = pool.resource.use(_ => IO.never[Unit]).start + fiber0 <- fiber + result <- fiber0.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - fiber1 <- fiber - result <- fiber1 - .joinWithNever + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + fiber1 <- fiber + result <- fiber1.joinWithNever .timeout(10.millis) .attempt - _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } - fiber2 <- release.start - result <- fiber1 - .joinWithNever - .attempt - _ <- IO { result shouldEqual ResourcePool.ReleasedError.asLeft } - _ <- fiber0.cancel - _ <- fiber2.joinWithNever + _ <- IO(result should matchPattern { case Left(_: TimeoutException) => }) + fiber2 <- release.start + result <- fiber1.joinWithNever.attempt + _ <- IO(result shouldEqual ResourcePool.ReleasedError.asLeft) + _ <- fiber0.cancel + _ <- fiber2.joinWithNever } yield {} result.run() } diff --git a/src/test/scala/com/evolution/resourcepool/util/IOSuite.scala b/src/test/scala/com/evolution/resourcepool/util/IOSuite.scala index f4d9a0a..7724eba 100644 --- a/src/test/scala/com/evolution/resourcepool/util/IOSuite.scala +++ b/src/test/scala/com/evolution/resourcepool/util/IOSuite.scala @@ -14,9 +14,8 @@ object IOSuite { implicit val executor: ExecutionContextExecutor = ExecutionContext.global - def runIO[A](io: IO[A], timeout: FiniteDuration = Timeout): Future[Succeeded.type] = { + def runIO[A](io: IO[A], timeout: FiniteDuration = Timeout): Future[Succeeded.type] = io.timeout(timeout).as(Succeeded).unsafeToFuture() - } implicit class IOOps[A](val self: IO[A]) extends AnyVal { @@ -25,8 +24,7 @@ object IOSuite { implicit class ResourceObjOps(val self: Resource.type) extends AnyVal { - def release[F[_]: Applicative](release: F[Unit]): Resource[F, Unit] = { + def release[F[_]: Applicative](release: F[Unit]): Resource[F, Unit] = Resource(((), release).pure[F]) - } } }