Skip to content
Permalink
Browse files

Merge branch 'series/1.0' into patch-1

  • Loading branch information...
mpilquist committed Apr 11, 2019
2 parents 52d384b + 1194292 commit 24a5e7ef755a7274b26850d2efd12fb49a0a3e90
Showing with 2,241 additions and 568 deletions.
  1. +1 −0 CONTRIBUTORS.md
  2. +8 −8 README.md
  3. +22 −16 build.sbt
  4. +0 −1 core/jvm/src/test/scala/fs2/CompressSpec.scala
  5. +14 −0 core/jvm/src/test/scala/fs2/GroupWithinSpec.scala
  6. +20 −1 core/jvm/src/test/scala/fs2/Pipe2Spec.scala
  7. +132 −0 core/jvm/src/test/scala/fs2/ResourceCompilationSpec.scala
  8. +49 −5 core/jvm/src/test/scala/fs2/StreamSpec.scala
  9. +9 −0 core/shared/src/main/scala-2.12-/fs2/internal/ArrayBackedSeq.scala
  10. +9 −0 core/shared/src/main/scala-2.13+/fs2/internal/ArrayBackedSeq.scala
  11. +212 −154 core/shared/src/main/scala/fs2/Chunk.scala
  12. +20 −6 core/shared/src/main/scala/fs2/Pull.scala
  13. +322 −61 core/shared/src/main/scala/fs2/Stream.scala
  14. +7 −26 core/shared/src/main/scala/fs2/internal/Algebra.scala
  15. +8 −21 core/shared/src/main/scala/fs2/internal/CompileScope.scala
  16. +5 −8 core/shared/src/main/scala/fs2/internal/Resource.scala
  17. +10 −2 core/shared/src/main/scala/fs2/text.scala
  18. +270 −0 core/shared/src/test/scala/fs2/ChunkGen.scala
  19. +2 −2 core/shared/src/test/scala/fs2/ChunkProps.scala
  20. +47 −134 core/shared/src/test/scala/fs2/ChunkSpec.scala
  21. +2 −2 core/shared/src/test/scala/fs2/Fs2Spec.scala
  22. +1 −13 core/shared/src/test/scala/fs2/TestUtil.scala
  23. +47 −49 io/src/main/scala/fs2/io/JavaInputOutputStream.scala
  24. +32 −2 io/src/main/scala/fs2/io/io.scala
  25. +8 −18 io/src/test/scala/fs2/io/JavaInputOutputStreamSpec.scala
  26. +4 −1 io/src/test/scala/fs2/io/tcp/SocketSpec.scala
  27. +6 −5 project/plugins.sbt
  28. +2 −2 reactive-streams/src/test/scala/fs2/interop/reactivestreams/PublisherToSubscriberSpec.scala
  29. +1 −1 reactive-streams/src/test/scala/fs2/interop/reactivestreams/StreamUnicastPublisherSpec.scala
  30. +1 −1 reactive-streams/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala
  31. +58 −0 scalafix/build.sbt
  32. +40 −0 scalafix/input/src/main/scala/fix/Bracket.scala
  33. +17 −0 scalafix/input/src/main/scala/fix/Chunk.scala
  34. +51 −0 scalafix/input/src/main/scala/fix/ConcurrentDataTypes.scala
  35. +21 −0 scalafix/input/src/main/scala/fix/MyAppClass.scala
  36. +19 −0 scalafix/input/src/main/scala/fix/MyAppObject.scala
  37. +24 −0 scalafix/input/src/main/scala/fix/Scheduler.scala
  38. +16 −0 scalafix/input/src/main/scala/fix/Usability.scala
  39. +37 −0 scalafix/output/src/main/scala/fix/Bracket.scala
  40. +15 −0 scalafix/output/src/main/scala/fix/Chunk.scala
  41. +49 −0 scalafix/output/src/main/scala/fix/ConcurrentDataTypes.scala
  42. +17 −0 scalafix/output/src/main/scala/fix/MyAppClass.scala
  43. +15 −0 scalafix/output/src/main/scala/fix/MyAppObject.scala
  44. +21 −0 scalafix/output/src/main/scala/fix/Scheduler.scala
  45. +14 −0 scalafix/output/src/main/scala/fix/Usability.scala
  46. +1 −0 scalafix/project/build.properties
  47. +2 −0 scalafix/project/plugins.sbt
  48. +1 −0 scalafix/rules/src/main/resources/META-INF/services/scalafix.v1.Rule
  49. +511 −0 scalafix/rules/src/main/scala/fix/v1.scala
  50. +7 −0 scalafix/tests/src/test/scala/fix/RuleSuite.scala
  51. +32 −27 site/src/main/tut/concurrency-primitives.md
  52. +1 −1 site/src/main/tut/guide.md
  53. +1 −1 version.sbt
@@ -19,3 +19,4 @@ Also see the [GitHub contributor stats](https://github.com/functional-streams-fo
- Jed Wesley-Smith ([@jedws](https://github.com/jedws)): really very minor tweaks, cleanups and pestering, hardly worth the mention
- Michael Pilquist ([@mpilquist](https://github.com/mpilquist)): 0.9 redesign work, maintenance
- Daniel Urban ([@durban](https://github.com/durban)): queue peek implementation
- Tamer Abdulradi ([@tabdulradi](https://github.com/tabdulradi)): implementation of scanMap and scanMonoid
@@ -38,34 +38,34 @@ summarizes the differences between 1.0 and 0.10. To get 1.0.x, add the following

```
// available for Scala 2.11, 2.12
libraryDependencies += "co.fs2" %% "fs2-core" % "1.0.2" // For cats 1.5.0 and cats-effect 1.1.0
libraryDependencies += "co.fs2" %% "fs2-core" % "1.0.4" // For cats 1.5.0 and cats-effect 1.2.0
// optional I/O library
libraryDependencies += "co.fs2" %% "fs2-io" % "1.0.2"
libraryDependencies += "co.fs2" %% "fs2-io" % "1.0.4"
// optional reactive streams interop
libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "1.0.2"
libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "1.0.4"
// optional experimental library
libraryDependencies += "co.fs2" %% "fs2-experimental" % "1.0.2"
libraryDependencies += "co.fs2" %% "fs2-experimental" % "1.0.4"
```

The previous stable release is 0.10.6. You may want to first
The previous stable release is 0.10.7. You may want to first
[read the 0.10 migration guide](https://github.com/functional-streams-for-scala/fs2/blob/series/0.10/docs/migration-guide-0.10.md)
if you are upgrading from 0.9 or earlier. To get 0.10, add the following to your SBT build:

```
// available for Scala 2.11, 2.12
libraryDependencies += "co.fs2" %% "fs2-core" % "0.10.6"
libraryDependencies += "co.fs2" %% "fs2-core" % "0.10.7"
// optional I/O library
libraryDependencies += "co.fs2" %% "fs2-io" % "0.10.6"
libraryDependencies += "co.fs2" %% "fs2-io" % "0.10.7"
```

The fs2-core library is also supported on Scala.js:

```
libraryDependencies += "co.fs2" %%% "fs2-core" % "0.10.6"
libraryDependencies += "co.fs2" %%% "fs2-core" % "1.0.4"
```

### <a id="about"></a>Example ###
@@ -49,24 +49,19 @@ lazy val commonSettings = Seq(
javaOptions in (Test, run) ++= Seq("-Xms64m", "-Xmx64m"),
libraryDependencies ++= Seq(
compilerPlugin("org.spire-math" %% "kind-projector" % "0.9.9"),
"org.typelevel" %%% "cats-core" % "1.5.0",
"org.typelevel" %%% "cats-laws" % "1.5.0" % "test",
"org.typelevel" %%% "cats-effect" % "1.1.0",
"org.typelevel" %%% "cats-effect-laws" % "1.1.0" % "test",
"org.scala-lang.modules" %%% "scala-collection-compat" % "0.2.1"
"org.typelevel" %%% "cats-core" % "1.6.0",
"org.typelevel" %%% "cats-laws" % "1.6.0" % "test",
"org.typelevel" %%% "cats-effect" % "1.2.0",
"org.typelevel" %%% "cats-effect-laws" % "1.2.0" % "test",
"org.scala-lang.modules" %%% "scala-collection-compat" % "0.3.0",
"org.scalatest" %%% "scalatest" % "3.0.6" % "test"
),
libraryDependencies ++= {
libraryDependencies += {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v >= 13 =>
Seq(
"org.scalatest" %%% "scalatest" % "3.0.6-SNAP5" % "test",
"org.scalacheck" %%% "scalacheck" % "1.14.0" % "test"
)
"org.scalacheck" %%% "scalacheck" % "1.14.0" % "test"
case _ =>
Seq(
"org.scalatest" %%% "scalatest" % "3.0.5" % "test",
"org.scalacheck" %%% "scalacheck" % "1.13.5" % "test"
)
"org.scalacheck" %%% "scalacheck" % "1.13.5" % "test"
}
},
scmInfo := Some(ScmInfo(url("https://github.com/functional-streams-for-scala/fs2"),
@@ -223,6 +218,8 @@ lazy val mimaSettings = Seq(
ProblemFilters
.exclude[DirectMissingMethodProblem]("fs2.io.package.invokeCallback"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.tcp.Socket.client"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.JavaInputOutputStream.toInputStream"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.tcp.Socket.server"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.tcp.Socket.mkSocket"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.udp.Socket.mkSocket"),
@@ -231,7 +228,9 @@ lazy val mimaSettings = Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Stream.bracketFinalizer"),
// Compiler#apply is private[fs2]
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.Stream#Compiler.apply"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.Stream#Compiler.apply")
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.Stream#Compiler.apply"),
// bracketWithToken was private[fs2]
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.Stream.bracketWithToken")
)
)

@@ -254,7 +253,14 @@ lazy val core = crossProject(JVMPlatform, JSPlatform)
.settings(
name := "fs2-core",
sourceDirectories in (Compile, scalafmt) += baseDirectory.value / "../shared/src/main/scala",
libraryDependencies += "org.scodec" %%% "scodec-bits" % "1.1.7"
unmanagedSourceDirectories in Compile += {
val dir = CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, v)) if v >= 13 => "scala-2.13+"
case _ => "scala-2.12-"
}
baseDirectory.value / "../shared/src/main" / dir
},
libraryDependencies += "org.scodec" %%% "scodec-bits" % "1.1.9"
)
.jsSettings(commonJsSettings: _*)

@@ -16,7 +16,6 @@ import java.util.zip.{
}

import scala.collection.mutable
import scala.collection.compat._

import TestUtil._
import compress._
@@ -21,6 +21,20 @@ class GroupWithinSpec extends Fs2Spec {
runLog(action) shouldBe (result)
}

"groupWithin should never emit empty groups" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
whenever(s.get.toVector.nonEmpty) {
val action =
s.get
.covary[IO]
.evalTap(shortDuration => IO.sleep(shortDuration.get))
.groupWithin(maxGroupSize.get, d.get)
.map(_.toList)

runLog(action).foreach(group => group should not be empty)
}
}

"groupWithin should never have more elements than in its specified limit" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
val maxGroupSizeAsInt = maxGroupSize.get
@@ -2,7 +2,7 @@ package fs2

import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.concurrent.Semaphore
import cats.effect.concurrent.{Deferred, Semaphore}
import cats.implicits._
import org.scalacheck.Gen
import TestUtil._
@@ -494,6 +494,25 @@ class Pipe2Spec extends Fs2Spec {

}

"interrupt (19)" in {
// interruptible eval

def prg =
Deferred[IO, Unit]
.flatMap { latch =>
Stream
.eval {
latch.get.guarantee(latch.complete(()))
}
.interruptAfter(200.millis)
.compile
.drain >> latch.get.as(true)
}
.timeout(3.seconds)

prg.unsafeRunSync shouldBe true
}

"nested-interrupt (1)" in forAll { s1: PureStream[Int] =>
val s = Semaphore[IO](0).unsafeRunSync()
val interrupt: IO[Either[Throwable, Unit]] =
@@ -0,0 +1,132 @@
package fs2

import cats.implicits._
import cats.effect.{ExitCase, IO, Resource}
import cats.effect.concurrent.{Deferred, Ref}
import scala.concurrent.duration._

class ResourceCompilationSpec extends AsyncFs2Spec {
"compile.resource - concurrently" in {
val prog: Resource[IO, IO[Unit]] =
Stream
.eval(Deferred[IO, Unit].product(Deferred[IO, Unit]))
.flatMap {
case (startCondition, waitForStream) =>
val worker = Stream.eval(startCondition.get) ++ Stream.eval(waitForStream.complete(()))
val result = startCondition.complete(()) >> waitForStream.get

Stream.emit(result).concurrently(worker)
}
.compile
.resource
.lastOrError

prog.use(x => x).timeout(5.seconds).unsafeToFuture
}

"compile.resource - onFinalise" in {
val expected = List(
"stream - start",
"stream - done",
"io - done",
"io - start",
"resource - start",
"resource - done"
)

Ref[IO]
.of(List.empty[String])
.flatMap { st =>
def record(s: String): IO[Unit] = st.update(_ :+ s)

def stream =
Stream
.emit("stream - start")
.onFinalize(record("stream - done"))
.evalMap(x => record(x))
.compile
.lastOrError

def io =
Stream
.emit("io - start")
.onFinalize(record("io - done"))
.compile
.lastOrError
.flatMap(x => record(x))

def resource =
Stream
.emit("resource - start")
.onFinalize(record("resource - done"))
.compile
.resource
.lastOrError
.use(x => record(x))

stream >> io >> resource >> st.get
}
.unsafeToFuture
.map(_ shouldBe expected)
}

"compile.resource - allocated" in {
Ref[IO]
.of(false)
.flatMap { written =>
Stream
.emit(())
.onFinalize(written.set(true))
.compile
.resource
.lastOrError
.allocated >> written.get
}
.unsafeToFuture
.map(written => written shouldBe false)
}

"compile.resource - interruption (1)" in {
val s = Stream
.resource {
Stream.never[IO].compile.resource.drain
}
.interruptAfter(200.millis)
.drain ++ Stream.emit(true)

s.compile.lastOrError
.timeout(2.seconds)
.unsafeToFuture
.map(_ shouldBe true)
}

"compile.resource - interruption (2)" in {
val p = (Deferred[IO, ExitCase[Throwable]]).flatMap { stop =>
val r = Stream
.never[IO]
.compile
.resource
.drain
.use { _ =>
IO.unit
}
.guaranteeCase(stop.complete)

r.start.flatMap { fiber =>
IO.sleep(200.millis) >> fiber.cancel >> stop.get
}
}

p.timeout(2.seconds)
.unsafeToFuture
.map(_ shouldBe ExitCase.Canceled)
}
}

object ResourceCompilationSpec {

/** This should compile */
val pure: List[Int] = Stream.range(0, 5).compile.toList
val io: IO[List[Int]] = Stream.range(0, 5).covary[IO].compile.toList
val resource: Resource[IO, List[Int]] = Stream.range(0, 5).covary[IO].compile.resource.toList
}
@@ -7,14 +7,12 @@ import cats.effect.laws.discipline.arbitrary._
import cats.effect.laws.util.TestContext
import cats.effect.laws.util.TestInstances._
import cats.implicits._
import cats.laws.discipline.MonadErrorTests

import org.scalacheck.{Arbitrary, Gen}
import cats.laws.discipline._
import org.scalacheck.{Arbitrary, Cogen, Gen}
import Arbitrary.arbitrary
import org.scalacheck.Arbitrary
import org.scalatest.Inside
import scala.concurrent.duration._

import scala.concurrent.duration._
import TestUtil._

class StreamSpec extends Fs2Spec with Inside {
@@ -572,6 +570,43 @@ class StreamSpec extends Fs2Spec with Inside {
unfoldTree(1).flatten.take(10).toList shouldBe List.tabulate(10)(_ + 1)
}

"rechunkRandomlyWithSeed" - {

"is deterministic" in forAll { (s: PureStream[Int], seed: Long) =>
val x = runLog(s.get.rechunkRandomlyWithSeed[IO](minFactor = 0.1, maxFactor = 2.0)(seed))
val y = runLog(s.get.rechunkRandomlyWithSeed[IO](minFactor = 0.1, maxFactor = 2.0)(seed))
x shouldBe y
}

"does not drop elements" in forAll { (s: PureStream[Int], seed: Long) =>
runLog(s.get.rechunkRandomlyWithSeed[IO](minFactor = 0.1, maxFactor = 2.0)(seed)) shouldBe s.get.toVector
}

"chunk size in interval [inputChunk.size * minFactor, inputChunk.size * maxFactor]" in forAll {
(s: PureStream[Int], seed: Long) =>
val c = s.get.chunks.toVector
if (c.nonEmpty) {
val (min, max) = c.tail.foldLeft(c.head.size -> c.head.size) {
case ((min, max), c) => Math.min(min, c.size) -> Math.max(max, c.size)
}
val (minChunkSize, maxChunkSize) = (min * 0.1, max * 2.0)
// Last element is drop as it may not fulfill size constraint
all(
runLog(
s.get
.rechunkRandomlyWithSeed[IO](minFactor = 0.1, maxFactor = 2.0)(seed)
.chunks
.map(_.size)).dropRight(1)
) should ((be >= minChunkSize.toInt).and(be <= maxChunkSize.toInt))
}
}

}

"rechunkRandomly" in forAll { (s: PureStream[Int]) =>
runLog(s.get.rechunkRandomly[IO]()) shouldBe s.get.toVector
}

{
implicit val ec: TestContext = TestContext()

@@ -581,13 +616,22 @@ class StreamSpec extends Fs2Spec with Inside {
Gen.frequency(8 -> arbitrary[PureStream[O]].map(_.get.take(10).covary[F]),
2 -> arbitrary[F[O]].map(fo => Stream.eval(fo))))

// borrowed from ScalaCheck 1.14
// TODO remove when the project upgrades to ScalaCheck 1.14
implicit def arbPartialFunction[A: Cogen, B: Arbitrary]: Arbitrary[PartialFunction[A, B]] =
Arbitrary(implicitly[Arbitrary[A => Option[B]]].arbitrary.map(Function.unlift))

implicit def eqStream[O: Eq]: Eq[Stream[IO, O]] =
Eq.instance(
(x, y) =>
Eq[IO[Vector[Either[Throwable, O]]]]
.eqv(x.attempt.compile.toVector, y.attempt.compile.toVector))

checkAll("MonadError[Stream[F, ?], Throwable]",
MonadErrorTests[Stream[IO, ?], Throwable].monadError[Int, Int, Int])
checkAll("FunctorFilter[Stream[F, ?]]",
FunctorFilterTests[Stream[IO, ?]].functorFilter[String, Int, Int])
checkAll("MonoidK[Stream[F, ?]]", MonoidKTests[Stream[IO, ?]].monoidK[Int])
}
}
}
Oops, something went wrong.

0 comments on commit 24a5e7e

Please sign in to comment.
You can’t perform that action at this time.