<br><br><br>
# <center>Functional Programming #17</center>




### <center>2018. 9. 7.</center>
<br><br><br>

In [None]:
import $ivy.`org.typelevel::cats-core:1.1.0`

import cats.Monoid
import cats.instances.int._
import cats.instances.string._
import cats.syntax.semigroup._

/** Maps every element of `va` to a `B` and combines the results with `B`'s monoid.
  *
  * @param va elements to transform and combine
  * @param f  transformation applied to each element
  * @return the left-to-right combination of all mapped values, starting from
  *         `Monoid[B].empty` (which is also the result for an empty `va`)
  */
def foldMap[A, B: Monoid](va: Vector[A])(f: A => B): B = {
    // Map step: builds an intermediate Vector[B].
    val mapped: Vector[B] = va.map(f)
    // Reduce step: combine the mapped values from the left.
    mapped.foldLeft(Monoid[B].empty)((acc, b) => acc |+| b)
}

<br>
# <center>Cats - Case Study</center>
---
<br>
<center>Map-Reduce: Parallelizing ***map*** and ***fold***</center>
<br><br><br>

<br>
# <center>Map-Reduce</center>
---
<br>
<center>def foldMap\[A, B: Monoid](va: Vector[A])(f: A => B): B</center>
<br><br><br>

In [None]:
// Signature being exercised:
//   def foldMap[A, B: Monoid](va: Vector[A])(f: A => B): B

// The Ints are kept as they are and combined as Ints.
foldMap(Vector(1, 2, 3))(n => n)
// Each Int is turned into a String, and the Strings are combined.
foldMap(Vector(1, 2, 3))(n => n + ">>> ")
// Each Char becomes an upper-cased one-character String before combining.
foldMap("Hello, world!".toVector)(c => c.toString.toUpperCase)

In [None]:
import cats.Monoid
import cats.instances.int._
import cats.instances.string._
import cats.syntax.semigroup._

/** Transforms each element of `va` with `f`, then reduces the transformed values
  * using the `Monoid[B]` instance supplied by the context bound.
  *
  * @param va input elements
  * @param f  function applied to every element before combining
  * @return all mapped values combined from left to right; `Monoid[B].empty` when
  *         `va` has no elements
  */
def foldMap[A, B: Monoid](va: Vector[A])(f: A => B): B = {
    val monoid = Monoid[B]
    // map first (intermediate Vector[B]), then fold with the monoid's combine
    va.map(f).foldLeft(monoid.empty)(monoid.combine)
}

In [None]:
// Int => Int: the elements are combined unchanged.
foldMap(Vector(1, 2, 3))((x: Int) => x)

// Int => String: every element gets the ">>> " suffix before the Strings are combined.
foldMap(Vector(1, 2, 3))((x: Int) => x + ">>> ")

// Char => String: each character is upper-cased, then the Strings are combined.
foldMap("Hello, world!".toVector)((ch: Char) => ch.toString.toUpperCase)

In [None]:
// def foldMap[A, B: Monoid](va: Vector[A])(f: A => B): B =
//     va.map(f).foldLeft(Monoid[B].empty)(_ |+| _)

/** Single-pass `foldMap`: applies `f` to each element while folding, so no
  * intermediate `Vector[B]` is built.
  *
  * @param va input elements
  * @param f  function applied to each element just before it is combined
  * @return the accumulated value, starting from `Monoid[B].empty`
  */
def foldMap[A, B: Monoid](va: Vector[A])(f: A => B): B =
    va.foldLeft(Monoid[B].empty) { (acc, a) => acc |+| f(a) }

// Same three calls as before, now running against the single-pass foldMap.
foldMap(Vector(1, 2, 3))(i => i)
foldMap(Vector(1, 2, 3))(i => i + ">>> ")
foldMap("Hello, world!".toVector)(ch => ch.toString.toUpperCase)

<br><br><br>
# <center>Parallelizing Map-Reduce</center>
<br><br><br>

In [None]:
// Number of processors the JVM reports as available.
Runtime.getRuntime().availableProcessors()

In [None]:
// grouped(3) splits the 12 elements into consecutive chunks of 3, handed back as an Iterator.
Vector.range(1, 13).grouped(3)
// Uncomment to print every chunk:
// Vector.range(1, 13).grouped(3).foreach(println)

<br>
# <center>Future - Scala</center>
---
<br>
<pre>
trait Future[+T] extends Awaitable[T]
<br>
object Future {
    def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}
</pre>
<br><br><br>

In [None]:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// trait Future[+T] extends Awaitable[T]

// object Future {
//     def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
// }

// Monoid[Long] is required because 1 + 2 + ... + 1000000 = 500000500000 does not fit
// in an Int; summing as Int silently wraps around to 1784293664.
import cats.instances.long._

// Future.apply hands the body to the implicit ExecutionContext, so the sum is
// computed asynchronously while `f` is returned immediately.
val f = Future(foldMap((1 to 1000000).toVector)(_.toLong))

In [None]:
// Display the Future `f`; what is shown depends on whether it has completed yet.
f

In [None]:
import scala.concurrent.duration._
import scala.concurrent.Await

// Monoid[Long] is required because 1 + 2 + ... + 1000000 = 500000500000 does not fit
// in an Int; summing as Int silently wraps around to 1784293664.
import cats.instances.long._

val f = Future(foldMap((1 to 1000000).toVector)(_.toLong))

// Block the calling thread until `f` completes; a TimeoutException is thrown if
// that takes longer than one second.
Await.result(f, 1.second)

In [None]:
// Future.sequence turns a collection of Futures into one Future of a collection:
//
// def sequence[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(
//     implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]],
//     executor: ExecutionContext): Future[M[A]]

Future.sequence(List(1, 2, 3).map(n => Future(n)))

<br>
# <center>Parallelizing Map-Reduce</center>
---
<br>
<center>def parallelFoldMap\[A, B: Monoid](va: Vector[A])(f: A => B): Future[B]</center>
<br><br><br>

In [None]:
/** Splits `va` into consecutive chunks, aiming for one chunk per available processor.
  *
  * The chunk size is `ceil(va.size / availableProcessors)`, so at most
  * `availableProcessors` chunks are produced and only the last one may be shorter.
  *
  * @param va the vector to split
  * @return an iterator over the chunks in their original order; empty when `va` is empty
  */
def divideVector[A](va: Vector[A]): Iterator[Vector[A]] = {
    val processors = Runtime.getRuntime.availableProcessors
    // `grouped` rejects a non-positive size, which is what an empty vector would
    // produce here, so the chunk size is clamped to at least 1.
    val chunkSize = math.max(1, (1.0 * va.size / processors).ceil.toInt)
    va.grouped(chunkSize)
}

/** Parallel `foldMap`: splits `va` with `divideVector`, runs the sequential
  * `foldMap` on every chunk inside its own `Future`, and combines the per-chunk
  * results once all of them are available.
  *
  * @param va elements to transform and combine
  * @param f  transformation applied to each element
  * @return a `Future` holding the combination of all mapped values
  */
def parallelFoldMap[A, B: Monoid](va: Vector[A])(f: A => B): Future[B] = {
    // One Future per chunk; each one folds its own slice.
    val partials: Iterator[Future[B]] =
        divideVector(va).map(chunk => Future(foldMap(chunk)(f)))
    // Gather the chunk results, then reduce them with the same monoid.
    Future.sequence(partials).map { results =>
        results.foldLeft(Monoid[B].empty)((acc, b) => acc |+| b)
    }
}

In [None]:
// Monoid[Long] is required because 1 + 2 + ... + 1000000 = 500000500000 does not fit
// in an Int; summing as Int silently wraps around to 1784293664.
import cats.instances.long._

// Sum of 1..1000000, accumulated as Long.
Await.result(parallelFoldMap((1 to 1000000).toVector)(_.toLong), 1.second)
// 1000 * (1 + ... + 1000) = 500500000, which still fits in an Int.
Await.result(parallelFoldMap((1 to 1000).toVector)(_ * 1000), 1.second)

<br>
# <center>Parallelizing Map-Reduce with Cats</center>
---
<br>
<center>def parallelFoldMap\[A, B: Monoid](va: Vector[A])(f: A => B): Future[B]</center>
<br><br><br>

In [None]:
import cats.Monoid
import cats.Foldable
import cats.Traverse

import cats.instances.int._
import cats.instances.future._
import cats.instances.vector._

import cats.syntax.semigroup._
import cats.syntax.foldable._
import cats.syntax.traverse._

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

In [None]:
/** Parallel `foldMap` built from cats' `Traverse` and `Foldable` syntax.
  *
  * `va` is split into chunks of `ceil(va.size / availableProcessors)` elements,
  * each chunk is folded with `foldMap(f)` inside its own `Future`, and the chunk
  * results are merged with `combineAll`.
  *
  * @param va elements to transform and combine
  * @param f  transformation applied to each element
  * @return a `Future` holding the combination of all mapped values; for an empty
  *         `va` no chunks are produced and the result is `Monoid[B].empty`
  */
def parallelFoldMap[A, B: Monoid](va: Vector[A])(f: A => B): Future[B] = {
    val processors = Runtime.getRuntime.availableProcessors
    // `grouped` rejects a non-positive size, which is what an empty vector would
    // produce here, so the chunk size is clamped to at least 1.
    val chunkSize = math.max(1, (1.0 * va.size / processors).ceil.toInt)
    va.grouped(chunkSize)
      .toVector
      // `grouped` already yields Vectors, so each chunk can be folded directly.
      .traverse(chunk => Future(chunk.foldMap(f)))
      .map(_.combineAll)
}

// Monoid[Long] is required because 1 + 2 + ... + 1000000 = 500000500000 does not fit
// in an Int; summing as Int silently wraps around to 1784293664.
import cats.instances.long._

// Sum of 1..1000000, accumulated as Long.
Await.result(parallelFoldMap((1 to 1000000).toVector)(_.toLong), 1.second)
// 1000 * (1 + ... + 1000) = 500500000, which still fits in an Int.
Await.result(parallelFoldMap((1 to 1000).toVector)(_ * 1000), 1.second)