diff --git a/src/main/scala/com/evolution/scache/ExpiringCache.scala b/src/main/scala/com/evolution/scache/ExpiringCache.scala index 6ef8d22..116bf9a 100644 --- a/src/main/scala/com/evolution/scache/ExpiringCache.scala +++ b/src/main/scala/com/evolution/scache/ExpiringCache.scala @@ -5,6 +5,7 @@ import cats.effect.syntax.all.* import cats.kernel.CommutativeMonoid import cats.syntax.all.* import cats.{Applicative, Monad, MonadThrow, Monoid} +import com.evolution.scache.LoadingCache.EntryState import com.evolutiongaming.catshelper.ClockHelper.* import com.evolutiongaming.catshelper.Schedule @@ -40,16 +41,17 @@ object ExpiringCache { def removeExpired(key: K, entryRef: LoadingCache.EntryRef[F, Entry[V]]) = { entryRef .get - .flatMap { entry => - entry.foldMapM { entry => + .flatMap { + case state: EntryState.Value[F, Entry[V]] => for { now <- Clock[F].millis - expiredAfterRead = expireAfterReadMs + entry.value.touched < now - expiredAfterWrite = () => expireAfterWriteMs.exists { _ + entry.value.created < now } + expiredAfterRead = expireAfterReadMs + state.entry.value.touched < now + expiredAfterWrite = () => expireAfterWriteMs.exists { _ + state.entry.value.created < now } expired = expiredAfterRead || expiredAfterWrite() result <- if (expired) remove(key) else ().pure[F] } yield result - } + case _: EntryState.Loading[F, Entry[V]] => ().pure[F] + case EntryState.Removed => ().pure[F] } } @@ -66,8 +68,9 @@ object ExpiringCache { entryRef .get .map { - case Right(a) => Elem(key, a.value.touched) :: result - case Left(_) => result + case state: EntryState.Value[F, Entry[V]] => Elem(key, state.entry.value.touched) :: result + case _: EntryState.Loading[F, Entry[V]] => result + case EntryState.Removed => result } } } @@ -105,8 +108,8 @@ object ExpiringCache { entryRefs.foldMapM { case (key, entryRef) => entryRef .get - .flatMap { value => - value.foldMapM { _ => + .flatMap { + case _: EntryState.Value[F, Entry[V]] => refresh .value(key) .flatMap { @@ -114,7 +117,8 @@ object ExpiringCache { case None => cache.remove(key).void } .handleError { _ => () } - } + case _: EntryState.Loading[F, Entry[V]] => ().pure[F] + case EntryState.Removed => ().pure[F] } } } diff --git a/src/main/scala/com/evolution/scache/LoadingCache.scala b/src/main/scala/com/evolution/scache/LoadingCache.scala index f06f132..c336c18 100644 --- a/src/main/scala/com/evolution/scache/LoadingCache.scala +++ b/src/main/scala/com/evolution/scache/LoadingCache.scala @@ -1,13 +1,12 @@ package com.evolution.scache -import cats.{Functor, Monad, MonadThrow, Parallel} +import cats.{Applicative, Functor, Monad, MonadThrow, Parallel} import cats.effect.implicits.* import cats.effect.{Concurrent, Deferred, Fiber, GenConcurrent, Outcome, Ref, Resource} import cats.kernel.CommutativeMonoid import com.evolutiongaming.catshelper.ParallelHelper.* import cats.syntax.all.* - private[scache] object LoadingCache { def of[F[_] : Concurrent, K, V]( @@ -58,19 +57,23 @@ private[scache] object LoadingCache { entry .get .flatMap { - case Right(entry) => - entry + case state: EntryState.Value[F, V] => + state + .entry .value .some .pure[F] - case Left(deferred) => - deferred + case state: EntryState.Loading[F, V] => + state + .deferred .get .map { entry => entry .toOption .map { _.value } } + case EntryState.Removed => + none[V].pure[F] } } } @@ -82,7 +85,7 @@ private[scache] object LoadingCache { .flatMap { entryRefs => entryRefs .get(key) - .traverse { _.either } + .flatTraverse { _.optEither } } } @@ -94,7 +97,7 @@ private[scache] object LoadingCache { } } - def getOrUpdate1[A](key: K)(value: => F[(A, V, Option[Release])]) = { + def getOrUpdate1[A](key: K)(value: => F[(A, V, Option[Release])]): F[Either[A, Either[F[V], V]]] = { 0.tailRecM { counter => ref .access @@ -104,7 +107,7 @@ private[scache] object LoadingCache { .fold { for { deferred <- Deferred[F, Either[Throwable, Entry[F, V]]] - entryRef <- Ref[F].of(deferred.asLeft[Entry[F, V]]) + entryRef <- Ref[F].of[EntryState[F, V]](EntryState.Loading(deferred)) result <- set(entryRefs.updated(key, entryRef)) .flatMap { case true => @@ -116,110 +119,206 @@ private[scache] object LoadingCache { .attempt .race1 { deferred.get } .flatMap { + // `value` got computed, and deferred was not (yet) completed by any other fiber in `put` case Left(Right((a, entry))) => - 0.tailRecM { counter => - entryRef - .access - .flatMap { - case (Right(entry0), _) => + deferred + .complete(entry.asRight) + .flatMap { + // Successfully completed our deferred, + // now trying to place the new value in the entry. + case true => + + def releaseAndReturnValue(state: EntryState.Value[F, V]): F[Either[A, Either[F[V], V]]] = entry .release1 + .start .as { - entry0 + state + .entry .value .asRight[F[V]] .asRight[A] - .asRight[Int] } - case (Left(_), set) => - set(entry.asRight).flatMap { - case true => - deferred - .complete(entry.asRight) + def releaseAndReturnLoading(state: EntryState.Loading[F, V]): F[Either[A, Either[F[V], V]]] = + entry + .release1 + .start + .as { + state + .deferred + .getOrError + .map(_.value) + .asLeft[V] + .asRight[A] + } + + // Try putting computed value in the map, if there is no entry with our key. + // If the map already contains an entry with our key, + // return its value (or value computation). + def tryPutNewValue: F[Either[A, Either[F[V], V]]] = + 0.tailRecM { counter => + ref + .access + .flatMap { case (entryRefs, set) => + entryRefs + .get(key) + .fold { + // No entry present in the map, so we try to add a new one + Ref[F] + .of[EntryState[F, V]](EntryState.Value(entry)) + .flatMap { entryRef => + set(entryRefs.updated(key, entryRef)).map { + case true => + a + .asLeft[Either[F[V], V]] + .asRight[Int] + case false => + (counter + 1) + .asLeft[Either[A, Either[F[V], V]]] + } + } + } { entryRef => + entryRef + .get + .flatMap { + case state: EntryState.Value[F, V] => + releaseAndReturnValue(state).map(_.asRight[Int]) + + case state: EntryState.Loading[F, V] => + releaseAndReturnLoading(state).map(_.asRight[Int]) + + // `Removed` means that this entry won't be present in the map + // next time we look the key up (see `remove` flow), + // so we just retry. + case EntryState.Removed => + (counter + 1) + .asLeft[Either[A, Either[F[V], V]]] + .pure[F] + } + .uncancelable + } + } + } + + entryRef + .access + .flatMap { + // Entry is still in loading state, containing the same deferred we just completed. + // Now we can try to put the computed value in the same entryRef. + case (state: EntryState.Loading[F, V], set) if state.deferred == deferred => + set(EntryState.Value(entry)) .flatMap { - case true => + // Happy path: successfully placed our computed value + case true => a .asLeft[Either[F[V], V]] .pure[F] + // Failed to set our value, meaning the entry was either: + // - Updated: in that case we release our computed value, and return + // the value (or its computation), giving it the priority + // - Removed: in that case we try to put our value back in the map case false => - deferred - .getOrError - .map { entry => - entry - .value - .asRight[F[V]] - .asRight[A] + entryRef + .get + .flatMap { + case state: EntryState.Value[F, V] => + releaseAndReturnValue(state) + + case state: EntryState.Loading[F, V] => + releaseAndReturnLoading(state) + + case EntryState.Removed => + tryPutNewValue } } - .map { _.asRight[Int] } - case false => - (counter + 1) - .asLeft[Either[A, Either[F[V], V]]] - .pure[F] + + case (state: EntryState.Value[F, V], _) => + releaseAndReturnValue(state) + + case (state: EntryState.Loading[F, V], _) => + releaseAndReturnLoading(state) + + case (EntryState.Removed, _) => + tryPutNewValue } - } - } + // Deferred got completed by another fiber, so we return what they put there, + // and release the value we just computed. + case false => + entry + .release1 + .start + .productR( + deferred + .getOrError + .map { entry => + entry + .value + .asRight[F[V]] + .asRight[A] + } + ) + } + + // `value` computation completed with error, + // and deferred was not completed in another fiber in `put`. case Left(Left(error)) => deferred .complete(error.asLeft) .flatMap { + // Successfully completed our deferred with error, + // now trying to remove the entry from the map, if it is still there. case true => - entryRef - .get - .flatMap { - case Left(_) => - 0.tailRecM { counter => - ref - .access - .flatMap { case (entryRefs, set) => - entryRefs - .get(key) - .fold { - error.raiseError[F, Either[Int, V]] - } { - case `entryRef` => - set(entryRefs - key).flatMap { - case true => - error.raiseError[F, Either[Int, V]] - case false => - (counter + 1) - .asLeft[V] - .pure[F] - } - case entryRef => - entryRef - .get - .flatMap { - case Left(_) => - error.raiseError[F, Either[Int, V]] - case Right(entry) => - entry - .value - .asRight[Int] - .pure[F] - } - } - } - } - case Right(entry) => - entry - .value - .pure[F] - } + 0.tailRecM { counter1 => + ref + .access + .flatMap { case (entryRefs, set) => + entryRefs + .get(key) + .fold { + // Key was removed while we were loading, + // so we are just propagating the error + error.raiseError[F, Either[Int, Either[F[V], V]]] + } { + // The entry we added to the map is still there and unmodified, + // so we can safely remove it and propagate the error + case `entryRef` => + set(entryRefs - key).flatMap { + // Happy path: successfully removed our entry + case true => + error.raiseError[F, Either[Int, Either[F[V], V]]] + // Retrying (different keys could've been modified in the map) + case false => + (counter1 + 1) + .asLeft[Either[F[V], V]] + .pure[F] + } + // Another fiber replaced the `ref` we added to the map, + // so we return their value (computed or ongoing), + // or propagate our error if our entry got removed. + case entryRef => + entryRef + .optEither + .flatMap(_.liftTo[F](error)) + .map(_.asRight[Int]) + } + } + } + // Someone else completed the deferred before us, so they must've take care of + // updating the `ref`, and we return their result. case false => deferred .getOrError .map { _.value } + .asLeft[V] + .pure[F] } - .map { value => - value - .asRight[F[V]] - .asRight[A] - } + .map { _.asRight[A] } + // Deferred was completed by `put` in another fiber before `value` computation completed. + // We return their value, and schedule release of our value that is still being computed. case Right((fiber, entry)) => fiber .joinWithNever @@ -249,19 +348,25 @@ private[scache] object LoadingCache { .uncancelable } yield result } { entryRef => + // Map already contained an entry under our key, so we return that value (or its ongoing computation) entryRef - .either - .map { value => - value - .asRight[A] - .asRight[Int] + .optEither + .map { + case Some(either) => + either + .asRight[A] + .asRight[Int] + // Entry got removed (see `remove` flow), so we retry expecting to get something else with our key. + case None => + (counter + 1) + .asLeft[Either[A, Either[F[V], V]]] } } } } } - def put(key: K, value: V, release: Option[Release]) = { + def put(key: K, value: V, release: Option[Release]): F[F[Option[V]]] = { val entry = entryOf(value, release) 0.tailRecM { counter => ref @@ -270,64 +375,107 @@ private[scache] object LoadingCache { entryRefs .get(key) .fold { + // No entry present in the map, so we add a new one Ref[F] - .of(entry.asRight[DeferredThrow[F, Entry[F, V]]]) + .of[EntryState[F, V]](EntryState.Value(entry)) .flatMap { entryRef => set(entryRefs.updated(key, entryRef)).map { - case true => + case true => none[V] .pure[F] .asRight[Int] case false => - (counter + 1).asLeft[F[Option[V]]] + (counter + 1) + .asLeft[F[Option[V]]] } } } { entryRef => - 0.tailRecM { counter => - entryRef - .access - .flatMap { - case (Right(entry0), set) => - set(entry.asRight) - .flatMap { - case true => - entry0 - .release - .traverse { _.start } - .map { fiber => - fiber - .foldMapM { _.joinWithNever } - .as { entry0.value.some } - .asRight[Int] - } - case false => - (counter + 1) - .asLeft[F[Option[V]]] - .pure[F] - } + entryRef + .access + .flatMap { + // A computed value is already present in the map, so we are replacing it with our value. + case (state: EntryState.Value[F, V], set) => + set(EntryState.Value(entry)) + .flatMap { + // Successfully replaced the entryRef with our value, + // now we are responsible for releasing the old value. + case true => + state + .entry + .release + .traverse { _.start } + .map { fiber => + fiber + .foldMapM { _.joinWithNever } + .as { state.entry.value.some } + .asRight[Int] + } + // Failed to set the entryRef to our value + // so we just release our value and exit. + case false => + entry + .release + .traverse { _.start } // Start releasing and forget + .as { + none[V] + .pure[F] + .asRight[Int] + } + } - case (Left(deferred), set) => - deferred - .complete(entry.asRight) - .flatMap { - case true => - set(entry.asRight).map { - case true => - none[V] - .pure[F] - .asRight[Int] - case false => - (counter + 1).asLeft[F[Option[V]]] + // The value is still loading, so we first try to complete the deferred with it, + // and then replace it with our value. + case (state: EntryState.Loading[F, V], set) => + state + .deferred + .complete(entry.asRight) + .flatMap { + // We successfully completed the deferred, now trying to set the value. + case true => + set(EntryState.Value(entry)).flatMap { + // We successfully replaced the entry with our value, so we are done. + case true => + none[V] + .pure[F] + .asRight[Int] + .pure[F] + // Another fiber placed their new value before us + // so we just release our value and exit. + case false => + entry + .release + .traverse { _.start } // Start releasing and forget + .as { + none[V] + .pure[F] + .asRight[Int] + } + } + // Someone just completed the deferred we saw + // so we just release our value and exit. + case false => + entry + .release + .traverse { _.start } // Start releasing and forget + .as { + none[V] + .pure[F] + .asRight[Int] } - case false => - (counter + 1) - .asLeft[F[Option[V]]] - .pure[F] - } - } - .uncancelable - .map { _.asRight[Int] } - } + } + + // The key was just removed from the map, so just release the value and exit. + case (EntryState.Removed, _) => + entry + .release + .traverse { _.start } // Start releasing and forget + .as { + none[V] + .pure[F] + .asRight[Int] + } + } + .uncancelable } } } @@ -367,7 +515,10 @@ private[scache] object LoadingCache { values.flatMap { values => entryRef .value - .map { value => (key, value) :: values } + .map { + case Some(value) => (key, value) :: values + case None => values + } } } } @@ -386,8 +537,11 @@ private[scache] object LoadingCache { } { case (values, (key, entryRef)) => values.flatMap { values => entryRef - .either - .map { value => (key, value) :: values } + .optEither + .map { + case Some(value) => (key, value) :: values + case None => values + } } } } @@ -395,7 +549,7 @@ private[scache] object LoadingCache { } - def remove(key: K) = { + def remove(key: K): F[F[Option[V]]] = { 0.tailRecM { counter => ref .access @@ -410,21 +564,39 @@ private[scache] object LoadingCache { } { entryRef => set(entryRefs - key) .flatMap { - case true => + case true => + // We just removed the entry for the map, now we need to release it. + // Replacing the value of the ref with `Removed` means that we are getting responsible for the release. entryRef - .getOption - .flatMap { entry => - entry.traverse { entry => - entry + .getAndSet(EntryState.Removed) + .flatMap { + // We removed a loaded value, so we are responsible for releasing it. + case state: EntryState.Value[F, V] => + state + .entry .release1 - .as { entry.value } - } - } - .start - .map { fiber => - fiber - .joinWithNever - .asRight[Int] + .as { state.entry.value.some } + .start + .map { fiber => + fiber + .joinWithNever + .asRight[Int] + } + + // We removed a loading value, and the fiber that will complete it will also + // release that value, so there is nothing for us to return. + case _: EntryState.Loading[F, V] => + none[V] + .pure[F] + .asRight[Int] + .pure[F] + + // We removed an entry that was already being removed by another fiber, so we are done. + case EntryState.Removed => + none[V] + .pure[F] + .asRight[Int] + .pure[F] } case false => (counter + 1) @@ -465,8 +637,8 @@ private[scache] object LoadingCache { entryRefs.foldLeft(zero) { case (a, (key, entryRef)) => for { a <- a - v <- entryRef.either - b <- f(key, v) + v <- entryRef.optEither + b <- v.fold(CommutativeMonoid[A].empty.pure[F])(v => f(key, v)) } yield { CommutativeMonoid[A].combine(a, b) } @@ -486,8 +658,8 @@ private[scache] object LoadingCache { .foldLeft(zero) { case (a, (key, entryRef)) => val b = Parallel[F].parallel { for { - v <- entryRef.either - b <- f(key, v) + v <- entryRef.optEither + b <- v.fold(CommutativeMonoid[A].empty.pure[F])(v => f(key, v)) } yield b } Parallel[F] @@ -500,7 +672,7 @@ private[scache] object LoadingCache { } } - final case class Entry[F[_], A](value: A, release: Option[F[Unit]]) + final case class Entry[+F[_], +A](value: A, release: Option[F[Unit]]) object Entry { implicit class EntryOps[F[_], A](val self: Entry[F, A]) extends AnyVal { @@ -508,9 +680,16 @@ private[scache] object LoadingCache { } } + sealed trait EntryState[+F[_], +A] + object EntryState { + final case class Loading[F[_], A](deferred: Deferred[F, Either[Throwable, Entry[F, A]]]) extends EntryState[F, A] + final case class Value[F[_], A](entry: Entry[F, A]) extends EntryState[F, A] + final case object Removed extends EntryState[Nothing, Nothing] + } + type DeferredThrow[F[_], A] = Deferred[F, Either[Throwable, A]] - type EntryRef[F[_], A] = Ref[F, Either[DeferredThrow[F, Entry[F, A]], Entry[F, A]]] + type EntryRef[F[_], A] = Ref[F, EntryState[F, A]] type EntryRefs[F[_], K, V] = Map[K, EntryRef[F, V]] @@ -535,52 +714,64 @@ private[scache] object LoadingCache { } } + implicit class EntryStateOps[F[_], A](val self: EntryState[F, A]) extends AnyVal { + + def getOption(implicit F: Applicative[F]): F[Option[Entry[F, A]]] = + self match { + case EntryState.Loading(deferred) => deferred.getOption + case EntryState.Value(entry) => entry.some.pure[F] + case EntryState.Removed => none[Entry[F, A]].pure[F] + } + + def optEither(implicit F: MonadThrow[F]): Option[Either[F[A], A]] = + self match { + case EntryState.Value(entry) => + entry + .value + .asRight[F[A]] + .some + case EntryState.Loading(deferred) => + deferred + .getOrError + .map(_.value) + .asLeft[A] + .some + case EntryState.Removed => + none[Either[F[A], A]] + } + + } + implicit class EntryRefOps[F[_], A](val self: EntryRef[F, A]) extends AnyVal { def getOption(implicit F: Monad[F]): F[Option[Entry[F, A]]] = { self .get - .flatMap { - case Right(a) => a.some.pure[F] - case Left(a) => a.getOption - } + .flatMap(_.getOption) } - def either(implicit F: MonadThrow[F]): F[Either[F[A], A]] = { + def optEither(implicit F: MonadThrow[F]): F[Option[Either[F[A], A]]] = { self .get - .map { - case Right(entry) => - entry - .value - .asRight[F[A]] - case Left(deferred) => - deferred - .get - .flatMap { - case Right(entry) => - entry - .value - .pure[F] - case Left(error) => - error.raiseError[F, A] - } - .asLeft[A] - } + .map(_.optEither) } - def value(implicit F: MonadThrow[F]): F[F[A]] = { + def value(implicit F: MonadThrow[F]): F[Option[F[A]]] = { self .get .map { - case Right(entry) => + case EntryState.Value(entry) => entry .value .pure[F] - case Left(deferred) => + .some + case EntryState.Loading(deferred) => deferred .getOrError .map { _.value } + .some + case EntryState.Removed => + none[F[A]] } } @@ -589,13 +780,17 @@ private[scache] object LoadingCache { self .access .flatMap { - case (Right(entry), set) => + case (EntryState.Value(entry), set) => val entry1 = entry.copy(value = f(entry.value)) - set(entry1.asRight).map { + set(EntryState.Value(entry1)).map { case true => ().asRight[Int] case false => (counter + 1).asLeft[Unit] } - case (Left(_), _) => + case (_: EntryState.Loading[F, A], _) => + () + .asRight[Int] + .pure[F] + case (EntryState.Removed, _) => () .asRight[Int] .pure[F] diff --git a/src/test/scala/com/evolution/scache/CacheSpec.scala b/src/test/scala/com/evolution/scache/CacheSpec.scala index 1ec47e1..0e7ca78 100644 --- a/src/test/scala/com/evolution/scache/CacheSpec.scala +++ b/src/test/scala/com/evolution/scache/CacheSpec.scala @@ -39,7 +39,7 @@ class CacheSpec extends AsyncFunSuite with Matchers { } yield (cache, metrics) def check(testName: String)(assertions: (Cache[IO, Int, Int], CacheMetricsProbe) => IO[Any]): Unit = test(testName) { - cacheAndMetrics.use(assertions.tupled).run() + cacheAndMetrics.use(assertions.tupled).run(timeout = 20.seconds) } check(s"get: $name") { (cache, metrics) => @@ -833,11 +833,14 @@ class CacheSpec extends AsyncFunSuite with Matchers { value <- value0.joinWithNever _ <- IO { value shouldEqual 0.asRight } value <- value1 - _ <- IO { value shouldEqual 0.some } + _ <- IO { value shouldEqual None } + // Value is still present in cache, since calculation ended later than remove + value <- cache.get(0) + _ <- IO { value shouldEqual Some(0) } _ <- metrics.expect( metrics.expectedGet(hit = false) -> 1, - metrics.expectedLoad(success = true) -> 1, - metrics.expectedLife -> 1 + metrics.expectedGet(hit = true) -> 1, + metrics.expectedLoad(success = true) -> 1 ) } yield {} } @@ -849,19 +852,22 @@ class CacheSpec extends AsyncFunSuite with Matchers { value1 <- cache.remove(0) release <- Deferred[IO, Unit] released <- Ref[IO].of(false) - _ <- deferred.complete((0, (release.get *> released.set(true)).some)) + _ <- deferred.complete((0, (release.get *> released.set(true).void).some)) value <- fiber.joinWithNever _ <- IO { value shouldEqual 0.asRight } value <- value1.startEnsure _ <- release.complete(()) value <- value.joinWithNever released <- released.get - _ <- IO { released shouldEqual true } - _ <- IO { value shouldEqual 0.some } + _ <- IO { released shouldEqual false } + _ <- IO { value shouldEqual None } + value <- cache.get(0) + // Value is still present in cache, since calculation ended later than remove + _ <- IO { value shouldEqual Some(0) } _ <- metrics.expect( metrics.expectedGet(hit = false) -> 1, - metrics.expectedLoad(success = true) -> 1, - metrics.expectedLife -> 1 + metrics.expectedGet(hit = true) -> 1, + metrics.expectedLoad(success = true) -> 1 ) } yield {} } @@ -1160,6 +1166,131 @@ class CacheSpec extends AsyncFunSuite with Matchers { ) } yield {} } + + check(s"each release performed exactly once during `getOrUpdate1` and `put` race: $name") { (cache, _) => + for { + resultRef1 <- Ref[IO].of(0) + resultRef2 <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + // For `getOrUpdate*` we don't know how many times the resource will be run, + // so we use increment/decrement as a way to check that the resource is released exactly once. + valueResource = (i: Int) => Resource.make(resultRef1.update(_ + i).as(i))(_ => resultRef1.update(_ - i)) + f1 <- range.parTraverse { i => cache.getOrUpdateResource(0)(valueResource(i)) }.start + + // For `put` we know that the resource will be written and released every time, + // so we increment on release and check that the final value is equal to the sum of the range. + f2 <- range.parTraverse(i => cache.put(0, 0, resultRef2.update(_ + i))).start + + expectedResult = range.sum + + _ <- f1.joinWithNever.void + _ <- f2.joinWithNever.flatMap(_.sequence) + _ <- cache.clear.flatten + + result1 <- resultRef1.get + result2 <- resultRef2.get + _ <- IO { result1 shouldEqual 0 } + _ <- IO { result2 shouldEqual expectedResult } + } yield {} + } + + check(s"each release performed exactly once during `put` and `remove` race: $name") { (cache, _) => + for { + resultRef <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + f1 <- range.parTraverse(i => cache.put(0, 0, resultRef.update(_ + i))).start + f2 <- cache.remove(0).replicateA(n).start + + expectedResult = range.sum + + _ <- f1.joinWithNever.flatMap(_.sequence) + _ <- f2.joinWithNever.flatMap(_.sequence) + _ <- cache.clear.flatten + + result <- resultRef.get + _ <- IO { result shouldEqual expectedResult } + } yield {} + } + + check(s"each release performed exactly once during `getOrUpdate1`, `put` and `remove` race: $name") { (cache, _) => + for { + resultRef1 <- Ref[IO].of(0) + resultRef2 <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + // For `getOrUpdate*` we don't know how many times the resource will be run, + // so we use increment/decrement as a way to check that the resource is released exactly once. + valueResource = (i: Int) => Resource.make(resultRef1.update(_ + i).as(i))(_ => resultRef1.update(_ - i)) + f1 <- range.parTraverse { i => cache.getOrUpdateResource(0)(valueResource(i)) }.start + + // For `put` we know that the resource will be written and released every time, + // so we increment on release and check that the final value is equal to the sum of the range. + f2 <- range.parTraverse(i => cache.put(0, 0, resultRef2.update(_ + i))).start + + f3 <- cache.remove(0).replicateA(n).start + + expectedResult = range.sum + + _ <- f1.joinWithNever.void + _ <- f2.joinWithNever.flatMap(_.sequence) + _ <- f3.joinWithNever.flatMap(_.sequence) + _ <- cache.clear.flatten + + result1 <- resultRef1.get + result2 <- resultRef2.get + _ <- IO { result1 shouldEqual 0 } + _ <- IO { result2 shouldEqual expectedResult } + } yield {} + } + + check(s"failing loads don't interfere with releases during `getOrUpdate1`, `put` and `remove` race: $name") { (cache, _) => + for { + resultRef1 <- Ref[IO].of(0) + resultRef2 <- Ref[IO].of(0) + resultRef3 <- Ref[IO].of(0) + n = 100000 + range = (1 to n).toList + + // For `getOrUpdate*` we don't know how many times the resource will be run, + // so we use increment/decrement as a way to check that the resource is released exactly once. + valueResource = (i: Int) => Resource.make(resultRef1.update(_ + i).as(i))(_ => resultRef1.update(_ - i)) + f1 <- range.parTraverse { i => + cache.getOrUpdateResource(0)(valueResource(i)).recover { case _ => -1 } + }.start + + failingResource = (i: Int) => + Resource.make(new Exception("Boom").raiseError[IO, Int])(_ => resultRef2.update(_ - i)) + f2 <- range.parTraverse { i => + cache.getOrUpdateResource(0)(failingResource(i)).recover { case _ => -1 } + }.start + + // For `put` we know that the resource will be written and released every time, + // so we increment on release and check that the final value is equal to the sum of the range. + f3 <- range.parTraverse(i => cache.put(0, 0, resultRef3.update(_ + i))).start + + f4 <- cache.remove(0).parReplicateA(n).start + + expectedResult = range.sum + + _ <- f1.joinWithNever.void + _ <- f2.joinWithNever.void + _ <- f3.joinWithNever.flatMap(_.sequence) + _ <- f4.joinWithNever.flatMap(_.sequence) + _ <- cache.clear.flatten + + result1 <- resultRef1.get + result2 <- resultRef2.get + result3 <- resultRef3.get + _ <- IO { result1 shouldEqual 0 } + _ <- IO { result2 shouldEqual 0 } + _ <- IO { result3 shouldEqual expectedResult } + } yield () + } } } diff --git a/src/test/scala/com/evolution/scache/ExpiringCacheSpec.scala b/src/test/scala/com/evolution/scache/ExpiringCacheSpec.scala index 2d6e8f5..5c2173d 100644 --- a/src/test/scala/com/evolution/scache/ExpiringCacheSpec.scala +++ b/src/test/scala/com/evolution/scache/ExpiringCacheSpec.scala @@ -267,9 +267,9 @@ class ExpiringCacheSpec extends AsyncFunSuite with Matchers { } for { - released <- Ref[F].of(false) + released <- Deferred[F, Boolean] release <- Deferred[F, Unit] - value <- cache.put(0, 1, released.set(true) *> release.complete(()).void) + value <- cache.put(0, 1, released.complete(true) *> release.complete(()).void) value <- value _ <- Sync[F].delay { value shouldEqual none } value <- cache.get(0)