
Updated migration guide for chunk/segment changes

mpilquist committed Dec 4, 2017
1 parent 169e0f3 commit e5d2575f5ca9d8bba6f0a90e09957d401bd4f839
@@ -11,6 +11,10 @@ import java.nio.ByteBuffer
* defines a subtype of `Chunk` for each primitive type, using an unboxed primitive array.
* To work with unboxed arrays, use methods like `toBytes` to convert a `Chunk[Byte]` to a `Chunk.Bytes`
* and then access the array directly.
+ *
+ * The operations on `Chunk` are all defined strictly. For example, `c.map(f).map(g).map(h)` results in
+ * intermediate chunks being created (1 per call to `map`). In contrast, a chunk can be lifted to a segment
+ * (via `toSegment`) to get arbitrary operator fusion.
*/
abstract class Chunk[+O] {
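The strict-versus-fused tradeoff described in the new scaladoc can be sketched as follows. A hypothetical snippet against the 0.10 API shown in this diff (`Chunk.doubles`, `toSegment`, and `force` are taken from elsewhere in this commit):

```scala
import fs2.{Chunk, Segment}

val c: Chunk[Double] = Chunk.doubles(Array(1.0, 2.0, 3.0))

// Strict: each `map` call allocates an intermediate chunk.
val strict = c.map(_ + 1).map(_ * 2)

// Fused: lift to a segment first; the two `map`s are fused and
// nothing is evaluated until the segment is forced.
val fused: Segment[Double, Unit] = c.toSegment.map(_ + 1).map(_ * 2)
val result = fused.force.toVector
```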
@@ -8,13 +8,17 @@ import Segment._
/**
* Potentially infinite, pure sequence of values of type `O` and a result of type `R`.
*
- * All methods which return a `Segment` support fusion with other arbitrary methods that
+ * All methods on `Segment` support fusion with other arbitrary methods that
* return `Segment`s. This is similar to the staging approach described in
* [[https://arxiv.org/pdf/1612.06668v1.pdf Stream Fusion, to Completeness]], but without
* code generation in staging.
*
- * Stack safety is ensured by tracking a fusion depth. If the depth reaches the
- * limit, the computation is trampolined using `cats.Eval`.
+ * To force evaluation of one or more values of a segment, call `.force` followed by one
+ * of the operations on the returned `Segment.Force` type. For example, to convert a
+ * segment to a vector, call `s.force.toVector`.
+ *
+ * Stack safety of fused operations is ensured by tracking a fusion depth. If the depth
+ * reaches the limit, the computation is trampolined using `cats.Eval`.
*
* Implementation notes:
* - Some operators ask for a segment remainder from within a callback (e.g., `emits`). As such,
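A sketch of the `.force` behavior the new scaladoc describes, using only names that appear in this commit (`Segment.chunk`, `Chunk.doubles`, `s.force.toVector`):

```scala
import fs2.{Chunk, Segment}

// Build a fused pipeline; no elements are evaluated yet.
val s: Segment[Double, Unit] =
  Segment.chunk(Chunk.doubles(Array(1.0, 2.0, 3.0))).map(_ * 10)

// Evaluation happens only under `.force`, via the
// operations on the returned `Segment.Force` value.
val v: Vector[Double] = s.force.toVector
```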
@@ -98,7 +98,7 @@ There are a number of ways of interpreting the stream. In this case, we call `ru
```scala
scala> val task: IO[Unit] = written.run
- task: cats.effect.IO[Unit] = IO$2027047179
+ task: cats.effect.IO[Unit] = IO$952127193
```
We still haven't *done* anything yet. Effects only occur when we run the resulting task. We can run an `IO` by calling `unsafeRunSync()` -- the name is telling us that calling it performs effects and hence is not referentially transparent.
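A self-contained sketch of this interpret-then-run separation (the `written` stream here is a hypothetical stand-in for the one defined earlier in the guide):

```scala
import cats.effect.IO
import fs2.Stream

// Stand-in for the guide's `written` stream (hypothetical).
val written: Stream[IO, Unit] = Stream.eval(IO(println("writing...")))

// Interpreting the stream only builds a description; nothing runs yet.
val task: IO[Unit] = written.run

// Performing the effects -- not referentially transparent:
task.unsafeRunSync()
```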
@@ -137,13 +137,13 @@ val eff = Stream.eval(IO { println("TASK BEING RUN!!"); 1 + 1 })
// eff: fs2.Stream[cats.effect.IO,Int] = Stream(..)
val ra = eff.runLog // gather all output into a Vector
- // ra: cats.effect.IO[Vector[Int]] = IO$979725695
+ // ra: cats.effect.IO[Vector[Int]] = IO$1881849274
val rb = eff.run // purely for effects
- // rb: cats.effect.IO[Unit] = IO$1344379335
+ // rb: cats.effect.IO[Unit] = IO$1701750867
val rc = eff.runFold(0)(_ + _) // run and accumulate some result
- // rc: cats.effect.IO[Int] = IO$2007225798
+ // rc: cats.effect.IO[Int] = IO$1051647402
```
Notice these all return an `IO` of some sort, but this process of compilation doesn't actually _perform_ any of the effects (nothing gets printed).
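The compile-then-run separation above can be sketched end to end; this reuses the guide's own `eff` definition and adds only the final `unsafeRunSync()` step:

```scala
import cats.effect.IO
import fs2.Stream

val eff = Stream.eval(IO { println("TASK BEING RUN!!"); 1 + 1 })

// Compiling to IO performs no effects -- nothing prints here:
val ra: IO[Vector[Int]] = eff.runLog
val rc: IO[Int] = eff.runFold(0)(_ + _)

// Effects happen only when the resulting IO is run:
ra.unsafeRunSync() // prints "TASK BEING RUN!!", yields Vector(2)
```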
@@ -175,7 +175,7 @@ _Note:_ The various `run*` functions aren't specialized to `IO` and work for any
FS2 streams are segmented internally for performance. You can construct an individual stream segment using `Stream.segment`, which accepts an `fs2.Segment`. Lots of functions in the library are segment-aware and/or try to preserve segments when possible.
- Segments are potentially infinite and support lazy, fused operations. A `Chunk` is a specialized segment that's finite and supports efficient indexed based lookup of elements.
+ Segments are potentially infinite and support lazy, fused operations. A `Chunk` is a strict, finite sequence of values that supports efficient index-based lookup of elements. A chunk can be lifted to a segment via `Segment.chunk(c)` or `c.toSegment`.
```scala
scala> import fs2.Chunk
@@ -187,7 +187,7 @@ s1c: fs2.Stream[fs2.Pure,Double] = Stream(..)
scala> s1c.mapChunks { ds =>
| val doubles = ds.toDoubles
| /* do things unboxed using doubles.{values,size} */
- |   doubles
+ |   doubles.toSegment
| }
res16: fs2.Stream[fs2.Pure,Double] = Stream(..)
```
@@ -274,10 +274,10 @@ scala> val count = new java.util.concurrent.atomic.AtomicLong(0)
count: java.util.concurrent.atomic.AtomicLong = 0
scala> val acquire = IO { println("incremented: " + count.incrementAndGet); () }
- acquire: cats.effect.IO[Unit] = IO$810167805
+ acquire: cats.effect.IO[Unit] = IO$1461250053
scala> val release = IO { println("decremented: " + count.decrementAndGet); () }
- release: cats.effect.IO[Unit] = IO$1604274982
+ release: cats.effect.IO[Unit] = IO$747598215
```
```scala
@@ -554,7 +554,7 @@ import cats.effect.Sync
// import cats.effect.Sync
val T = Sync[IO]
- // T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@34a1c57e
+ // T: cats.effect.Sync[cats.effect.IO] = cats.effect.IOInstances$$anon$1@12224dc5
val s = Stream.eval_(T.delay { destroyUniverse() }) ++ Stream("...moving on")
// s: fs2.Stream[cats.effect.IO,String] = Stream(..)
@@ -611,12 +611,12 @@ val c = new Connection {
// Effect extends both Sync and Async
val T = cats.effect.Effect[IO]
- // T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@34a1c57e
+ // T: cats.effect.Effect[cats.effect.IO] = cats.effect.IOInstances$$anon$1@12224dc5
val bytes = T.async[Array[Byte]] { (cb: Either[Throwable,Array[Byte]] => Unit) =>
c.readBytesE(cb)
}
- // bytes: cats.effect.IO[Array[Byte]] = IO$1508547038
+ // bytes: cats.effect.IO[Array[Byte]] = IO$1101984917
Stream.eval(bytes).map(_.toList).runLog.unsafeRunSync()
// res42: Vector[List[Byte]] = Vector(List(0, 1, 2))
@@ -50,7 +50,7 @@ As a result `fs2.Task` has been removed. Porting from `Task` to `IO` is relative
Performance is significantly better thanks to the introduction of `fs2.Segment`. A `Segment` is a potentially infinite, lazy, pure data structure which supports a variety of fused operations. This is coincidentally similar to the approach taken in [Stream Fusion, to Completeness](https://arxiv.org/pdf/1612.06668v1.pdf), though using a novel approach that does not require code generation.
- Instead of a `Stream` being made up of `Chunk`s like in 0.9, it is now made up of `Segment`s. `Chunk[O]` is a subtype of `Segment[O,Unit]`. Many of the operations which operated in terms of chunks now operate in terms of segments. Occassionally, there are operations that are specialized for chunks -- typically when index based access to the underlying elements is more performant than the benefits of operator fusion.
+ Instead of a `Stream` being made up of `Chunk`s like in 0.9, it is now made up of `Segment`s. A `Segment[O,Unit]` can wrap a `Chunk[O]` (via `Segment.chunk(c)`) but can also be formed in a lot of other ways. Many of the operations which operated in terms of chunks now operate in terms of segments. Occasionally, there are operations that are specialized for chunks -- typically when index-based access to the underlying elements is more performant than the benefits of operator fusion.
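A small sketch of the new chunk/segment relationship, using only names from this commit (`Chunk.doubles` is borrowed from the guide's examples):

```scala
import fs2.{Chunk, Segment}

// A Chunk is a strict, finite sequence; lift it to a Segment
// explicitly when operator fusion is wanted.
val c: Chunk[Double] = Chunk.doubles(Array(1.0, 2.0, 3.0))
val s1: Segment[Double, Unit] = Segment.chunk(c)
val s2: Segment[Double, Unit] = c.toSegment // equivalent
```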
### API Simplification
@@ -126,7 +126,7 @@ _Note:_ The various `run*` functions aren't specialized to `IO` and work for any
FS2 streams are segmented internally for performance. You can construct an individual stream segment using `Stream.segment`, which accepts an `fs2.Segment`. Lots of functions in the library are segment-aware and/or try to preserve segments when possible.
- Segments are potentially infinite and support lazy, fused operations. A `Chunk` is a specialized segment that's finite and supports efficient indexed based lookup of elements.
+ Segments are potentially infinite and support lazy, fused operations. A `Chunk` is a strict, finite sequence of values that supports efficient index-based lookup of elements. A chunk can be lifted to a segment via `Segment.chunk(c)` or `c.toSegment`.
```tut
import fs2.Chunk
@@ -136,7 +136,7 @@ val s1c = Stream.chunk(Chunk.doubles(Array(1.0, 2.0, 3.0)))
s1c.mapChunks { ds =>
val doubles = ds.toDoubles
/* do things unboxed using doubles.{values,size} */
-   doubles
+   doubles.toSegment
}
```
@@ -171,7 +171,7 @@ private[io] object JavaInputOutputStream {
val cloned = Chunk.Bytes(bytes.toArray)
if (bytes.size <= len) Ready(None) -> Some(cloned)
else {
-       val (out,rem) = cloned.strict.splitAt(len)
+       val (out,rem) = cloned.splitAt(len)
Ready(Some(rem.toBytes)) -> Some(out.toBytes)
}
}
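The `splitAt` pattern above (now called directly on the chunk, rather than through the removed `strict` view) can be sketched in isolation; `Chunk.Bytes` and `toBytes` are taken from this diff:

```scala
import fs2.Chunk

// Split a byte chunk at an index; both halves are chunks,
// so `toBytes` recovers the unboxed byte-array form.
val cloned = Chunk.Bytes(Array[Byte](0, 1, 2, 3))
val (out, rem) = cloned.splitAt(2)
val outBytes = out.toBytes
val remBytes = rem.toBytes
```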
@@ -203,7 +203,7 @@ private[io] object JavaInputOutputStream {
val (copy,maybeKeep) =
if (bytes.size <= len) bytes -> None
else {
-         val (out,rem) = bytes.strict.splitAt(len)
+         val (out,rem) = bytes.splitAt(len)
out.toBytes -> Some(rem.toBytes)
}
F.flatMap(F.delay {
@@ -75,7 +75,7 @@ package object file {
if (written >= buf.size)
Pull.pure(())
else
-       _writeAll1(buf.strict.drop(written), out, offset + written)
+       _writeAll1(buf.drop(written), out, offset + written)
}
/**
@@ -43,7 +43,7 @@ object pulls {
if (written >= buf.size)
Pull.pure(())
else
-       _writeAllToFileHandle2(buf.strict.drop(written), out, offset + written)
+       _writeAllToFileHandle2(buf.drop(written), out, offset + written)
}
/**
