Skip to content

Commit

Permalink
Added special handling of Uncons and CloseScope in case of interrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
pchlupacek committed Jan 4, 2018
1 parent 3fa9dea commit f73714e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
14 changes: 7 additions & 7 deletions build.sbt
Expand Up @@ -27,15 +27,15 @@ lazy val commonSettings = Seq(
"-language:postfixOps",
"-Ypartial-unification"
) ++
(if (scalaBinaryVersion.value startsWith "2.12")
(if (scalaBinaryVersion.value.startsWith("2.12"))
List(
"-Xlint",
"-Xfatal-warnings",
"-Yno-adapted-args",
"-Ywarn-value-discard",
"-Ywarn-unused-import"
)
else Nil) ++ (if (scalaBinaryVersion.value startsWith "2.11")
else Nil) ++ (if (scalaBinaryVersion.value.startsWith("2.11"))
List("-Xexperimental")
else
Nil), // 2.11 needs -Xexperimental to enable SAM conversion
Expand Down Expand Up @@ -93,17 +93,17 @@ lazy val scaladocSettings = Seq(
"-implicits-sound-shadowing",
"-implicits-show-all"
),
scalacOptions in (Compile, doc) ~= { _ filterNot { _ == "-Xfatal-warnings" } },
scalacOptions in (Compile, doc) ~= { _.filterNot { _ == "-Xfatal-warnings" } },
autoAPIMappings := true
)

lazy val publishingSettings = Seq(
publishTo := {
val nexus = "https://oss.sonatype.org/"
if (version.value.trim.endsWith("SNAPSHOT"))
Some("snapshots" at nexus + "content/repositories/snapshots")
Some("snapshots".at(nexus + "content/repositories/snapshots"))
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
Some("releases".at(nexus + "service/local/staging/deploy/maven2"))
},
credentials ++= (for {
username <- Option(System.getenv().get("SONATYPE_USERNAME"))
Expand Down Expand Up @@ -281,7 +281,7 @@ lazy val benchmarkMacros = project
.settings(noPublish)
.settings(
name := "fs2-benchmark-macros",
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.patch),
addCompilerPlugin(("org.scalamacros" % "paradise" % "2.1.1").cross(CrossVersion.patch)),
libraryDependencies += scalaOrganization.value % "scala-reflect" % scalaVersion.value
)

Expand All @@ -294,7 +294,7 @@ lazy val benchmark = project
name := "fs2-benchmark"
)
.settings(
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.patch),
addCompilerPlugin(("org.scalamacros" % "paradise" % "2.1.1").cross(CrossVersion.patch)),
libraryDependencies += scalaOrganization.value % "scala-reflect" % scalaVersion.value
)
.enablePlugins(JmhPlugin)
Expand Down
10 changes: 9 additions & 1 deletion core/jvm/src/test/scala/fs2/Pipe2Spec.scala
Expand Up @@ -215,8 +215,15 @@ class Pipe2Spec extends Fs2Spec {

"interrupt (4)" in {
// tests the interruption of the constant stream with flatMap combinator
// for { i <- 0 until 1000} yield {
val interrupt =
mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt
mkScheduler
.flatMap {
_.sleep_[IO](20.millis)
}
.compile
.drain
.attempt
Stream
.constant(true)
.covary[IO]
Expand All @@ -227,6 +234,7 @@ class Pipe2Spec extends Fs2Spec {
.compile
.drain
.unsafeRunSync
// }
}

"interrupt (5)" in {
Expand Down
32 changes: 28 additions & 4 deletions core/shared/src/main/scala/fs2/internal/Algebra.scala
Expand Up @@ -159,7 +159,7 @@ private[fs2] object Algebra {
}

case alg: Effectful[F, O, r] =>
F.flatMap(compileShared(scope, alg)) {
F.flatMap(compileEffect(scope, alg)) {
case (scope, r) =>
compileUncons(scope, f(r), chunkSize, maxSteps)
}
Expand Down Expand Up @@ -216,19 +216,43 @@ private[fs2] object Algebra {
}

case alg: Effectful[F, O, _] =>
F.flatMap(compileShared(scope, alg)) {
F.flatMap(compileEffect(scope, alg)) {
case (scope, r) =>
compileFoldLoop(scope, acc, g, f(r))
}

}
case Some(rsn) => compileFoldLoop(scope, acc, g, f(Left(rsn)))
case Some(int: Interrupted) =>
fx match {
case uncons: Algebra.Uncons[F, x, O] =>
F.flatMap(
F.attempt(
compileUncons(scope,
uncons.s.asHandler(int),
uncons.chunkSize,
uncons.maxSteps))) {
case Right((scope, u)) =>
compileFoldLoop(scope, acc, g, f(Right(u)))
case Left(err) => compileFoldLoop(scope, acc, g, f(Left(err)))
}

case c: Algebra.CloseScope[F, O] =>
F.flatMap(c.toClose.close) { result =>
F.flatMap(c.toClose.openAncestor) { scopeAfterClose =>
compileFoldLoop(scopeAfterClose, acc, g, f(Left(int)))
}
}

case other => compileFoldLoop(scope, acc, g, f(Left(int)))
}
case Some(rsn) =>
compileFoldLoop(scope, acc, g, f(Left(rsn)))
}
case e =>
sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was: " + e)
}

def compileShared[F[_], O](
def compileEffect[F[_], O](
scope: CompileScope[F],
eff: Effectful[F, O, _]
)(implicit F: Sync[F]): F[(CompileScope[F], Either[Throwable, Any])] =
Expand Down

0 comments on commit f73714e

Please sign in to comment.