Permalink
Browse files

Rename `Promise.setSync` to `complete`

  • Loading branch information...
SystemFw committed Dec 5, 2017
1 parent 72ec41a commit 50a9f1b637f8e59789032ebd2901bc2c24318878
@@ -62,10 +62,10 @@ abstract class StreamApp[F[_]](implicit F: Effect[F]) {
case Left(t) =>
IO(t.printStackTrace()) *>
halted.set(true) *>
exitCodePromise.setSync(ExitCode.Error)
exitCodePromise.complete(ExitCode.Error)
case Right(exitCode) =>
halted.set(true) *>
exitCodePromise.setSync(exitCode.getOrElse(ExitCode.Success))
exitCodePromise.complete(exitCode.getOrElse(ExitCode.Success))
} *> exitCodePromise.get
def main(args: Array[String]): Unit =
@@ -42,8 +42,8 @@ sealed abstract class AsyncPull[F[_],A] { self =>
(b, cancelB) = t1
fa = a.run.map(Left(_): Either[A, B])
fb = b.run.map(Right(_): Either[A, B])
_ <- async.fork(fa.attempt.flatMap(x => promise.setSync(x)))
_ <- async.fork(fb.attempt.flatMap(x => promise.setSync(x)))
_ <- async.fork(fa.attempt.flatMap(x => promise.complete(x)))
_ <- async.fork(fb.attempt.flatMap(x => promise.complete(x)))
} yield {
(FreeC.Eval(promise.get.flatMap {
case Left(e) => F.raiseError[Either[A, B]](e)
@@ -274,9 +274,9 @@ object Scheduler extends SchedulerPlatform {
async.promise[F,Option[A]].flatMap { gate =>
F.delay {
val cancel = scheduler.scheduleOnce(d) {
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => gate.setSync(Some(a))))(_ => IO.unit))
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => gate.complete(Some(a))))(_ => IO.unit))
}
gate.get -> (F.delay(cancel()) *> gate.setSync(None))
gate.get -> (F.delay(cancel()) *> gate.complete(None))
}
}
@@ -2510,7 +2510,7 @@ object Stream {
, None : UO
){ (_, uo) => uo.asInstanceOf[UO] } map { _ map { case (hd, tl) => (hd, fromFreeC(tl)) }}
Algebra.eval(async.fork(F.flatMap(F.attempt(runStep))(x => async.fork(p.setSync(x))))) map { _ => AsyncPull.readAttemptPromise(p) }
Algebra.eval(async.fork(F.flatMap(F.attempt(runStep))(x => async.fork(p.complete(x))))) map { _ => AsyncPull.readAttemptPromise(p) }
}
}
}
@@ -15,22 +15,22 @@ import Promise._
/**
* A purely functional synchronisation primitive.
*
* When created, a `Promise` is unset. It can then be set exactly once, and never be unset again.
* When created, a `Promise` is empty. It can then be completed exactly once, and never be made empty again.
*
* `get` on an unset `Promise` will block until the `Promise` is set.
* `get` on a set `Promise` will always immediately return its content.
* `get` on an empty `Promise` will block until the `Promise` is completed.
* `get` on a completed `Promise` will always immediately return its content.
*
* `set(a)` on an unset `Promise` will set it to `a`, and notify any and all readers currently blocked on a call to `get`.
* `set(a)` on a set `Promise` will not modify its content, and result in a failed `F`.
* `complete(a)` on an empty `Promise` will set it to `a`, and notify any and all readers currently blocked on a call to `get`.
* `complete(a)` on a `Promise` that's already been completed will not modify its content, and result in a failed `F`.
*
* Albeit simple, `Promise` can be used in conjunction with [[Ref]] to build complex concurrent behaviour and
* data structures like queues and semaphores.
*
* Finally, the blocking mentioned above is semantic only, no actual threads are blocked by the implementation.
* Finally, the blocking mentioned above is semantic only, no actual threads are blocked by the implementation.
*/
final class Promise[F[_], A] private[fs2] (ref: Ref[F, State[A]])(implicit F: Effect[F], ec: ExecutionContext) {
/** Obtains the value of the `Promise`, or waits until it has been set. */
/** Obtains the value of the `Promise`, or waits until it has been completed. */
def get: F[A] = F.suspend {
// `new Token` is a side effect because `Token`s are compared with reference equality, it needs to be in `F`.
// For performance reasons, `suspend` is preferred to `F.delay(...).flatMap` here.
@@ -52,7 +52,7 @@ final class Promise[F[_], A] private[fs2] (ref: Ref[F, State[A]])(implicit F: Ef
}
/**
* Like [[get]] but if the `Promise` has not been initialized when the timeout is reached, a `None`
* Like [[get]] but if the `Promise` has not been completed when the timeout is reached, a `None`
* is returned.
*/
def timedGet(timeout: FiniteDuration, scheduler: Scheduler): F[Option[A]] =
@@ -63,39 +63,39 @@ final class Promise[F[_], A] private[fs2] (ref: Ref[F, State[A]])(implicit F: Ef
}
/**
* If this `Promise` is unset, *synchronously* sets the current value to `a`, and notifies
* If this `Promise` is empty, *synchronously* sets the current value to `a`, and notifies
* any and all readers currently blocked on a `get`.
*
* Note that the returned action completes after the reference has been successfully set:
* use `async.fork(r.setSync)` if you want asynchronous behaviour.
* use `async.fork(r.complete)` if you want asynchronous behaviour.
*
* If this `Promise` is already set, the returned action immediately fails with a [[Ref.AlreadySetException]].
* If this `Promise` has already been completed, the returned action immediately fails with a [[Promise.AlreadyCompletedException]].
* In the uncommon scenario where this behaviour is problematic, you can handle failure explicitly
* using `attempt` or any other `ApplicativeError`/`MonadError` combinator on the returned action.
*
* Satisfies:
* `Promise.empty[F, A].flatMap(r => r.setSync(a) *> r.get) == a.pure[F]`
* `Promise.empty[F, A].flatMap(r => r.complete(a) *> r.get) == a.pure[F]`
*/
def setSync(a: A): F[Unit] = {
def complete(a: A): F[Unit] = {
def notifyReaders(r: State.Unset[A]): Unit =
r.waiting.values.foreach { cb =>
ec.execute { () => cb(a) }
}
ref.modify2 {
case s @ State.Set(_) => s -> F.raiseError[Unit](new AlreadySetException)
case s @ State.Set(_) => s -> F.raiseError[Unit](new AlreadyCompletedException)
case u @ State.Unset(_) => State.Set(a) -> F.delay(notifyReaders(u))
}.flatMap(_._2)
}
/**
* Runs `f1` and `f2` simultaneously, but only the winner gets to
* set this `Promise`. The loser continues running but its reference
* complete this `Promise`. The loser continues running but its reference
* to this `Promise` is severed, allowing this `Promise` to be garbage collected
* if it is no longer referenced by anyone other than the loser.
*
* If the winner fails, the returned `F` fails as well, and this `Promise`
* is not set.
* is not complete.
*/
def race(f1: F[A], f2: F[A])(implicit F: Effect[F], ec: ExecutionContext): F[Unit] = F.delay {
val refToSelf = new AtomicReference(this)
@@ -109,7 +109,7 @@ final class Promise[F[_], A] private[fs2] (ref: Ref[F, State[A]])(implicit F: Ef
refToSelf.set(null)
throw e
case Right(v) =>
val action = refToSelf.getAndSet(null).setSync(v)
val action = refToSelf.getAndSet(null).complete(v)
unsafeRunAsync(action)(_ => IO.unit)
}
}
@@ -140,9 +140,9 @@ object Promise {
def empty[F[_], A](implicit F: Effect[F], ec: ExecutionContext): F[Promise[F, A]] =
F.delay(unsafeCreate[F, A])
/** Raised when trying to set a [[Promise]] that's already been set once */
final class AlreadySetException extends Throwable(
s"Trying to set an fs2 Promise that's already been set"
/** Raised when trying to complete a [[Promise]] that's already been completed */
final class AlreadyCompletedException extends Throwable(
s"Trying to complete an fs2 Promise that's already been completed"
)
private sealed abstract class State[A]
@@ -94,7 +94,7 @@ package object async {
*/
def start[F[_], A](f: F[A])(implicit F: Effect[F], ec: ExecutionContext): F[F[A]] =
promise[F, Either[Throwable, A]].flatMap { p =>
fork(f.attempt.flatMap(p.setSync)).as(p.get.flatMap(F.fromEither))
fork(f.attempt.flatMap(p.complete)).as(p.get.flatMap(F.fromEither))
}
/**
@@ -189,14 +189,14 @@ object Queue {
signalSize(c.previous, c.now)
} else {
// queue was empty, we had waiting dequeuers
async.fork(c.previous.deq.head.setSync(Chunk.singleton(a)))
async.fork(c.previous.deq.head.complete(Chunk.singleton(a)))
}
val pk = if (c.previous.peek.isEmpty) {
// no peeker to notify
F.unit
} else {
// notify peekers
async.fork(c.previous.peek.get.setSync(a))
async.fork(c.previous.peek.get.complete(a))
}
(dq *> pk).as(true)
@@ -88,7 +88,7 @@ object Semaphore {
type S = Either[Vector[(Long,async.Promise[F, Unit])], Long]
async.refOf[F,S](Right(n)).map { state => new Semaphore[F] {
private def open(gate: async.Promise[F, Unit]): F[Unit] =
gate.setSync(())
gate.complete(())
def count = state.get.map(count_)
@@ -78,7 +78,7 @@ object Signal {
if (c.previous._3.isEmpty) F.pure(c.map(_._1) -> b)
else {
val now = c.now._1 -> c.now._2
c.previous._3.toVector.traverse { case(_, promise) => async.fork(promise.setSync(now)) } *> F.pure(c.map(_._1) -> b)
c.previous._3.toVector.traverse { case(_, promise) => async.fork(promise.complete(now)) } *> F.pure(c.map(_._1) -> b)
}
}
}
@@ -107,7 +107,7 @@ object Topic {
def unSubscribe: F[Unit] = for {
_ <- state.modify { case (a,subs) => a -> subs.filterNot(_.id == id) }
_ <- subSignal.modify(_ - 1)
_ <- done.setSync(true)
_ <- done.complete(true)
} yield ()
def subscribe: Stream[F, A] = eval(firstA.get) ++ q.dequeue
def publish(a: A): F[Unit] = {
@@ -127,7 +127,7 @@ object Topic {
}
c <- state.modify { case(a,s) => a -> (s :+ sub) }
_ <- subSignal.modify(_ + 1)
_ <- firstA.setSync(c.now._1)
_ <- firstA.complete(c.now._1)
} yield sub
new Topic[F,A] {
@@ -12,15 +12,15 @@ class PromiseSpec extends AsyncFs2Spec with EitherValues {
"Promise" - {
"setSync" in {
promise[IO, Int].flatMap { p =>
p.setSync(0) *> p.get
p.complete(0) *> p.get
}.unsafeToFuture.map { _ shouldBe 0 }
}
"setSync is only successful once" in {
promise[IO, Int].flatMap { p =>
p.setSync(0) *> p.setSync(1).attempt product p.get
p.complete(0) *> p.complete(1).attempt product p.get
}.unsafeToFuture.map { case (err, value) =>
err.left.value shouldBe a[Promise.AlreadySetException]
err.left.value shouldBe a[Promise.AlreadyCompletedException]
value shouldBe 0
}
}
@@ -31,10 +31,10 @@ class PromiseSpec extends AsyncFs2Spec with EitherValues {
modifyGate <- promise[IO, Unit]
readGate <- promise[IO, Unit]
_ <- fork {
modifyGate.get *> state.modify(_ * 2) *> readGate.setSync(())
modifyGate.get *> state.modify(_ * 2) *> readGate.complete(())
}
_ <- fork {
state.setSync(1) *> modifyGate.setSync(())
state.setSync(1) *> modifyGate.complete(())
}
_ <- readGate.get
res <- state.get
@@ -48,7 +48,7 @@ class PromiseSpec extends AsyncFs2Spec with EitherValues {
for {
p <- async.promise[IO,Int]
first <- p.timedGet(100.millis, scheduler)
_ <- p.setSync(42)
_ <- p.complete(42)
second <- p.timedGet(100.millis, scheduler)
} yield List(first, second)
}.runLog.unsafeToFuture.map(_.flatten shouldBe Vector(None, Some(42)))
@@ -36,7 +36,7 @@ class SocketSpec extends Fs2Spec with BeforeAndAfterAll {
val echoServer: Stream[IO, Unit] = {
serverWithLocalAddress[IO](new InetSocketAddress(InetAddress.getByName(null), 0)).flatMap {
case Left(local) => Stream.eval_(localBindAddress.setSync(local))
case Left(local) => Stream.eval_(localBindAddress.complete(local))
case Right(s) =>
s.map { socket =>
socket.reads(1024).to(socket.writes()).onFinalize(socket.endOfOutput)

0 comments on commit 50a9f1b

Please sign in to comment.