Skip to content

Commit

Permalink
add Par[T]
Browse files Browse the repository at this point in the history
  • Loading branch information
ShigekiKarita committed Sep 29, 2015
1 parent 270622a commit d47d256
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-u", {
val dir = System.getenv("CI_REPORTS")
if(dir == null) "target/reports"
else dir
})
})
4 changes: 2 additions & 2 deletions src/main/scala/part1/state/State.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ object Machine {
case (Coin, Machine(false, _, _)) => s
case (Turn, Machine(true, _, _)) => s
case (Coin, Machine(true, candy, coin)) =>
Machine(false, candy, coin + 1)
Machine(locked = false, candy, coin + 1)
case (Turn, Machine(false, candy, coin)) =>
Machine(true, candy - 1, coin)
Machine(locked = true, candy - 1, coin)
}

def simulate(inputs: List[Input]): State[Machine, (Int, Int)] = for {
Expand Down
64 changes: 64 additions & 0 deletions src/main/scala/part2/paralellism/Par.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package part2.paralellism

import java.util.concurrent.{Callable, TimeUnit, Future, ExecutorService}

import scala.collection.immutable.Nil


object Par {
type Par[A] = ExecutorService => Future[A]

def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)

def unit[A](a: => A): Par[A] = {
(es: ExecutorService) => UnitFuture(a)
}

private case class UnitFuture[A](get: A) extends Future[A] {
override def isCancelled: Boolean = false
override def get(timeout: Long, unit: TimeUnit): A = get
override def cancel(mayInterruptIfRunning: Boolean): Boolean = false
override def isDone: Boolean = true
}

def map2[L, R, F](l: Par[L], r: Par[R])(f: (L, R) => F): Par[F] =
(es: ExecutorService) => UnitFuture(f(l(es).get, r(es).get))

def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) => es.submit(new Callable[A] {
def call = a(es).get
})

def lazyUnit[A](a: => A): Par[A] = fork(unit(a))

def sum(xs: Seq[Int]): Par[Int] = {
if (xs.length <= 1)
unit(xs.headOption.getOrElse(0))
else xs.splitAt(xs.length / 2) match {
case (l, r) => map2(fork(sum(l)), fork(sum(r)))(_ + _)
}
}

def asyncF[A, B](f: A => B): A => Par[B] =
a => lazyUnit(f(a))

def map[A, B](pa: Par[A])(f: A => B): Par[B] =
map2(pa, unit(()))((a, _) => f(a))

def parSort(parList: Par[Seq[Int]]): Par[Seq[Int]] =
map(parList)(_.sorted)

def sequence[A](ps: List[Par[A]]): Par[List[A]] =
ps.foldRight(unit(List.empty[A]))((x, z) => map2(x, z)(_ :: _))

def parMap[A, B](ps: List[A])(f: A => B): Par[List[B]] =
fork(sequence(ps map asyncF(f)))

def parFilter[A](as: List[A])(f: A => Boolean): Par[List[A]] =
map(sequence(
as map asyncF(a => if(f(a)) List(a) else Nil)
))(_.flatten)

def equal[A](es: ExecutorService)(lhs: Par[A], rhs: Par[A]): Boolean =
lhs(es).get == rhs(es).get
}
32 changes: 32 additions & 0 deletions src/test/scala/part2/paralellism/ParTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package part2.paralellism

import java.util.concurrent.Executors

import common.ScalaTestCommon


class ParTest extends ScalaTestCommon {
def go[A](pa: Par.Par[A]): A =
Par.run(Executors.newCachedThreadPool())(pa).get()

"Par.sum" should "be" in forAll {
xs: IndexedSeq[Int] => go(Par.sum(xs)) mustBe xs.sum
}

"Par.perSort" should "be" in forAll {
xs: IndexedSeq[Int] => go(Par.parSort(Par.unit(xs))) mustBe xs.sorted
}

"Par.parMap" should "be" in forAll {
(xs: List[Int], i: Int) => go(Par.parMap(xs)(_ + i)) mustBe xs.map(_ + i)
}

"Par.parFilter" should "be" in forAll {
(xs: List[Int], i: Int) => go(Par.parFilter(xs)(_ < i)) mustBe xs.filter(_ < i)
}

"Par.equal" should "be" in forAll {
x: Int => Par.equal(Executors.newSingleThreadExecutor())(
Par.map(Par.unit(x))(_ + 1), Par.unit(x + 1)) mustBe true
}
}

0 comments on commit d47d256

Please sign in to comment.