In [1]:
import java.util.concurrent._
import language.implicitConversions
// Two-thread fixed pool; passed to Par.run in the "try ex_7.5" cell.
val parEngine = Executors.newFixedThreadPool(2)

scala.language$@411293fe

In [2]:
// id : ex_7.1

// A deferred computation: holds a thunk and runs it every time `get` is called (no caching).
class Par[A](ap: () => A) {

    // The wrapped thunk, exposed unchanged.
    val a: () => A = ap

    // Invokes the thunk and returns whatever it produces.
    def get: A = a.apply()

}

object Par {

    // Lifts a by-name expression into a Par; the expression is not evaluated until `get` is called.
    def unit[A](a: => A): Par[A] = {
        val thunk: () => A = () => a
        new Par[A](thunk)
    }

    // Combines two Pars with `f`. Nothing runs at construction time: `pa.get`, then `pb.get`,
    // then `f` are all evaluated, on the calling thread, when `get` is called on the result.
    def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] =
        unit {
            val left = pa.get
            val right = pb.get
            f(left, right)
        }

}


$line19.$read$$iw$$iw$Par$@4c225079

In [3]:
// Divide-and-conquer sum: sequences of at most one element become a unit Par (0 when empty);
// longer ones are split in half and the two partial sums are combined with map2.
def sum(ints: IndexedSeq[Int]): Par[Int] = {
    val n = ints.length
    if (n > 1) {
        val halves = ints.splitAt(n / 2)
        Par.map2(sum(halves._1), sum(halves._2))((x, y) => x + y)
    } else {
        Par.unit(ints.headOption.getOrElse(0))
    }
}

sum: (ints: IndexedSeq[Int])Par[Int]


In [4]:
// Builds the Par for 1..5 and forces it with `get`.
sum(IndexedSeq(1,2,3,4,5)).get

15

In [5]:
// id : ex_7.2

// Shamelessly (or shamefully) taking the relevant portions from FP in Scala answers
type Par[A] = ExecutorService => Future[A]

object Par {

  /* Runs a `Par` by handing it the `ExecutorService`; the resulting `Future` is returned as-is. */
  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a.apply(s)

  /*
  `unit` wraps a value that has already been computed. The function it returns
  ignores the `ExecutorService` and yields a `UnitFuture` holding that value.
  */
  def unit[A](a: A): Par[A] = { (_: ExecutorService) => UnitFuture(a) }

  /*
  A `Future` over a constant: it is always done, cannot be cancelled, and both
  `get` variants hand back the stored value without waiting.
  */
  private case class UnitFuture[A](get: A) extends Future[A] {
    def cancel(evenIfRunning: Boolean): Boolean = false
    def isCancelled: Boolean = false
    def isDone: Boolean = true
    def get(timeout: Long, units: TimeUnit): A = get
  }

  /*
  `map2` starts both sides by applying them to the same `ExecutorService`, then
  blocks on `left.get` followed by `right.get` and applies `f` on the calling
  thread. The combined value comes back in a `UnitFuture`, so any timeout given
  to the returned future has no effect: all waiting has already happened here.
  `f` itself is never submitted to the executor; wrap the call in `fork` to get that.
  */
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = { es =>
    val left = a(es)
    val right = b(es)
    val combined = f(left.get, right.get)
    UnitFuture(combined)
  }

  /*
  `fork` submits a `Callable` that evaluates the by-name `a`, runs it against the
  same `ExecutorService` and blocks on the inner future's `get`. The submitted
  task therefore holds on to its thread for as long as the inner computation
  takes to deliver a result.
  */
  def fork[A](a: => Par[A]): Par[A] = (es: ExecutorService) => {
    val task: Callable[A] = new Callable[A] {
      def call: A = a(es).get
    }
    es.submit(task)
  }

  /* Defers evaluation of `a` to the task submitted by `fork`. */
  def lazyUnit[A](a: => A): Par[A] = fork { unit(a) }

}


$line22.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Par$@34811034

In [6]:
// id : try ex_7.2

// Recursive halving sum: at most one element yields a unit Par (0 for an empty sequence);
// otherwise both halves are summed and the partial results are added through map2.
def sum(ints: IndexedSeq[Int]): Par[Int] = ints.size match {
    case n if n <= 1 =>
        Par.unit(ints.headOption.getOrElse(0))
    case n =>
        val (front, back) = ints.splitAt(n / 2)
        Par.map2(sum(front), sum(back))((x, y) => x + y)
}

// Keep a handle on the pool so its worker thread can be released once the result has been read;
// an executor created inline and never shut down keeps its non-daemon thread alive.
val sumPool = Executors.newFixedThreadPool(1)
val future = Par.run(sumPool)(sum(IndexedSeq(1,2,3,4,5)))

try {
    // Explicit tuples: same `(label,value)` output as before, without relying on argument auto-tupling.
    println(("future", future))
    println(("get", future.get))
} finally {
    sumPool.shutdown()
}

(future,UnitFuture(15))
(get,15)


null

In [7]:
// id : ex_7.3
import java.util.concurrent.Future

type Par[A] = ExecutorService => Future[A]

object Par {

    // Runs `a` against the executor `s` and returns the Future it produces.
    def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)

    // Wraps the thunk `a` as a Par that, when run, submits the thunk to the executor as a Callable.
    def unit[A](a: () => A): Par[A] = (es: ExecutorService) => es.submit(new Callable[A] {
          def call = a()
    })

    // Starts both sides on the executor and returns a Future that combines their results lazily:
    // `f` is only applied when `get` is called on the result, so a timeout can be honoured.
    def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = 
        (es: ExecutorService) => {
        val af = a(es)
        val bf = b(es)
        Map2UnitFuture(af, bf, f) 
    }

    // https://github.com/fpinscala/fpinscala/blob/master/answerkey/parallelism/03.answer.scala
    
    // Future that waits for `a`, then for `b` with whatever is left of the time budget,
    // and applies `f` to the two results. The combined value is cached after the first success.
    case class Map2UnitFuture[A, B, C](
        a: Future[A], 
        b: Future[B],
        f : (A, B) => C) extends Future[C] {
        
        // Written only by `compute`; @volatile so a result stored by one thread is visible to others.
        @volatile var cache: Option[C] = None
        def isDone: Boolean = cache.isDefined
        def isCancelled: Boolean = a.isCancelled || b.isCancelled
        // Cancel BOTH underlying futures. A plain `a.cancel(..) || b.cancel(..)` short-circuits
        // and leaves `b` running whenever cancelling `a` succeeds.
        def cancel(evenIfRunning: Boolean): Boolean = {
            val aCancelled = a.cancel(evenIfRunning)
            val bCancelled = b.cancel(evenIfRunning)
            aCancelled || bCancelled
        }
        def get: C = compute(Long.MaxValue)
        def get(timeout: Long, units: TimeUnit): C =
            compute(TimeUnit.NANOSECONDS.convert(timeout, units))

        // Waits for `a` within the full budget, then for `b` within the budget minus the time
        // spent on `a`. The two "Timeout" lines trace the budget (in whole seconds) given to each wait.
        private def compute(timeoutInNanos: Long): C = cache match {
            case Some(c) => c
            case None =>
              println("Timeout " + TimeUnit.SECONDS.convert(timeoutInNanos, TimeUnit.NANOSECONDS))
              val start = System.nanoTime
              val ar = a.get(timeoutInNanos, TimeUnit.NANOSECONDS)
              val aTime = System.nanoTime - start
              val remaining = timeoutInNanos - aTime
              println("Timeout " + TimeUnit.SECONDS.convert(remaining, TimeUnit.NANOSECONDS))
              val br = b.get(remaining, TimeUnit.NANOSECONDS)
              val ret = f(ar, br)
              cache = Some(ret)
              ret
        }
    }

}

$line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Par$@75924cb5

In [8]:
// Halving sum over the thunk-based Par. For every split the one-second sleep happens here,
// while the Par is being assembled (after both halves have been built), not when the
// combining function is later applied to the two results.
def sum(ints: IndexedSeq[Int]): Par[Int] =
    if (ints.size > 1) {
        val (front, back) = ints.splitAt(ints.length/2)
        val left = sum(front)
        val right = sum(back)
        Thread.sleep(1000)
        Par.map2(left, right)(_ + _)
    } else {
        // Zero or one element: the single value, or 0 for an empty sequence.
        val only = ints.headOption.getOrElse(0)
        Par.unit(() => only)
    }


sum: (ints: IndexedSeq[Int])Par[Int]


In [9]:
// With the ex_7.3 Par, every leaf task is submitted while Par.run executes (unit submits, map2 applies
// both sides immediately), so the pool can stop accepting work right away. shutdown() still lets the
// submitted tasks finish and releases the worker threads afterwards instead of leaking them.
val sumPool4 = Executors.newFixedThreadPool(4)
val future = try Par.run(sumPool4)(sum(IndexedSeq(1,2,3,4,5,6,7,8,9,10))) finally sumPool4.shutdown()

Map2UnitFuture(Map2UnitFuture(Map2UnitFuture(java.util.concurrent.FutureTask@606a521b,java.util.concurrent.FutureTask@c30b6dd,<function2>),Map2UnitFuture(java.util.concurrent.FutureTask@1631a346,Map2UnitFuture(java.util.concurrent.FutureTask@7d8a3871,java.util.concurrent.FutureTask@4f425751,<function2>),<function2>),<function2>),Map2UnitFuture(Map2UnitFuture(java.util.concurrent.FutureTask@46cd963a,java.util.concurrent.FutureTask@5d7d3636,<function2>),Map2UnitFuture(java.util.concurrent.FutureTask@7198c458,Map2UnitFuture(java.util.concurrent.FutureTask@2cbdfde5,java.util.concurrent.FutureTask@66bc20e,<function2>),<function2>),<function2>),<function2>)

In [10]:
// Asks for the result with a 1 ns budget.
future.get(1, TimeUnit.NANOSECONDS)

Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0
Timeout 0


55

In [11]:
// Both sides sleep 10 s and the budget is zero, so the get below is expected to time out.
val sum2 = Par.map2(Par.unit(() => { Thread.sleep(10000); 1}), 
                    Par.unit(() => { Thread.sleep(10000); 2}))((a: Int, b: Int) => a + b)
// Hold on to the pool and shut it down even when get throws; shutdown() does not interrupt
// the sleeping tasks, it only lets the worker threads exit once they finish.
val zeroTimeoutPool = Executors.newFixedThreadPool(4)
try sum2(zeroTimeoutPool).get(0, TimeUnit.NANOSECONDS) finally zeroTimeoutPool.shutdown()

Timeout 0


java.util.concurrent.TimeoutException: java.util.concurrent.TimeoutException

In [12]:
// First side sleeps 10 s, second 30 s; with an 11 s budget the wait on the second side
// is expected to time out after the first one has used up most of the budget.
val sum2 = Par.map2(Par.unit(() => { Thread.sleep(10000); 1}), 
                    Par.unit(() => { Thread.sleep(30000); 2}))((a: Int, b: Int) => a + b)
// Hold on to the pool and shut it down even when get throws; shutdown() does not interrupt
// the sleeping tasks, it only lets the worker threads exit once they finish.
val partialTimeoutPool = Executors.newFixedThreadPool(4)
try sum2(partialTimeoutPool).get(11, TimeUnit.SECONDS) finally partialTimeoutPool.shutdown()

Timeout 11
Timeout 1


java.util.concurrent.TimeoutException: java.util.concurrent.TimeoutException

In [13]:
// Both sides sleep 2.5 s on separate pool threads, so a 3 s budget is enough for the pair.
val sum2 = Par.map2(Par.unit(() => { Thread.sleep(2500); 1}), 
                    Par.unit(() => { Thread.sleep(2500); 2}))((a: Int, b: Int) => a + b)
// Keep a handle on the pool so its threads are released once the result (or a timeout) is in.
val inTimePool = Executors.newFixedThreadPool(4)
try Par.run(inTimePool)(sum2).get(3, TimeUnit.SECONDS) finally inTimePool.shutdown()

Timeout 3
Timeout 0


3

In [14]:
// id : ex_7.4

type Par[A] = ExecutorService => Future[A]

object Par {

  // Applies the description `a` to the executor `s`, yielding the Future it produces.
  def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a.apply(s)

  // Wraps an already-evaluated value; the executor passed in is ignored.
  def unit[A](a: A): Par[A] = { (_: ExecutorService) => UnitFuture(a) }

  // Future over a fixed value: reports done, is never cancelled, returns the value without waiting.
  private case class UnitFuture[A](get: A) extends Future[A] {
    def cancel(evenIfRunning: Boolean): Boolean = false
    def isCancelled: Boolean = false
    def isDone: Boolean = true
    def get(timeout: Long, units: TimeUnit): A = get
  }

  // Starts both sides on `es`, blocks for the left result and then the right one,
  // and returns `f` applied to them as an already-completed future.
  def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = { es =>
    val left = a(es)
    val right = b(es)
    val combined = f(left.get, right.get)
    UnitFuture(combined)
  }

  // Submits a task that evaluates the by-name `a`, runs it on the same executor
  // and blocks inside the task until the inner future delivers its value.
  def fork[A](a: => Par[A]): Par[A] = (es: ExecutorService) => {
    val task: Callable[A] = new Callable[A] {
      def call: A = a(es).get
    }
    es.submit(task)
  }

  // Defers evaluation of `a` to the task submitted by `fork`.
  def lazyUnit[A](a: => A): Par[A] = fork { unit(a) }

  // Turns a plain function into one whose result is computed inside a forked task.
  def asyncF[A,B](f: A => B): A => Par[B] = (a: A) => lazyUnit(f(a))
}


$line35.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Par$@3f7486ee

In [15]:
// id : try ex_7.4

val func = Par.asyncF((a : Int) => a + 1)
// Keep a handle on the executor so its worker thread is released after the result is read;
// created inline it would never be shut down.
val asyncPool = Executors.newSingleThreadExecutor
val future = Par.run(asyncPool)(func(5))
try future.get finally asyncPool.shutdown()

6

In [16]:
// Sorts the list produced by `parList`; pairs it with a unit-valued Par only to reuse map2.
def sortPar(parList: Par[List[Int]]): Par[List[Int]] = {
    val unitPar: Par[Unit] = Par.unit(())
    Par.map2(parList, unitPar)((xs, ignored) => xs.sorted)
}
// Applies `f` to the result of `pa`, expressed through map2 with a unit-valued second argument.
def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
    val unitPar: Par[Unit] = Par.unit(())
    Par.map2(pa, unitPar)((a, ignored) => f(a))
}

sortPar: (parList: Par[List[Int]])Par[List[Int]]
map: [A, B](pa: Par[A])(f: A => B)Par[B]


In [17]:
// Same sortPar, now written in terms of `map`.
def sortPar(parList: Par[List[Int]]): Par[List[Int]] =
    map(parList)(xs => xs.sorted)

sortPar: (parList: Par[List[Int]])Par[List[Int]]


In [18]:
// id : ex_7.5

/* Note: the `sequence` written for `Rand`, shown here as the pattern to follow
def sequence[A](fs: List[Rand[A]]) : Rand[List[A]] = {
    rng => {
        fs match {
            case Nil => (List().asInstanceOf[List[A]], rng)
            case r2s :: rest => {
                val (r2sa, r2sr) = r2s(rng)
                val rem = sequence(rest)(r2sr)
                (r2sa :: rem._1, rem._2)
            }
        }
    }
}
*/

// def map2[A,B,C](a : Par[A], b : Par[B])(f: (A, B) => C): Par[C] = 
//     map2       (a : Par[A], b : Par[A])(f: (A, A) => List[A]) : Par[List[A]]
/*
 Collects a list of Pars into a single Par of their results, keeping the input order.
 An empty input yields a Par of the empty list; otherwise the head is combined with the
 sequenced tail through map2. The recursion is not tail-recursive, so its depth grows with
 the length of `ps`.
*/
def sequence[A](ps: List[Par[A]]): Par[List[A]] = ps match {
    // List.empty[A] carries the element type directly, so no cast is needed.
    case Nil => Par.unit(List.empty[A])
    // Prepending with :: avoids building a one-element list just to concatenate it.
    case p :: pr => Par.map2(p, sequence(pr))((head: A, tail: List[A]) => head :: tail)
}

sequence: [A](ps: List[Par[A]])Par[List[A]]


In [19]:
// id : try ex_7.5

// Sequence four unit Pars into one Par of a list, then run it on the shared parEngine pool.
val listOfPars = sequence(List(Par.unit(1), Par.unit(2), Par.unit(3), Par.unit(4)))
Par.run(parEngine)(listOfPars).get

[[1, 2, 3, 4]]

In [20]:
// Maps `f` over `ps` with one asyncF-wrapped computation per element, then gathers the
// results with `sequence`. Building that list is itself deferred into a forked task.
def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = {
    def gathered: Par[List[B]] = {
        val launch: A => Par[B] = Par.asyncF(f)
        sequence(ps.map(launch))
    }
    Par.fork(gathered)
}

parMap: [A, B](ps: List[A])(f: A => B)Par[List[B]]


In [21]:
// id : ex_7.6
// Filters `as` by evaluating `f` through parMap: each element becomes Some(x) when it
// passes and None otherwise, and the empty options are dropped once all results are in.
def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] = {
    val tagged: Par[List[Option[A]]] = parMap(as)(x => if (f(x)) Some(x) else None)
    map(tagged)(_.flatten)
}

parFilter: [A](as: List[A])(f: A => Boolean)Par[List[A]]


In [22]:
// id : try ex_7.6
// Seven-thread pool; the "try book answer ex_7.6" cell runs on it as well.
val xpool7 = Executors.newFixedThreadPool(7)
// The predicate prints a (message, element) tuple and sleeps one second before testing x > 4.
Par.run(xpool7)(parFilter(List(1,2,3,4,5,6))(x => {
    println("Sleeping 1 second for", x)
    Thread.sleep(1000)
    x > 4
})).get

(Sleeping 1 second for,1)
(Sleeping 1 second for,2)
(Sleeping 1 second for,4)
(Sleeping 1 second for,6)
(Sleeping 1 second for,3)
(Sleeping 1 second for,5)


[[5, 6]]

In [23]:
// id : book answer ex_7.6
// Book-answer variant: each element is tested in its own asyncF task that yields a
// one-element list when `f` holds and an empty list otherwise; the per-element lists
// are sequenced and then concatenated.
def parFilter[A](l: List[A])(f: A => Boolean): Par[List[A]] = {
  val keepIfMatches: A => List[A] = a => if (f(a)) List(a) else List()
  val pars: List[Par[List[A]]] = l.map(Par.asyncF(keepIfMatches))
  val gathered: Par[List[List[A]]] = sequence(pars)
  map(gathered)(chunks => chunks.flatten)
}

parFilter: [A](l: List[A])(f: A => Boolean)Par[List[A]]


In [24]:
// id : try book answer ex_7.6
// Same experiment as the "try ex_7.6" cell, against the parFilter defined in the book-answer cell;
// runs on the xpool7 pool created in that earlier cell.
Par.run(xpool7)(parFilter(List(1,2,3,4,5,6))(x => {
    println("Sleeping 1 second for", x)
    Thread.sleep(1000)
    x > 4
})).get

(Sleeping 1 second for,3)
(Sleeping 1 second for,6)
(Sleeping 1 second for,4)
(Sleeping 1 second for,5)
(Sleeping 1 second for,2)
(Sleeping 1 second for,1)


[[5, 6]]