From b1d38be8598898c9368c39f7714c2c0b22067474 Mon Sep 17 00:00:00 2001 From: Ruslans Tarasovs Date: Mon, 16 Oct 2023 15:28:00 +0300 Subject: [PATCH] Extract methods to make the code easier to read. --- .../evolution/resourcepool/ResourcePool.scala | 277 +++++++++--------- 1 file changed, 146 insertions(+), 131 deletions(-) diff --git a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala index 9d1c056..37c2e9c 100644 --- a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala +++ b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala @@ -1,6 +1,7 @@ package com.evolution.resourcepool import cats.Functor +import cats.Monad import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Poll, Ref, Resource, Sync, Temporal} import cats.effect.syntax.all._ import cats.syntax.all._ @@ -150,7 +151,10 @@ object ResourcePool { final case class Entry(value: A, release: F[Unit], timestamp: FiniteDuration) - sealed trait State + sealed trait State { + def entryReleased(id: Id, entry: Entry): (State, F[Unit]) + def cancelTask(task: Task): State + } object State { @@ -181,11 +185,43 @@ object ResourcePool { entries: Map[Id, Option[Entry]], stage: Allocated.Stage, releasing: Set[Id] - ) extends State + ) extends State { + + def incrementId: Allocated = + this.copy( + id = id + 1, + entries = this.entries.updated(id, none)) + + def entryReleased(id: Id, entry: Entry): (State, F[Unit]) = { + + val (stage, task) = this.stage.putId(id) + + val state = this.copy( + entries = this.entries.updated(id, entry.some), + stage = stage + ) + + val executeNextTask = task.traverse_ { task => + task.complete((id, entry).asRight) + } + + (state, executeNextTask) + } + + def resourceReleased(id: Id): Allocated = + this.copy(releasing = releasing - id) + + def cancelTask(task: Task): State = + this.copy(stage = stage.cancelTask(task)) + + } object Allocated { - sealed trait Stage + sealed trait Stage { + def putId(id: Id): (Stage, Option[Task]) + def cancelTask(task: Task): Stage + } object Stage { @@ -200,14 +236,43 @@ object ResourcePool { * could be equal to `Nil` if all resources are busy, but there * are no tasks waiting in queue. */ - final case class Free(ids: Ids) extends Stage + final case class Free(ids: Ids) extends Stage { + + def putId(id: Id): (Stage, Option[Task]) = + (this.copy(ids = id :: ids), None) + + def takeId: Option[(Free, Id)] = ids match { + case id :: ids => Some((this.copy(ids = ids), id)) + case Nil => None + } + + def cancelTask(task: Task): Stage = this + + } /** No more free resources to use, and tasks are waiting in queue. * * @param tasks * List of tasks waiting for resources to free up. */ - final case class Busy(tasks: Tasks) extends Stage + final case class Busy(tasks: Tasks) extends Stage { + + def putId(id: Id): (Stage, Option[Task]) = + this + .tasks + .dequeueOption + .fold { + (free(List(id)), none[Task]) + } { case (task, tasks) => + (this.copy(tasks = tasks), task.some) + } + + def cancelTask(task: Task): Stage = + this.copy( + tasks = this + .tasks + .filter { _ ne task }) + } } } @@ -230,7 +295,59 @@ object ResourcePool { releasing: Set[Id], tasks: Tasks, released: Deferred[F, Either[Throwable, Unit]] - ) extends State + ) extends State { + + def tryRelease: F[Boolean] = + if (releasing.isEmpty && this.allocated.isEmpty) { + // this was the last resource in a pool, + // we can release the pool itself now + this + .released + .complete(().asRight) + .as(true) + } else { + false.pure[F] + } + + def failRelease(e: Throwable): F[Unit] = + this + .released + .complete(e.asLeft) + .void + + def waitForRelease: F[Either[Throwable, Unit]] = + this + .released + .get + + def entryReleased(id: Id, entry: Entry): (State, F[Unit]) = + this + .tasks + .dequeueOption + .fold { + ( + this.copy( + allocated = this.allocated - id, + releasing = this.releasing + id), + entry.release + ) + } { case (task, tasks) => + ( + this.copy(tasks = tasks), + task.complete((id, entry).asRight).void + ) + } + + def resourceReleased(id: Id): Released = + this.copy(releasing = releasing - id) + + def cancelTask(task: Task): State = + this.copy(tasks = + this + .tasks + .filter { _ ne task }) + + } } for { @@ -248,27 +365,19 @@ object ResourcePool { .flatMap { released => def apply(allocated: Set[Id], releasing: Set[Id], tasks: Tasks)(effect: => F[Unit]) = { + val state1 = State.Released(allocated = allocated, releasing = releasing, tasks, released) set - .apply { State.Released(allocated = allocated, releasing = releasing, tasks, released) } + .apply(state1) .flatMap { case true => for { - result <- { - if (allocated.isEmpty && releasing.isEmpty) { - // the pool is empty now, we can safely release it - released - .complete(().asRight) - .void - } else { + success <- state1.tryRelease + result <- + Monad[F].whenA(!success) { // the pool will be released elsewhere when all resources in `allocated` or // `releasing` get released - effect.productR { - released - .get - .rethrow - } + effect.productR(state1.waitForRelease.rethrow) } - } } yield { result.asRight[Int] } @@ -336,8 +445,7 @@ object ResourcePool { case (state: State.Released, _) => state - .released - .get + .waitForRelease .rethrow .map { _.asRight[Int] } } @@ -416,70 +524,15 @@ object ResourcePool { new ResourcePool[F, A] { def get = { - def releaseOf(id: Id, entry: Entry): Release = { + def releaseOf(id: Id, entry: Entry): Release = for { timestamp <- now entry <- entry.copy(timestamp = timestamp).pure[F] result <- ref - .modify { - case state: State.Allocated => - - def stateOf(stage: State.Allocated.Stage) = { - state.copy( - entries = state.entries.updated(id, entry.some), - stage = stage) - } - - state - .stage match { - case stage: State.Allocated.Stage.Free => - ( - stateOf(stage.copy(ids = id :: stage.ids)), - ().pure[F] - ) - case stage: State.Allocated.Stage.Busy => - stage - .tasks - .dequeueOption - .fold { - ( - stateOf(State.Allocated.Stage.free(List(id))), - ().pure[F] - ) - } { case (task, tasks) => - ( - stateOf(stage.copy(tasks = tasks)), - task - .complete((id, entry).asRight) - .void - ) - } - } - - case state: State.Released => - state - .tasks - .dequeueOption - .fold { - ( - state.copy( - allocated = state.allocated - id, - releasing = state.releasing + id), - entry.release - ) - } { case (task, tasks) => - ( - state.copy(tasks = tasks), - task - .complete((id, entry).asRight) - .void - ) - } - } + .modify(_.entryReleased(id, entry)) .flatten .uncancelable } yield result - } 0.tailRecM { count => ref @@ -514,25 +567,7 @@ object ResourcePool { poll .apply { task.get } .onCancel { - ref.update { - case state: State.Allocated => - state.stage match { - case _: State.Allocated.Stage.Free => - state - case stage: State.Allocated.Stage.Busy => - state.copy( - stage = stage.copy( - tasks = stage - .tasks - .filter { _ ne task })) - } - - case state: State.Released => - state.copy(tasks = - state - .tasks - .filter { _ ne task }) - } + ref.update(_.cancelTask(task)) } .rethrow .map { case (id, entry) => @@ -544,9 +579,9 @@ object ResourcePool { state.stage match { case stage: State.Allocated.Stage.Free => - stage.ids match { + stage.takeId match { // there are free resources to use - case id :: ids => + case Some((stage, id)) => state .entries .get(id) @@ -560,7 +595,7 @@ object ResourcePool { val entry = entry0.copy(timestamp = timestamp) apply { state.copy( - stage = stage.copy(ids), + stage = stage, entries = state.entries.updated( id, entry0 @@ -574,16 +609,12 @@ object ResourcePool { } // no free resources found - case Nil => + case None => val entries = state.entries if (entries.sizeCompare(maxSize) < 0) { // pool is not full, create a new resource val id = state.id - apply { - state.copy( - id = id + 1, - entries = state.entries.updated(id, none)) - } { _ => + apply(state.incrementId) { _ => resource .apply(id.toString) .allocated @@ -600,33 +631,17 @@ object ResourcePool { result <- release.attempt result <- ref .modify { - case state: State.Allocated => - ( - state.copy(releasing = state.releasing - id), - ().pure[F] - ) + case state0: State.Allocated => + val state1 = state0.resourceReleased(id) + (state1, ().pure[F]) - case state: State.Released => - val releasing = state.releasing - id + case state0: State.Released => + val state1 = state0.resourceReleased(id) ( - state.copy(releasing = releasing), + state1, result match { - case Right(a) => - if (releasing.isEmpty && state.allocated.isEmpty) { - // this was the last resource in a pool, - // we can release the pool itself now - state - .released - .complete(a.asRight) - .void - } else { - ().pure[F] - } - case Left(a) => - state - .released - .complete(a.asLeft) - .void + case Right(_) => state1.tryRelease.void + case Left(a) => state1.failRelease(a) } ) }