Skip to content
Permalink
Browse files

Merge pull request #1644 from poliez/patch-1

Add a section on `interruptWhen` to guide.md
  • Loading branch information...
SystemFw committed Oct 8, 2019
2 parents fad332e + c7b238d commit 5eb3d9deae80b7e8dd4cb5943909f1b76dc1a3f1
Showing with 76 additions and 0 deletions.
  1. +76 −0 site/src/main/tut/guide.md
@@ -26,6 +26,7 @@ This is the official FS2 guide. It gives an overview of the library and its feat
* [Exercises (stream transforming)](#exercises-1)
* [Concurrency](#concurrency)
* [Exercises (concurrency)](#exercises-2)
* [Interruption](#interruption)
* [Talking to the external world](#talking-to-the-external-world)
* [Reactive streams](#reactive-streams)
* [Learning more](#learning-more)
@@ -412,6 +413,81 @@ type Pipe2[F[_],-I,-I2,+O] = (Stream[F,I], Stream[F,I2]) => Stream[F,O]
def mergeHaltBoth[F[_]:Concurrent,O]: Pipe2[F,O,O,O] = (s1, s2) => ???
```

### Interruption

Sometimes some tasks have to run only when some conditions are met or until some other task completes. Luckily for us, `Stream` defines some really useful methods that let us accomplish this.
In the following example we will see how `interruptWhen` helps us to describe such cases. We will describe a program composed by two concurrent streams: the first will print the current time to the console every second, the second will stop the first.

First of all we will need to set up the environment with some imports and declare some implicit values.
```tut:book:reset
import fs2.Stream
import cats.effect.IO
import cats.effect.concurrent.Deferred
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
```

The example looks like this:
```tut:book
val program =
Stream.eval(Deferred[IO, Unit]).flatMap { switch =>
val switcher =
Stream.eval(switch.complete(())).delayBy(5.seconds)
val program =
Stream.repeatEval(IO(println(java.time.LocalTime.now))).metered(1.second)
program
.interruptWhen(switch.get.attempt)
.concurrently(switcher)
}
program.compile.drain.unsafeRunSync
```

Let's take this line by line now, so we can understand what's going on.
```scala
val program =
Stream.eval(Deferred[IO, Unit]).flatMap { switch =>
```

Here we create a `Stream[IO, Deferred[IO, Unit]]`. [`Deferred`](https://typelevel.org/cats-effect/concurrency/deferred.html) is a concurrency primitive that represents a condition yet to be fulfilled. We will use the emitted istance of `Deferred[IO, Unit]` as a mechanism to signal the completion of a task. Given this purpose, we call this instance `switch`.

```scala
val switcher =
Stream.eval(switch.complete(())).delayBy(5.seconds)
```

The `switcher` will be the stream that, after 5 seconds, will "flip" the `switch` calling `complete` on it. `delayBy` concatenates the stream after another that sleeps for the specified duration, effectively delaying the evaluation of our stream.
```scala
val program =
Stream.repeatEval(IO(println(java.time.LocalTime.now))).metered(1.second)
```

This is the program we want to interrupt. `repeatEval` is the effectful version of `repeat`. `metered`, on the other hand, forces our stream to emit values at the specified rate (in this case one every second).

```scala
program
.interruptWhen(switch.get.attempt)
.concurrently(switcher)
```

In this line we call `interruptWhen` on the stream, obtaining a stream that will stop evaluation as soon as "the `switch` gets flipped"; then, thanks to `concurrently`, we tell that we want the `switcher` to run in the _background_ ignoring his output. This gives us back the program we described back at the start of this chapter.

This is a way to create a program that runs for a given time, in this example 5 seconds. Timed interruption is such a common use case that FS2 defines the `interruptAfter` method. Armed with this knowledge we can rewrite our example as:
```tut:book
val program1 =
Stream.
repeatEval(IO(println(java.time.LocalTime.now))).
metered(1.second).
interruptAfter(5.seconds)
program1.compile.drain.unsafeRunSync
```

### Talking to the external world

When talking to the external world, there are a few different situations you might encounter:

0 comments on commit 5eb3d9d

Please sign in to comment.
You can’t perform that action at this time.