In [None]:
/** A consumer of values of type `To`.
  *
  * @tparam To the type of value this sink accepts
  */
trait Sink[To] {

  /** Consumes a single value. */
  def apply(t: To): Unit

  /** Begins a staged pipeline that ends in this sink.
    *
    * The initial stage is [[StagedSink.stagedIdentity]], i.e. every incoming
    * `E` is wrapped in a one-element collection; further stages are added
    * with `map` / `flatMap` / `withFilter` on the returned [[StagedSink]].
    *
    * @tparam E the type of value the finished pipeline will accept
    */
  final def stage[E]: StagedSink[E, E, To] =
    StagedSink[E, E, To](StagedSink.stagedIdentity[E], this)
}

/** A pipeline under construction in front of a [[Sink]].
  *
  * @param staged the transformation accumulated so far: turns one `First`
  *               into zero or more `Current` values
  * @param sink   the sink the finished pipeline will feed
  * @tparam First   input type of the whole pipeline
  * @tparam Current element type produced by the stages added so far
  * @tparam Final   element type accepted by `sink`
  */
final case class StagedSink[First, Current, Final](
    staged: First => Traversable[Current],
    sink: Sink[Final]
) {

  /** Adds a one-to-one stage. The implicit [[CanSink]] decides what comes
    * back (`To`): another `StagedSink` or a finished `Sink[First]`.
    */
  def map[B, To](f: Current => B)(implicit isDone: CanSink[First, B, Final, To]): To =
    isDone.result(this, f)

  /** Adds a one-to-many stage; the result type is chosen by the implicit
    * [[CanSink]] exactly as for `map`.
    */
  def flatMap[B, To](f: Current => TraversableOnce[B])(implicit isDone: CanSink[First, B, Final, To]): To =
    isDone.result2(this, f)

  /** Adds a stage that keeps only the elements satisfying `f`
    * (this is what an `if` guard in a for-comprehension calls).
    */
  def withFilter(f: Current => Boolean): StagedSink[First, Current, Final] = {
    // withFilter + identity map is the direct form of `for (x <- xs; if f(x)) yield x`.
    val filtered: First => Traversable[Current] =
      first => staged(first).withFilter(f).map(kept => kept)
    StagedSink(filtered, sink)
  }
}

object StagedSink {

  /** The starting stage of a pipeline: wraps its argument in a
    * single-element `List`.
    */
  def stagedIdentity[E]: E => Traversable[E] =
    (element: E) => element :: Nil
}

/** Type class that decides what `StagedSink.map` / `StagedSink.flatMap`
  * return once a new stage producing `Now` elements has been added.
  *
  * @tparam First input type of the pipeline
  * @tparam Now   element type produced by the stage being added
  * @tparam Final element type accepted by the underlying sink
  * @tparam To    the type handed back to the caller
  */
trait CanSink[First, Now, Final, To] {
  /** Builds the result for a one-to-one stage `f` (called by `StagedSink.map`). */
  def result[E](in: StagedSink[First, E, Final], f: E => Now): To
  /** Builds the result for a one-to-many stage `f` (called by `StagedSink.flatMap`). */
  def result2[E](in: StagedSink[First, E, Final], f: E => TraversableOnce[Now]): To
}

/** Fallback [[CanSink]] instances. They live in a parent trait of the
  * `CanSink` companion so that instances declared directly in the companion
  * are preferred when both are eligible.
  */
trait LowPrioritySinkImplicits {

  /** Keeps the pipeline open: adding a stage yields another [[StagedSink]]
    * whose current element type is `Now`.
    */
  implicit def sinkChain[First, Now, Final]: CanSink[First, Now, Final, StagedSink[First, Now, Final]] =
    new CanSink[First, Now, Final, StagedSink[First, Now, Final]] {

      def result[E](in: StagedSink[First, E, Final], f: E => Now): StagedSink[First, Now, Final] = {
        // Run the existing stages, then transform every produced element.
        val mapped: First => Traversable[Now] = first => in.staged(first).map(f)
        StagedSink(mapped, in.sink)
      }

      def result2[E](in: StagedSink[First, E, Final], f: E => TraversableOnce[Now]): StagedSink[First, Now, Final] = {
        // Run the existing stages, then expand every produced element.
        val expanded: First => Traversable[Now] = first => in.staged(first).flatMap(f)
        StagedSink(expanded, in.sink)
      }
    }
}

object CanSink extends LowPrioritySinkImplicits {

  /** Closes the pipeline: when the stage being added produces exactly the
    * element type the underlying sink accepts, the result is a ready-to-use
    * `Sink[First]`.
    *
    * Each call to the returned sink runs all stages on the input, applies
    * `f` to the produced elements, and then feeds every result to the
    * underlying sink.
    */
  implicit def finalSink[First, E]: CanSink[First, E, E, Sink[First]] =
    new CanSink[First, E, E, Sink[First]] {

      def result[A](ss: StagedSink[First, A, E], f: A => E): Sink[First] =
        new Sink[First] {
          def apply(in: First): Unit =
            ss.staged(in).map(f).foreach(out => ss.sink(out))
        }

      def result2[A](ss: StagedSink[First, A, E], f: A => TraversableOnce[E]): Sink[First] =
        new Sink[First] {
          def apply(in: First): Unit =
            ss.staged(in).flatMap(f).foreach(out => ss.sink(out))
        }
    }

}



In [None]:
  /** A user identified by name, together with the city they live in. */
  case class User(name: String, city: String) {

    /** True when this user's city is exactly `in` (case-sensitive). */
    def livesIn(in: String): Boolean =
      in == city
  }

  /** A sink that prints every string it receives on its own line to `System.out`. */
  object stdout extends Sink[String] {
    override def apply(in: String): Unit = {
      System.out.println(in)
    }
  }

  /** A sink of users that prints, via `stdout`, the name of every user
    * living in "pittsburgh" and ignores everyone else.
    *
    * Written as the explicit `withFilter` / `map` chain; the declared result
    * type `Sink[User]` is what makes the final `map` close the pipeline.
    */
  def userSink: Sink[User] =
    stdout.stage[User]
      .withFilter(user => user.livesIn("pittsburgh"))
      .map(user => user.name)


  // Feed two sample users through userSink; only the one whose city is
  // "pittsburgh" gets past the filter and has their name printed.
  List(User("josh", "pittsburgh"), User("dick", "morgan hill")).foreach { user =>
    userSink(user)
  }
