Skip to content
Permalink
Browse files

Replaced blockingExecutionContext with Blocker

  • Loading branch information...
mpilquist committed Jun 9, 2019
1 parent a666a4f commit c42170e95baa1e796886ad9a15856b4c34fdc269
@@ -73,29 +73,25 @@ libraryDependencies += "co.fs2" %%% "fs2-core" % "1.0.4"
FS2 is a streaming I/O library. The design goals are compositionality, expressiveness, resource safety, and speed. Here's a simple example of its use:

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))
val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}
def run(args: List[String]): IO[ExitCode] =
@@ -56,8 +56,8 @@ lazy val commonSettings = Seq(
compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.1"),
"org.typelevel" %%% "cats-core" % "2.0.0-M2",
"org.typelevel" %%% "cats-laws" % "2.0.0-M2" % "test",
"org.typelevel" %%% "cats-effect" % "2.0.0-M2",
"org.typelevel" %%% "cats-effect-laws" % "2.0.0-M2" % "test",
"org.typelevel" %%% "cats-effect" % "2.0.0-637eb93",
"org.typelevel" %%% "cats-effect-laws" % "2.0.0-637eb93" % "test",
"org.scalacheck" %%% "scalacheck" % "1.14.0" % "test",
"org.scalatest" %%% "scalatest" % "3.1.0-SNAP11" % "test",
"org.scalatestplus" %%% "scalatestplus-scalacheck" % "1.0.0-SNAP6" % "test"
@@ -1,24 +1,5 @@
package fs2

import scala.concurrent.ExecutionContext

import cats.effect.{IO, Resource}
import cats.implicits._

import java.util.concurrent.Executors

import fs2.internal.ThreadFactories

trait TestPlatform {

def isJVM: Boolean = true

val blockingExecutionContext: Resource[IO, ExecutionContext] =
Resource
.make(
IO(ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadFactories.named("fs2-blocking", true)))))(ec =>
IO(ec.shutdown()))
.widen[ExecutionContext]

}
@@ -13,7 +13,6 @@ import fs2.internal.{Resource => _, _}
import java.io.PrintStream

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
@@ -1573,7 +1572,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* Writes this stream of strings to the supplied `PrintStream`.
*
* Note: printing to the `PrintStream` is performed *synchronously*.
* Use `linesAsync(out, blockingEc)` if synchronous writes are a concern.
* Use `linesAsync(out, blocker)` if synchronous writes are a concern.
*/
def lines[F2[x] >: F[x]](out: PrintStream)(implicit F: Sync[F2],
ev: O <:< String): Stream[F2, Unit] = {
@@ -1587,13 +1586,13 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def linesAsync[F2[x] >: F[x]](out: PrintStream, blockingExecutionContext: ExecutionContext)(
def linesAsync[F2[x] >: F[x]](out: PrintStream, blocker: Blocker)(
implicit F: Sync[F2],
cs: ContextShift[F2],
ev: O <:< String): Stream[F2, Unit] = {
val _ = ev
val src = this.asInstanceOf[Stream[F2, String]]
src.evalMap(str => cs.evalOn(blockingExecutionContext)(F.delay(out.println(str))))
src.evalMap(str => blocker.delay(out.println(str)))
}

/**
@@ -2348,7 +2347,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
* Writes this stream to the supplied `PrintStream`, converting each element to a `String` via `Show`.
*
* Note: printing to the `PrintStream` is performed *synchronously*.
* Use `showLinesAsync(out, blockingEc)` if synchronous writes are a concern.
* Use `showLinesAsync(out, blocker)` if synchronous writes are a concern.
*/
def showLines[F2[x] >: F[x], O2 >: O](out: PrintStream)(implicit F: Sync[F2],
showO: Show[O2]): Stream[F2, Unit] =
@@ -2359,12 +2358,10 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def showLinesAsync[F2[x] >: F[x], O2 >: O](out: PrintStream,
blockingExecutionContext: ExecutionContext)(
implicit F: Sync[F2],
cs: ContextShift[F2],
showO: Show[O2]): Stream[F2, Unit] =
covaryAll[F2, O2].map(_.show).linesAsync(out, blockingExecutionContext)
def showLinesAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show](
out: PrintStream,
blocker: Blocker): Stream[F2, Unit] =
covaryAll[F2, O2].map(_.show).linesAsync(out, blocker)

/**
* Writes this stream to standard out, converting each element to a `String` via `Show`.
@@ -2381,11 +2378,9 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
*
* Note: printing to the `PrintStream` is performed on the supplied blocking execution context.
*/
def showLinesStdOutAsync[F2[x] >: F[x], O2 >: O](blockingExecutionContext: ExecutionContext)(
implicit F: Sync[F2],
cs: ContextShift[F2],
showO: Show[O2]): Stream[F2, Unit] =
showLinesAsync[F2, O2](Console.out, blockingExecutionContext)
def showLinesStdOutAsync[F2[x] >: F[x]: Sync: ContextShift, O2 >: O: Show](
blocker: Blocker): Stream[F2, Unit] =
showLinesAsync[F2, O2](Console.out, blocker)

/**
* Groups inputs in fixed size chunks by passing a "sliding window"
@@ -3,8 +3,8 @@
This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur.

```scala
import cats.effect.{ExitCode, IO, IOApp, Resource}
// import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
// import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import cats.implicits._
// import cats.implicits._
@@ -15,28 +15,20 @@ import fs2.{io, text, Stream}
import java.nio.file.Paths
// import java.nio.file.Paths
import java.util.concurrent.Executors
// import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
// import scala.concurrent.ExecutionContext
object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))
val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}
def run(args: List[String]): IO[ExitCode] =
@@ -54,17 +46,20 @@ Operations on `Stream` are defined for any choice of type constructor, not just
`fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream.

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
//note: this should be shut down when it's no longer necessary - normally that's at the end of your app.
//See the whole README example for proper resource management in terms of ExecutionContexts.
val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code,
// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool
// upon finalization, like in the original example.
// See the whole README example for proper resource management in terms of `Blocker`.
val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool)
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
@@ -75,7 +70,7 @@ scala> import fs2.Stream
import fs2.Stream
scala> val src: Stream[IO, Byte] =
| io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096)
| io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
src: fs2.Stream[cats.effect.IO,Byte] = Stream(..)
```

@@ -118,7 +113,7 @@ encodedBytes: fs2.Stream[cats.effect.IO,Byte] = Stream(..)
We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file.

```scala
scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext))
scala> val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
written: fs2.Stream[cats.effect.IO,Unit] = Stream(..)
```

@@ -131,8 +126,8 @@ task: cats.effect.IO[Unit] = <function1>

We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point.

Let's shut down the ExecutionContext that we allocated earlier.
Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with.

```scala
scala> blockingExecutionContext.shutdown()
scala> blockingPool.shutdown()
```
@@ -3,29 +3,25 @@
This walks through the implementation of the example given in [the README](../README.md). This program opens a file, `fahrenheit.txt`, containing temperatures in degrees fahrenheit, one per line, and converts each temperature to celsius, incrementally writing to the file `celsius.txt`. Both files will be closed, regardless of whether any errors occur.

```tut:book
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.effect.{Blocker, ExitCode, IO, IOApp, Resource}
import cats.implicits._
import fs2.{io, text, Stream}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
object Converter extends IOApp {
private val blockingExecutionContext =
Resource.make(IO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())))(ec => IO(ec.shutdown()))
val converter: Stream[IO, Unit] = Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
}
def run(args: List[String]): IO[ExitCode] =
@@ -42,17 +38,20 @@ Operations on `Stream` are defined for any choice of type constructor, not just
`fs2.io` has a number of helper functions for constructing or working with streams that talk to the outside world. `readAll` creates a stream of bytes from a file name (specified via a `java.nio.file.Path`). It encapsulates the logic for opening and closing the file, so that users of this stream do not need to remember to close the file when they are done or in the event of exceptions during processing of the stream.

```tut:silent
import cats.effect.{ContextShift, IO}
import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text}
import java.nio.file.Paths
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)
//note: this should be shut down when it's no longer necessary - normally that's at the end of your app.
//See the whole README example for proper resource management in terms of ExecutionContexts.
val blockingExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
// Note: to make these examples work in docs, we create a `Blocker` manually here but in real code,
// we should always use `Blocker[IO]`, which returns the blocker as a resource that shuts down the pool
// upon finalization, like in the original example.
// See the whole README example for proper resource management in terms of `Blocker`.
val blockingPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
val blocker: Blocker = Blocker.unsafeFromExecutionContext(blockingPool)
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0/9.0)
@@ -62,7 +61,7 @@ def fahrenheitToCelsius(f: Double): Double =
import fs2.Stream
val src: Stream[IO, Byte] =
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingExecutionContext, 4096)
io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
```

A stream can be attached to a pipe, allowing for stateful transformations of the input values. Here, we attach the source stream to the `text.utf8Decode` pipe, which converts the stream of bytes to a stream of strings. We then attach the result to the `text.lines` pipe, which buffers strings and emits full lines. Pipes are expressed using the type `Pipe[F,I,O]`, which describes a pipe that can accept input values of type `I` and can output values of type `O`, potentially evaluating an effect periodically.
@@ -97,7 +96,7 @@ val encodedBytes: Stream[IO, Byte] = withNewlines.through(text.utf8Encode)
We then write the encoded bytes to a file. Note that nothing has happened at this point -- we are just constructing a description of a computation that, when interpreted, will incrementally consume the stream, sending converted values to the specified file.

```tut
val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingExecutionContext))
val written: Stream[IO, Unit] = encodedBytes.through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
```

There are a number of ways of interpreting the stream. In this case, we call `compile.drain`, which returns a val value of the effect type, `IO`. The output of the stream is ignored - we compile it solely for its effect.
@@ -108,8 +107,8 @@ val task: IO[Unit] = written.compile.drain

We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run a `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence, it is not referentially transparent. In this example, we extended `IOApp`, which lets us express our overall program as an `IO[ExitCase]`. The `IOApp` class handles running the task and hooking it up to the application entry point.

Let's shut down the ExecutionContext that we allocated earlier.
Let's shut down the thread pool that we allocated earlier -- reminder: in real code, we would not manually control the lifecycle of the blocking thread pool -- we'd use the resource returned from `Blocker[IO]` to manage it automatically, like in the full example we started with.

```tut
blockingExecutionContext.shutdown()
blockingPool.shutdown()
```

0 comments on commit c42170e

Please sign in to comment.
You can’t perform that action at this time.