Skip to content

Commit

Permalink
add support of cancellation of get, task will be removed from the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Sep 28, 2023
1 parent b254b06 commit a80e69a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Pool of [cats-effect](https://typelevel.org/cats-effect/) resources
* uses first-in-first-out queue for tasks
* shuts down gracefully after completing accumulated tasks
* tolerates resource failures
* supports cancellation of `resouce.use`
* supports cancellation

## Example

Expand Down
60 changes: 42 additions & 18 deletions src/main/scala/com/evolution/resourcepool/ResourcePool.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.evolution.resourcepool

import cats.Functor
import cats.effect.{Async, MonadCancelThrow, Resource, Temporal, Deferred, Ref, Sync}
import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Poll, Ref, Resource, Sync, Temporal}
import cats.effect.syntax.all._
import cats.syntax.all._
import com.evolution.resourcepool.IntHelper._
Expand Down Expand Up @@ -466,18 +466,21 @@ object ResourcePool {
.flatMap {
case (state: State.Allocated, set) =>

def apply[X](state: State.Allocated)(effect: => F[X]) = {
set
.apply(state)
.flatMap {
case true =>
effect.map { _.asRight[Int] }
case false =>
(count + 1)
.asLeft[X]
.pure[F]
}
.uncancelable
def apply[X](state: State.Allocated)(effect: Poll[F] => F[X]) = {
MonadCancel[F].uncancelable { poll =>
set
.apply(state)
.flatMap {
case true =>
effect
.apply(poll)
.map { _.asRight[Int] }
case false =>
(count + 1)
.asLeft[X]
.pure[F]
}
}
}

def enqueue(tasks: Tasks) = {
Expand All @@ -486,9 +489,30 @@ object ResourcePool {
.flatMap { task =>
apply {
state.copy(stage = State.Allocated.Stage.busy(tasks.enqueue(task)))
} {
task
.get
} { poll =>
poll
.apply { task.get }
.onCancel {
ref.update {
case state: State.Allocated =>
state.stage match {
case _: State.Allocated.Stage.Free =>
state
case stage: State.Allocated.Stage.Busy =>
state.copy(
stage = stage.copy(
tasks = stage
.tasks
.filter { _ ne task }))
}

case state: State.Released =>
state.copy(tasks =
state
.tasks
.filter { _ ne task })
}
}
.rethrow
.map { case (id, entry) =>
(entry.value, releaseOf(id, entry))
Expand Down Expand Up @@ -521,7 +545,7 @@ object ResourcePool {
entry0
.copy(timestamp = timestamp)
.some))
} {
} { _ =>
(entry0.value, releaseOf(id, entry)).pure[F]
}
}
Expand All @@ -538,7 +562,7 @@ object ResourcePool {
state.copy(
id = id + 1,
entries = state.entries.updated(id, none))
} {
} { _ =>
resource
.apply(id.toString)
.allocated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,17 +415,15 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers {
.start
_ <- deferred0.get
fiber1 <- pool
.resource
.use { _ => IO.never[Unit] }
.get
.start
result <- fiber1
.joinWithNever
.timeout(10.millis)
.timeout(100.millis)
.attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => } }
fiber2 <- fiber1.cancel.start
_ <- fiber1.cancel
_ <- fiber0.cancel
_ <- fiber2.joinWithNever
} yield {}
}
result
Expand Down

0 comments on commit a80e69a

Please sign in to comment.