Skip to content

Commit

Permalink
Extract methods to make the code easier to read.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Oct 16, 2023
1 parent b6fe6f0 commit b1d38be
Showing 1 changed file with 146 additions and 131 deletions.
277 changes: 146 additions & 131 deletions src/main/scala/com/evolution/resourcepool/ResourcePool.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolution.resourcepool

import cats.Functor
import cats.Monad
import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Poll, Ref, Resource, Sync, Temporal}
import cats.effect.syntax.all._
import cats.syntax.all._
Expand Down Expand Up @@ -150,7 +151,10 @@ object ResourcePool {

final case class Entry(value: A, release: F[Unit], timestamp: FiniteDuration)

sealed trait State
sealed trait State {
def entryReleased(id: Id, entry: Entry): (State, F[Unit])
def cancelTask(task: Task): State
}

object State {

Expand Down Expand Up @@ -181,11 +185,43 @@ object ResourcePool {
entries: Map[Id, Option[Entry]],
stage: Allocated.Stage,
releasing: Set[Id]
) extends State
) extends State {

def incrementId: Allocated =
this.copy(
id = id + 1,
entries = this.entries.updated(id, none))

def entryReleased(id: Id, entry: Entry): (State, F[Unit]) = {

val (stage, task) = this.stage.putId(id)

val state = this.copy(
entries = this.entries.updated(id, entry.some),
stage = stage
)

val executeNextTask = task.traverse_ { task =>
task.complete((id, entry).asRight)
}

(state, executeNextTask)
}

def resourceReleased(id: Id): Allocated =
this.copy(releasing = releasing - id)

def cancelTask(task: Task): State =
this.copy(stage = stage.cancelTask(task))

}

object Allocated {

sealed trait Stage
sealed trait Stage {
def putId(id: Id): (Stage, Option[Task])
def cancelTask(task: Task): Stage
}

object Stage {

Expand All @@ -200,14 +236,43 @@ object ResourcePool {
* could be equal to `Nil` if all resources are busy, but there
* are no tasks waiting in queue.
*/
final case class Free(ids: Ids) extends Stage
final case class Free(ids: Ids) extends Stage {

def putId(id: Id): (Stage, Option[Task]) =
(this.copy(ids = id :: ids), None)

def takeId: Option[(Free, Id)] = ids match {
case id :: ids => Some((this.copy(ids = ids), id))
case Nil => None
}

def cancelTask(task: Task): Stage = this

}

/** No more free resources to use, and tasks are waiting in queue.
*
* @param tasks
* List of tasks waiting for resources to free up.
*/
final case class Busy(tasks: Tasks) extends Stage
final case class Busy(tasks: Tasks) extends Stage {

def putId(id: Id): (Stage, Option[Task]) =
this
.tasks
.dequeueOption
.fold {
(free(List(id)), none[Task])
} { case (task, tasks) =>
(this.copy(tasks = tasks), task.some)
}

def cancelTask(task: Task): Stage =
this.copy(
tasks = this
.tasks
.filter { _ ne task })
}
}
}

Expand All @@ -230,7 +295,59 @@ object ResourcePool {
releasing: Set[Id],
tasks: Tasks,
released: Deferred[F, Either[Throwable, Unit]]
) extends State
) extends State {

def tryRelease: F[Boolean] =
if (releasing.isEmpty && this.allocated.isEmpty) {
// this was the last resource in a pool,
// we can release the pool itself now
this
.released
.complete(().asRight)
.as(true)
} else {
false.pure[F]
}

def failRelease(e: Throwable): F[Unit] =
this
.released
.complete(e.asLeft)
.void

def waitForRelease: F[Either[Throwable, Unit]] =
this
.released
.get

def entryReleased(id: Id, entry: Entry): (State, F[Unit]) =
this
.tasks
.dequeueOption
.fold {
(
this.copy(
allocated = this.allocated - id,
releasing = this.releasing + id),
entry.release
)
} { case (task, tasks) =>
(
this.copy(tasks = tasks),
task.complete((id, entry).asRight).void
)
}

def resourceReleased(id: Id): Released =
this.copy(releasing = releasing - id)

def cancelTask(task: Task): State =
this.copy(tasks =
this
.tasks
.filter { _ ne task })

}
}

for {
Expand All @@ -248,27 +365,19 @@ object ResourcePool {
.flatMap { released =>

def apply(allocated: Set[Id], releasing: Set[Id], tasks: Tasks)(effect: => F[Unit]) = {
val state1 = State.Released(allocated = allocated, releasing = releasing, tasks, released)
set
.apply { State.Released(allocated = allocated, releasing = releasing, tasks, released) }
.apply(state1)
.flatMap {
case true =>
for {
result <- {
if (allocated.isEmpty && releasing.isEmpty) {
// the pool is empty now, we can safely release it
released
.complete(().asRight)
.void
} else {
success <- state1.tryRelease
result <-
Monad[F].whenA(!success) {
// the pool will be released elsewhere when all resources in `allocated` or
// `releasing` get released
effect.productR {
released
.get
.rethrow
}
effect.productR(state1.waitForRelease.rethrow)
}
}
} yield {
result.asRight[Int]
}
Expand Down Expand Up @@ -336,8 +445,7 @@ object ResourcePool {

case (state: State.Released, _) =>
state
.released
.get
.waitForRelease
.rethrow
.map { _.asRight[Int] }
}
Expand Down Expand Up @@ -416,70 +524,15 @@ object ResourcePool {
new ResourcePool[F, A] {
def get = {

def releaseOf(id: Id, entry: Entry): Release = {
def releaseOf(id: Id, entry: Entry): Release =
for {
timestamp <- now
entry <- entry.copy(timestamp = timestamp).pure[F]
result <- ref
.modify {
case state: State.Allocated =>

def stateOf(stage: State.Allocated.Stage) = {
state.copy(
entries = state.entries.updated(id, entry.some),
stage = stage)
}

state
.stage match {
case stage: State.Allocated.Stage.Free =>
(
stateOf(stage.copy(ids = id :: stage.ids)),
().pure[F]
)
case stage: State.Allocated.Stage.Busy =>
stage
.tasks
.dequeueOption
.fold {
(
stateOf(State.Allocated.Stage.free(List(id))),
().pure[F]
)
} { case (task, tasks) =>
(
stateOf(stage.copy(tasks = tasks)),
task
.complete((id, entry).asRight)
.void
)
}
}

case state: State.Released =>
state
.tasks
.dequeueOption
.fold {
(
state.copy(
allocated = state.allocated - id,
releasing = state.releasing + id),
entry.release
)
} { case (task, tasks) =>
(
state.copy(tasks = tasks),
task
.complete((id, entry).asRight)
.void
)
}
}
.modify(_.entryReleased(id, entry))
.flatten
.uncancelable
} yield result
}

0.tailRecM { count =>
ref
Expand Down Expand Up @@ -514,25 +567,7 @@ object ResourcePool {
poll
.apply { task.get }
.onCancel {
ref.update {
case state: State.Allocated =>
state.stage match {
case _: State.Allocated.Stage.Free =>
state
case stage: State.Allocated.Stage.Busy =>
state.copy(
stage = stage.copy(
tasks = stage
.tasks
.filter { _ ne task }))
}

case state: State.Released =>
state.copy(tasks =
state
.tasks
.filter { _ ne task })
}
ref.update(_.cancelTask(task))
}
.rethrow
.map { case (id, entry) =>
Expand All @@ -544,9 +579,9 @@ object ResourcePool {

state.stage match {
case stage: State.Allocated.Stage.Free =>
stage.ids match {
stage.takeId match {
// there are free resources to use
case id :: ids =>
case Some((stage, id)) =>
state
.entries
.get(id)
Expand All @@ -560,7 +595,7 @@ object ResourcePool {
val entry = entry0.copy(timestamp = timestamp)
apply {
state.copy(
stage = stage.copy(ids),
stage = stage,
entries = state.entries.updated(
id,
entry0
Expand All @@ -574,16 +609,12 @@ object ResourcePool {
}

// no free resources found
case Nil =>
case None =>
val entries = state.entries
if (entries.sizeCompare(maxSize) < 0) {
// pool is not full, create a new resource
val id = state.id
apply {
state.copy(
id = id + 1,
entries = state.entries.updated(id, none))
} { _ =>
apply(state.incrementId) { _ =>
resource
.apply(id.toString)
.allocated
Expand All @@ -600,33 +631,17 @@ object ResourcePool {
result <- release.attempt
result <- ref
.modify {
case state: State.Allocated =>
(
state.copy(releasing = state.releasing - id),
().pure[F]
)
case state0: State.Allocated =>
val state1 = state0.resourceReleased(id)
(state1, ().pure[F])

case state: State.Released =>
val releasing = state.releasing - id
case state0: State.Released =>
val state1 = state0.resourceReleased(id)
(
state.copy(releasing = releasing),
state1,
result match {
case Right(a) =>
if (releasing.isEmpty && state.allocated.isEmpty) {
// this was the last resource in a pool,
// we can release the pool itself now
state
.released
.complete(a.asRight)
.void
} else {
().pure[F]
}
case Left(a) =>
state
.released
.complete(a.asLeft)
.void
case Right(_) => state1.tryRelease.void
case Left(a) => state1.failRelease(a)
}
)
}
Expand Down

0 comments on commit b1d38be

Please sign in to comment.