Skip to content

Commit

Permalink
Add docs for public methods and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Sep 21, 2023
1 parent 266a2b6 commit c859254
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT")))
releaseCrossBuild := true

versionScheme := Some("early-semver")

autoAPIMappings := true
53 changes: 51 additions & 2 deletions src/main/scala/com/evolution/resourcepool/ResourcePool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import scala.util.control.NoStackTrace
trait ResourcePool[F[_], A] {
import ResourcePool.Release

/** Returns the acquired resource, and a handle releasing it back to the pool.
*
* Calling the handle will not release resource itself, but just make it
* available to be returned again, though resource may expire and be released
* if it stays unused for long enough.
*
* The resource leak may occur if the release handle is never called.
* Therefore it is recommended to use
* [[ResourcePool.ResourcePoolOps#resource]] method instead, which will
* return [[cats.effect.Resource]] calling the handle on release.
*/
def get: F[(A, Release[F])]
}

Expand All @@ -26,6 +37,10 @@ object ResourcePool {

type Id = String

/** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but number of partitions is
* determined automatically by taking into account the number of available
* processors and expected pool size.
*/
def of[F[_]: Async, A](
maxSize: Int,
expireAfter: FiniteDuration,
Expand All @@ -48,6 +63,20 @@ object ResourcePool {
apply(maxSize.max(1))
}

/** Creates a new pool with specified number of partitions.
*
* @param maxSize
* Maximum size of the whole pool.
* @param partitions
* Number of paritions to be used. This number determines the count of the
* threads, that could access the pool in parallel, and also number of
* background processes removing the expiring entries.
* @param expireAfter
* Duration after which the resource should be removed if unused.
* @param resource
* Factory for creating the new resources. `Id` is a unique identifier of a
* resource that could be used, for example, for logging purposes.
*/
def of[F[_]: Async, A](
maxSize: Int,
partitions: Int,
Expand Down Expand Up @@ -135,7 +164,7 @@ object ResourcePool {
* Sequence number of a last resource allocated (used to generate an
* identifier for a next resource).
* @param entries
* Allocated or allocating resources.`Some(_)` means that resource is
* Allocated or allocating resources. `Some(_)` means that resource is
* allocated, and `None` means allocating is in progress.
* @param stage
* Represents a state of a pool, i.e. if it is fully busy, if there are
Expand Down Expand Up @@ -163,7 +192,7 @@ object ResourcePool {
/** There are free resources to use.
*
* @param ids
* List of ids from [[Allocated#Entries]] that are free to use.It
* List of ids from [[Allocated#Entries]] that are free to use. It
* could be equal to `Nil` if all resources are busy, but there
* are no tasks waiting in queue.
*/
Expand Down Expand Up @@ -222,10 +251,13 @@ object ResourcePool {
for {
result <- {
if (allocated.isEmpty && releasing.isEmpty) {
// the pool is empty now, we can safely release it
released
.complete(().asRight)
.void
} else {
// the pool will be released elsewhere when all resources in `allocated` or
// `releasing` get released
effect.productR {
released
.get
Expand All @@ -246,6 +278,7 @@ object ResourcePool {

state.stage match {
case stage: State.Allocated.Stage.Free =>
// glue `release` functions of all free resources together
val (entries, releasing, release) = stage
.ids
.foldLeft((state.entries, state.releasing, ().pure[F])) {
Expand Down Expand Up @@ -541,6 +574,7 @@ object ResourcePool {
.attempt
.flatMap {
case Right((value, release)) =>
// resource was allocated
for {
timestamp <- now
entry = Entry(
Expand Down Expand Up @@ -571,6 +605,8 @@ object ResourcePool {
val result1 = result match {
case Right(a) =>
if (releasing.isEmpty && state.allocated.isEmpty) {
// this was the last resource in a pool,
// we can release the pool itself now
state
.released
.complete(a.asRight)
Expand Down Expand Up @@ -618,6 +654,7 @@ object ResourcePool {
(value, releaseOf(id, entry))
}
case Left(a) =>
// resource failed to allocate
0
.tailRecM { count =>
ref
Expand Down Expand Up @@ -791,12 +828,21 @@ object ResourcePool {

implicit class ResourcePoolOps[F[_], A](val self: ResourcePool[F, A]) extends AnyVal {

/** Returns a `Resource`, which, when allocated, will take a resource from a
* pool.
*
* When the `Resource` is released then the underlying resource is released
* back to the pool.
*/
def resource(implicit F: Functor[F]): Resource[F, A] = Resource(self.get)
}

object implicits {
implicit class ResourceOpsResourcePool[F[_], A](val self: Resource[F, A]) extends AnyVal {

/** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a
* shorter syntax to create a pool out of existing resource.
*/
def toResourcePool(
maxSize: Int,
expireAfter: FiniteDuration,
Expand All @@ -806,6 +852,9 @@ object ResourcePool {
ResourcePool.of(maxSize, expireAfter, _ => self)
}

/** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but provides a
* shorter syntax to create a pool out of existing resource.
*/
def toResourcePool(
maxSize: Int,
partitions: Int,
Expand Down

0 comments on commit c859254

Please sign in to comment.