Skip to content
Browse files

using trampolining to get rid of SOE's. Still work in progress

  • Loading branch information...
1 parent 79520da commit 9a1242f418473319df6c8aa4dbf3ad8be8ee1396 @arjanblokzijl committed Apr 26, 2012
View
2 benchmark/src/main/scala/conduits/benchmark/CLBenchmark.scala
@@ -24,5 +24,5 @@ class CLBenchmark extends CBenchmark with BenchmarkData {
}
def timeFoldStreamDirect(reps:Int) = run(reps)(foldStreamDirect(intStream))
-// def timeConduitFoldStream(reps:Int) = run(reps)(conduitFoldList(intStream)) //gives SOE, so work to be done...
+ def timeConduitFoldStream(reps:Int) = run(reps)(conduitFoldList(intStream))
}
View
6 conduits/src/main/scala/conduits/CL.scala
@@ -106,12 +106,6 @@ object CL {
}
def sourceList[F[_], A](l: => Stream[A])(implicit M: Monad[F]): Source[F, A] = yieldMany(l)
-// def go(l1: => Stream[A]): F[SourceStateResult[Stream[A], A]] = l1 match {
-// case Stream.Empty => M.point(StateClosed.apply)
-// case x #:: xs => M.point(StateOpen(xs, x))
-// }
-// sourceState[Stream[A], F, A](l, (s: Stream[A]) => go(s))
-// }
/**
* Apply a transformation to all values in a stream.
View
77 conduits/src/main/scala/conduits/Pipe.scala
@@ -4,6 +4,8 @@ import empty.Void
import pipes._
import Pipe._
import scalaz.{Forall, Monoid, MonadTrans, Monad}
+import scalaz.std.function._
+import scalaz.Free.{Trampoline, return_, suspend}
/**
* The underlying datatype for all the types in this package. In has four
@@ -57,12 +59,12 @@ sealed trait Pipe[A, B, F[_], R] {
def map[S](f: (R) => S)(implicit F: Monad[F]): Pipe[A, B, F, S] = flatMap(a => Done(None, f(a)))
def flatMap[S](f: (R) => Pipe[A, B, F, S])(implicit F: Monad[F]): Pipe[A, B, F, S] = {
- def through(p: Pipe[A, B, F, R]): Pipe[A, B, F, S] = p.fold(
+ def through(p: => Pipe[A, B, F, R]): Pipe[A, B, F, S] = p.fold(
haveOutput = (p, c, o) => HaveOutput(through(p), F.bind(c)(r => f(r) pipeClose), o)
, needInput = (p, c) => NeedInput(i => through(p.apply(i)), through(c))
, done = (oi, x) => oi match {
- case Some(i) => f(x).pipePush(i)
- case None => f(x)
+ case Some(i) => f(x).pipePush(i)
+ case None => f(x)
}
, pipeM = (mp, c) => PipeM(F.map(mp)(p1 => through(p1)), F.bind(c)(r => f(r) pipeClose))
)
@@ -218,38 +220,43 @@ trait PipeFunctions {
* Composition is biased towards checking the right Pipe first to avoid pulling
* data that is not needed. Doing so could cause data loss.
*/
- def pipeResume[A, B, C, F[_], R](p1: => Pipe[A, B, F, Unit], p2: => Pipe[B, C, F, R])(implicit F: Monad[F]): Pipe[A, C, F, (Pipe[A, B, F, Unit], R)] = (p1, p2) match {
- //both pipes finished, return leftover of left pipe and leftovers of right pipe in the result.
- case (Done(leftoverl, ()), Done(leftoverr, r)) => leftoverr match {
- case None => Done(leftoverl, (pipes.pipeMonad[A, B, F].point(()), r))
- case Some(i) => Done(leftoverl, (HaveOutput(Done(None, ()), F.point(()), i), r))
- }
- //right pipe is done, terminate and return leftovers.
- case (left, Done(leftoverr, r)) => leftoverr match {
- case None => Done(None, (left, r))
- case Some(i) => Done(None, (HaveOutput(left, left.pipeClose, i), r))
+ def pipeResume[A, B, C, F[_], R](p1: => Pipe[A, B, F, Unit], p2: => Pipe[B, C, F, R])(implicit F: Monad[F]): Pipe[A, C, F, (Pipe[A, B, F, Unit], R)] = {
+ //wip on getting this to run on large pipestreams without getting a SOE. This implementation is
+ //likely not very efficient, pipeResume is called recursively in a number of places, which runs the Trampoline each time.
+ def go(left: => Pipe[A, B, F, Unit], right: => Pipe[B, C, F, R]): Trampoline[Pipe[A, C, F, (Pipe[A, B, F, Unit], R)]] = {
+ (left, right) match {
+ //both pipes finished, return leftover of left pipe and leftovers of right pipe in the result.
+ case (Done(leftoverl, ()), Done(leftoverr, r)) => leftoverr match {
+ case None => return_(Done(leftoverl, (pipes.pipeMonad[A, B, F].point(()), r)))
+ case Some(i) => return_(Done(leftoverl, (HaveOutput(Done(None, ()), F.point(()), i), r)))
+ }
+ //right pipe is done, terminate and return leftovers.
+ case (left, Done(leftoverr, r)) => leftoverr match {
+ case None => return_(Done(None, (left, r)))
+ case Some(i) => return_(Done(None, (HaveOutput(left, left.pipeClose, i), r)))
+ }
+ //left pipe needs input, ask for it
+ case (NeedInput(p, c), right) => return_(NeedInput(a => pipeResume(p(a), right)
+ , pipeResume(c, right).flatMap(pr => {
+ pipeMonadTrans.liftM(pr._1.pipeClose)
+ pipes.pipeMonad[A, C, F].point((pipes.pipeMonad[A, B, F].point(()), pr._2))
+ })))
+ //left pipe has output, right pipe wants it
+ case (HaveOutput(lp, _, a), NeedInput(rp, _)) => suspend(go(lp, rp(a)))
+ //right pipe needs to run a monadic action
+ case (left, PipeM(mp, c)) => return_(PipeM(F.map(mp)(p => pipeResume(left, p)), F.map(c)(r => (left, r))))
+ //right Pipe has some output, provide it downstream and continue.
+ case (left, HaveOutput(p, c, o)) => return_(HaveOutput(pipeResume(left, p), F.map(c)(r => (left, r)), o))
+ //left pipe is Done, right pipe needs input. Tell the right pipe there is no more input
+ //eventually replace its leftovers with the left pipe leftover
+ case (Done(l, ()), NeedInput(_, c)) => return_(replaceLeftOver(l, c).map(r => (pipes.pipeMonad[A, B, F].point(()), r)))
+ //left pipe needs to run a monadic action
+ case (PipeM(mp, c), right) => return_(PipeM(F.map(mp)(p => pipeResume(p, right))
+ , F.bind(c)(_ => F.map(right.pipeClose)(r => (pipes.pipeMonad[A, B, F].point(()), r)))
+ ))
+ }
}
- //left pipe needs input, ask for it
- case (NeedInput(p, c), right) => NeedInput(a => pipeResume(p(a), right)
- , pipeResume(c, right).flatMap(pr => {
- pipeMonadTrans.liftM(pr._1.pipeClose)
- pipes.pipeMonad[A, C, F].point((pipes.pipeMonad[A, B, F].point(()), pr._2))
- }))
- //left pipe has output, right pipe wants it
- case (HaveOutput(lp, _, a), NeedInput(rp, _)) => pipeResume(lp, rp(a))
- //right pipe needs to run a monadic action
- case (left, PipeM(mp, c)) => PipeM(F.map(mp)(p => pipeResume(left, p)), F.map(c)(r => (left, r)))
- //right Pipe has some output, provide it downstream and continue.
- case (left, HaveOutput(p, c, o)) => HaveOutput(pipeResume(left, p), F.map(c)(r => (left, r)), o)
- //left pipe is Done, right pipe needs input. Tell the right pipe there is no more input
- //eventually replace its leftovers with the left pipe leftover
- case (Done(l, ()), NeedInput(_, c)) => replaceLeftOver(l, c).map(r => (pipes.pipeMonad[A, B, F].point(()), r))
- //left pipe needs to run a monadic action
- case (PipeM(mp, c), right) => PipeM(F.map(mp)(p => pipeResume(p, right))
- , F.bind(c)(_ => F.map(right.pipeClose)(r => (pipes.pipeMonad[A, B, F].point(()), r)))
- )
-
- case _ => sys.error("TODO")
+ go(p1, p2).run
}
private def replaceLeftOver[A, B, C, F[_], R](l: => Option[A], p1: => Pipe[C, B, F, R])(implicit F: Monad[F]): Pipe[A, B, F, R] = p1.fold(
@@ -307,7 +314,7 @@ trait PipeFunctions {
NeedInput(i => Done(Some(i), true), Done(None, false))
/**
- * The [[conduits.pipes.Zero]] type parameter for Sink in the output, makes
+ * The [[conduits.empty.Void]] type parameter for Sink in the output, makes
* it difficult to compose it with Sources and Conduits. This function replaces
* that parameter with a free variable. The function is essentially `id`: it
* only modifies the types, not the actions performed.
View
14 examples/src/main/scala/conduits/examples/RunConduits.scala
@@ -24,7 +24,7 @@ object RunConduits extends App {
val sourceId = CL.sourceList[Id, Int]((1 to 15).toStream)
val sg = CL.sourceList[Id, Int](List(1, 1, 2, 2, 2, 2, 3, 3, 4).toStream)
val sourceLarge = CL.sourceList[IO, Int](Stream.from(1).take(100000))
- val sourceLargeId = CL.sourceList[Id, Int](Stream.from(1).take(10000))
+ val sourceLargeId = CL.sourceList[Id, Int](Stream.from(1).take(100000))
val sourceT = CL.sourceList[RTIO, Int]((1 to 10).toStream)
val mapSource = sourceId %= CL.map[Id, Int, Int](i => i + 1)// =% sinkTake
@@ -34,9 +34,11 @@ object RunConduits extends App {
// println("groupBy " + ((groupBy[Id, Int]((a, b) => a == b) %= sg) %%== consume).take(15).force)
//
// println("result take " + (sourceId %%== sinkTakeId).take(15).force)
- println("result sum " + (sourceLargeId %%== CL.sum))
-// println("result consume " + (sourceStream %%== sinkConsume).flatten.take(15).force)
-// println("result large map io " + (mapSourceLarge %%== sinkTake).unsafePerformIO.take(15).force)
-// println("result large map id " + (mapSourceLargeId %%== sinkTakeId).take(15).force)
-// println("result resourceT " + runResourceT(sourceT %%== sinkT).unsafePerformIO)
+ val takeL = CL.take[Id, Int](Int.MaxValue)
+ println("result take large " + (sourceLargeId %%== CL.take(1000000)))//TODO evaluating the resulting stream results in SOE
+ println("result sum " + (sourceLargeId %%== CL.sum))
+ println("result consume " + (sourceStream %%== sinkConsume).flatten.take(15).force)
+ println("result large map io " + (mapSourceLarge %%== sinkTake).unsafePerformIO.take(15).force)
+ println("result large map id " + (mapSourceLargeId %%== sinkTakeId).take(15).force)
+ println("result resourceT " + runResourceT(sourceT %%== sinkT).unsafePerformIO)
}

0 comments on commit 9a1242f

Please sign in to comment.
Something went wrong with that request. Please try again.