Skip to content

Commit

Permalink
Add test for cancelability of the Resource compiler
Browse files Browse the repository at this point in the history
  • Loading branch information
SystemFw committed Mar 17, 2019
1 parent a34ed54 commit 0b175de
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
39 changes: 38 additions & 1 deletion core/jvm/src/test/scala/fs2/ResourceCompilationSpec.scala
@@ -1,7 +1,7 @@
package fs2

import cats.implicits._
import cats.effect.{IO, Resource}
import cats.effect.{ExitCase, IO, Resource}
import cats.effect.concurrent.{Deferred, Ref}
import scala.concurrent.duration._

Expand Down Expand Up @@ -85,6 +85,43 @@ class ResourceCompilationSpec extends AsyncFs2Spec {
.unsafeToFuture
.map(written => written shouldBe false)
}

"compile.resource - interruption (1)" in {
val s = Stream
.resource {
Stream.never[IO].compile.resource.drain
}
.interruptAfter(200.millis)
.drain ++ Stream.emit(true)

s.compile.drain
.timeout(2.seconds)
.unsafeToFuture
.map(_ shouldBe true)
}

"compile.resource - interruption (2)" in {
val p = (Deferred[IO, Unit], Deferred[IO, ExitCase[Throwable]]).mapN { (started, stop) =>
val r = Stream
.never[IO]
.compile
.resource
.drain
.use { _ =>
IO.unit
}
.guaranteeCase(stop.complete)

(started.complete(()) >> r).start.flatMap { fiber =>
started.get >> fiber.cancel >> stop.get
}
}

p.flatten
.timeout(2.seconds)
.unsafeToFuture
.map(_ shouldBe ExitCase.Canceled)
}
}

object ResourceCompilationSpec {
Expand Down
8 changes: 4 additions & 4 deletions io/src/main/scala/fs2/io/io.scala
Expand Up @@ -142,10 +142,10 @@ package object io {
* Each write operation is performed on the supplied execution context. Writes are
* blocking so the execution context should be configured appropriately.
*/
def stdoutLines[F[_], O](blockingExecutionContext: ExecutionContext, charset: Charset = utf8Charset)(
implicit F: Sync[F],
cs: ContextShift[F],
show: Show[O]): Pipe[F, O, Unit] =
def stdoutLines[F[_], O](blockingExecutionContext: ExecutionContext,
charset: Charset = utf8Charset)(implicit F: Sync[F],
cs: ContextShift[F],
show: Show[O]): Pipe[F, O, Unit] =
_.map(_.show).through(text.encode(charset)).through(stdout(blockingExecutionContext))

/** Stream of `String` read asynchronously from standard input decoded in UTF-8. */
Expand Down

0 comments on commit 0b175de

Please sign in to comment.