Skip to content
Permalink
Browse files

Added Alternative[Chunk] and TraverseFilter[Chunk] instances

  • Loading branch information...
mpilquist committed Jun 18, 2019
1 parent c0a5513 commit 35244dda44fd3238c4493de84af5fb244fec5061
@@ -16,7 +16,7 @@ import java.nio.{
ShortBuffer => JShortBuffer
}

import cats.{Applicative, Eq, Eval, Functor, FunctorFilter, Monad, Traverse}
import cats.{Alternative, Applicative, Eq, Eval, Monad, Traverse, TraverseFilter}
import cats.data.{Chain, NonEmptyList}
import cats.implicits._
import fs2.internal.ArrayBackedSeq
@@ -1525,35 +1525,32 @@ object Chunk {
}

/**
* `Traverse`, `Monad`, and `FunctorFilter` instance for `Chunk`.
* `Traverse`, `Monad`, `Alternative`, and `TraverseFilter` instance for `Chunk`.
*
* @example {{{
* scala> import cats.implicits._, scala.util._
* scala> Chunk("1", "2", "NaN").mapFilter(s => Try(s.toInt).toOption)
* res0: fs2.Chunk[Int] = Chunk(1, 2)
* }}}
*/
implicit val instance: Traverse[Chunk] with Monad[Chunk] with FunctorFilter[Chunk] =
new Traverse[Chunk] with Monad[Chunk] with FunctorFilter[Chunk] {
def foldLeft[A, B](fa: Chunk[A], b: B)(f: (B, A) => B): B = fa.foldLeft(b)(f)
def foldRight[A, B](fa: Chunk[A], b: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
implicit val instance
: Traverse[Chunk] with Monad[Chunk] with Alternative[Chunk] with TraverseFilter[Chunk] =
new Traverse[Chunk] with Monad[Chunk] with Alternative[Chunk] with TraverseFilter[Chunk] {
override def foldLeft[A, B](fa: Chunk[A], b: B)(f: (B, A) => B): B = fa.foldLeft(b)(f)
override def foldRight[A, B](fa: Chunk[A], b: Eval[B])(
f: (A, Eval[B]) => Eval[B]): Eval[B] = {
def go(i: Int): Eval[B] =
if (i < fa.size) f(fa(i), Eval.defer(go(i + 1)))
else b
go(0)
}
override def toList[A](fa: Chunk[A]): List[A] = fa.toList
override def isEmpty[A](fa: Chunk[A]): Boolean = fa.isEmpty
def traverse[F[_], A, B](fa: Chunk[A])(f: A => F[B])(
implicit F: Applicative[F]): F[Chunk[B]] = {
foldRight[A, F[Vector[B]]](fa, Eval.always(F.pure(Vector.empty))) { (a, efv) =>
F.map2Eval(f(a), efv)(_ +: _)
}.value
}.map(Chunk.vector)
def pure[A](a: A): Chunk[A] = Chunk.singleton(a)
override def empty[A]: Chunk[A] = Chunk.empty
override def pure[A](a: A): Chunk[A] = Chunk.singleton(a)
override def map[A, B](fa: Chunk[A])(f: A => B): Chunk[B] = fa.map(f)
def flatMap[A, B](fa: Chunk[A])(f: A => Chunk[B]): Chunk[B] = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Chunk[Either[A, B]]): Chunk[B] = {
override def flatMap[A, B](fa: Chunk[A])(f: A => Chunk[B]): Chunk[B] = fa.flatMap(f)
override def tailRecM[A, B](a: A)(f: A => Chunk[Either[A, B]]): Chunk[B] = {
// Based on the implementation of tailRecM for Vector from cats, licensed under MIT
val buf = collection.mutable.Buffer.newBuilder[B]
var state = List(f(a).iterator)
@@ -1576,7 +1573,19 @@ object Chunk {
go()
Chunk.buffer(buf.result)
}
override def functor: Functor[Chunk] = this
override def combineK[A](x: Chunk[A], y: Chunk[A]): Chunk[A] =
Chunk.concat(List(x, y))
override def traverse: Traverse[Chunk] = this
override def traverse[F[_], A, B](fa: Chunk[A])(f: A => F[B])(
implicit F: Applicative[F]): F[Chunk[B]] =
foldRight[A, F[Vector[B]]](fa, Eval.always(F.pure(Vector.empty))) { (a, efv) =>
F.map2Eval(f(a), efv)(_ +: _)
}.value.map(Chunk.vector)
override def traverseFilter[F[_], A, B](fa: Chunk[A])(f: A => F[Option[B]])(
implicit F: Applicative[F]): F[Chunk[B]] =
foldRight[A, F[Vector[B]]](fa, Eval.always(F.pure(Vector.empty))) { (a, efv) =>
F.map2Eval(f(a), efv)((oa, efv) => oa.map(_ +: efv).getOrElse(efv))
}.value.map(Chunk.vector)
override def mapFilter[A, B](fa: Chunk[A])(f: A => Option[B]): Chunk[B] = {
val size = fa.size
val b = collection.mutable.Buffer.newBuilder[B]
@@ -1667,5 +1676,4 @@ object Chunk {
def empty[A]: Queue[A] = empty_.asInstanceOf[Queue[A]]
def apply[A](chunks: Chunk[A]*): Queue[A] = chunks.foldLeft(empty[A])(_ :+ _)
}

}
@@ -3,7 +3,7 @@ package fs2
import cats.Eq
import cats.kernel.CommutativeMonoid
import cats.kernel.laws.discipline.EqTests
import cats.laws.discipline.{FunctorFilterTests, MonadTests, TraverseTests}
import cats.laws.discipline.{AlternativeTests, MonadTests, TraverseFilterTests, TraverseTests}
import cats.implicits._
import org.scalacheck.Cogen
import org.scalactic.anyvals._
@@ -104,8 +104,9 @@ class ChunkSpec extends Fs2Spec {
import org.scalacheck.GeneratorCompat._

checkAll(s"Eq[Chunk[$of]]", EqTests[Chunk[A]].eqv)
checkAll(s"Monad[Chunk]", MonadTests[Chunk].monad[A, A, A])
checkAll(s"FunctorFilter[Chunk]", FunctorFilterTests[Chunk].functorFilter[A, A, A])
checkAll("Monad[Chunk]", MonadTests[Chunk].monad[A, A, A])
checkAll("Alternative[Chunk]", AlternativeTests[Chunk].alternative[A, A, A])
checkAll("TraverseFilter[Chunk]", TraverseFilterTests[Chunk].traverseFilter[A, A, A])

if (testTraverse)
checkAll(s"Traverse[Chunk]", TraverseTests[Chunk].traverse[A, A, A, A, Option, Option])
@@ -13,8 +13,7 @@ package object file {
/**
* Reads all data synchronously from the file at the specified `java.nio.file.Path`.
*/
def readAll[F[_]: Sync: ContextShift](path: Path, blocker: Blocker, chunkSize: Int)(
): Stream[F, Byte] =
def readAll[F[_]: Sync: ContextShift](path: Path, blocker: Blocker, chunkSize: Int): Stream[F, Byte] =
pulls
.fromPath(path, blocker, List(StandardOpenOption.READ))
.flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource))
@@ -29,8 +28,7 @@ package object file {
blocker: Blocker,
chunkSize: Int,
start: Long,
end: Long)(
): Stream[F, Byte] =
end: Long): Stream[F, Byte] =
pulls
.fromPath(path, blocker, List(StandardOpenOption.READ))
.flatMap(c => pulls.readRangeFromFileHandle(chunkSize, start, end)(c.resource))
@@ -50,8 +48,7 @@ package object file {
blocker: Blocker,
chunkSize: Int,
offset: Long = 0L,
pollDelay: FiniteDuration = 1.second)(
): Stream[F, Byte] =
pollDelay: FiniteDuration = 1.second): Stream[F, Byte] =
pulls
.fromPath(path, blocker, List(StandardOpenOption.READ))
.flatMap(c => pulls.tailFromFileHandle(chunkSize, offset, pollDelay)(c.resource))

0 comments on commit 35244dd

Please sign in to comment.
You can’t perform that action at this time.