Skip to content

Commit

Permalink
test cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Sep 27, 2023
1 parent 2388acf commit b254b06
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 54 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Pool of [cats-effect](https://typelevel.org/cats-effect/) resources
* uses first-in-first-out queue for tasks
* shuts down gracefully after completing accumulated tasks
* tolerates resource failures
* supports cancellation of `resouce.use`

## Example

Expand Down Expand Up @@ -44,5 +45,5 @@ connection
```scala
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

libraryDependencies += "com.evolution" %% "resource-pool" % "1.0.0"
libraryDependencies += "com.evolution" %% "resource-pool" % "1.0.2"
```
44 changes: 17 additions & 27 deletions src/main/scala/com/evolution/resourcepool/ResourcePool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ trait ResourcePool[F[_], A] {
def get: F[(A, Release[F])]
}

/**
* TODO
* * cancellable
*/
object ResourcePool {

type Release[F[_]] = F[Unit]
Expand Down Expand Up @@ -361,28 +357,22 @@ object ResourcePool {
}
}

if (stage.ids.sizeCompare(ids) == 0) {
()
.asRight[Int]
.pure[F]
} else {
set
.apply {
state.copy(
entries = entries,
stage = stage.copy(ids = ids.reverse),
releasing = releasing)
}
.flatMap {
case true =>
release.map { _.asRight[Int] }
case false =>
(count + 1)
.asLeft[Unit]
.pure[F]
}
.uncancelable
}
set
.apply {
state.copy(
entries = entries,
stage = stage.copy(ids = ids.reverse),
releasing = releasing)
}
.flatMap {
case true =>
release.map { _.asRight[Int] }
case false =>
(count + 1)
.asLeft[Unit]
.pure[F]
}
.uncancelable

case _: State.Allocated.Stage.Busy =>
()
Expand Down Expand Up @@ -546,7 +536,7 @@ object ResourcePool {
val id = state.id
apply {
state.copy(
id = id + 1,
id = id + 1,
entries = state.entries.updated(id, none))
} {
resource
Expand Down
97 changes: 71 additions & 26 deletions src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,36 +356,81 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers {
.run()
}

ignore("cancel `get`") {
test("cancel `get` while allocating resource") {
val result = for {
deferred0 <- Deferred[IO, Unit]
deferred1 <- Deferred[IO, Unit]
result <- ResourcePool
.of(
deferred0 <- Deferred[IO, Unit].toResource
deferred1 <- Deferred[IO, Unit].toResource
pool <- ResourcePool.of(
maxSize = 1,
expireAfter = 1.day,
resource = _ => deferred0
.complete(())
.productR { deferred1.get }
.toResource
)
} yield {
for {
fiber0 <- pool
.resource
.use { _ => IO.never[Unit] }
.start
result <- fiber0
.joinWithNever
.timeout(10.millis)
.attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => } }
fiber1 <- pool
.resource
.use { _.pure[IO] }
.start
_ <- deferred0.get
fiber2 <- fiber0.cancel.start
_ <- deferred1.complete(())
_ <- fiber1.joinWithNever
_ <- fiber2.joinWithNever
} yield {}
}
result
.use(identity)
.run()
}

test("cancel `get` while waiting in queue") {
val result = for {
deferred0 <- Deferred[IO, Unit].toResource
pool <- ResourcePool.of(
maxSize = 1,
expireAfter = 1.day,
resource = _ => deferred0
.complete(())
.productR { deferred1.get }
.toResource
resource = _ => ().pure[Resource[IO, *]]
)
.allocated
(pool, release) = result
fiber0 <- pool
.get
.start
result <- fiber0
.joinWithNever
.timeout(10.millis)
.attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => } }
_ <- deferred0.get
fiber1 <- fiber0.cancel.start
_ <- deferred1.complete(())
_ <- fiber1.join
_ <- release
} yield {}
result.run()
} yield {
for {
fiber0 <- pool
.resource
.use { _ =>
deferred0
.complete(())
.productR { IO.never[Unit] }
}
.start
_ <- deferred0.get
fiber1 <- pool
.resource
.use { _ => IO.never[Unit] }
.start
result <- fiber1
.joinWithNever
.timeout(10.millis)
.attempt
_ <- IO { result should matchPattern { case Left(_: TimeoutException) => } }
fiber2 <- fiber1.cancel.start
_ <- fiber0.cancel
_ <- fiber2.joinWithNever
} yield {}
}
result
.use(identity)
.run()
}

test("cancel `resource.use") {
Expand Down

0 comments on commit b254b06

Please sign in to comment.