Permalink
Browse files

Fix concurrency-primitives.md:

- SignallingRef instead of Signal
- more relevant termination example with SignallingRef
- fix compilation issues (e.g Stream.awakeEvery[F])
  • Loading branch information...
kczulko committed Oct 24, 2018
1 parent d65f6a2 commit f6543127f899cd6612cea017780df46b46002838
Showing with 29 additions and 17 deletions.
  1. +29 −17 site/src/concurrency-primitives.md
@@ -64,49 +64,61 @@ The program ends after 15 seconds when the signal interrupts the publishing of m
- Subscriber #1 should receive 15 events + the initial empty event
- Subscriber #2 should receive 10 events
- Subscriber #3 should receive 5 events
- Publisher sends Quit event
- Subscribers raise interrupt signal on Quit event
```
```scala
import cats.effect.{Concurrent, ExitCode, IO, IOApp}
import fs2.concurrent.{Signal, Topic}
import fs2.{Sink, Stream}
import scala.concurrent.duration._
case class Event(value: String)
import cats.effect.{Concurrent, ExitCode, IO, IOApp, Timer}
import cats.syntax.all._
import fs2.{Sink, Stream}
import fs2.concurrent.{SignallingRef, Topic}
sealed trait Event
case class Text(value: String) extends Event
case object Quit extends Event
class EventService[F[_]](eventsTopic: Topic[F, Event],
interrupter: Signal[F, Boolean])(implicit F: Concurrent[F], timer: Timer[F]) {
interrupter: SignallingRef[F, Boolean])(implicit F: Concurrent[F], timer: Timer[F]) {
// Publishing events every one second until signaling interruption
def startPublisher: Stream[F, Unit] =
Stream.awakeEvery(1.second).flatMap { _ =>
val event = Event(System.currentTimeMillis().toString)
Stream.eval(eventsTopic.publish1(event))
}.interruptWhen(interrupter)
// Publishing 15 text events, then single Quit event, and still publishing text events
def startPublisher: Stream[F, Unit] = {
val textEvents = eventsTopic.publish(
Stream.awakeEvery[F](1.second)
.zipRight(Stream(Text(System.currentTimeMillis().toString)).repeat)
)
val quitEvent = Stream.eval(eventsTopic.publish1(Quit))
(textEvents.take(15) ++ quitEvent ++ textEvents).interruptWhen(interrupter)
}
// Creating 3 subscribers in a different period of time and join them to run concurrently
def startSubscribers: Stream[F, Unit] = {
val s1: Stream[F, Event] = eventsTopic.subscribe(10)
val s2: Stream[F, Event] = Stream.sleep_[F](5.seconds) ++ eventsTopic.subscribe(10)
val s3: Stream[F, Event] = Stream.sleep_[F](10.seconds) ++ eventsTopic.subscribe(10)
// When Quit message received - terminate the program
def sink(subscriberNumber: Int): Sink[F, Event] =
_.evalMap(e => F.delay(println(s"Subscriber #$subscriberNumber processing event: $e")))
_.flatMap {
case e @ Text(_) => Stream.eval(F.delay(println(s"Subscriber #$subscriberNumber processing event: $e")))
case Quit => Stream.eval(interrupter.set(true))
}
Stream(s1.to(sink(1)), s2.to(sink(2)), s3.to(sink(3))).parJoin(3)
}
}
class PubSub extends IOApp {
object PubSub extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val stream = for {
topic <- Stream.eval(Topic[IO, Event](Event("")))
signal <- Stream.eval(Signal[IO, Boolean](false))
topic <- Stream.eval(Topic[IO, Event](Text("")))
signal <- Stream.eval(SignallingRef[IO, Boolean](false))
service = new EventService[IO](topic, signal)
_ <- Stream(
S.delay(Stream.eval(signal.set(true)), 15.seconds),
service.startPublisher.concurrently(service.startSubscribers)
).parJoin(2).drain
} yield ()

0 comments on commit f654312

Please sign in to comment.