Big Data Machine Learning, Distributed Machine Learning, Parallel Machine Learning with C++, CUDA, Scala, Spark
=============

Ron Wu
-------------

11/2/16

Reference: free courses from the creators of
<br>NVIDIA https://developer.nvidia.com/udacity-cs344-intro-parallel-programming<br>
Scala http://www.scala-lang.org/blog/2016/05/23/scala-moocs-specialization-launched.html<br>
Spark, Databricks https://databricks.com/blog/2016/06/01/databricks-to-launch-first-of-five-free-big-data-courses-on-apache-spark.html <br> 
### Contents


1. <a href =#scala>Functional and Object-Oriented Scala = Java + Clojure</a>
     - <a href=#scalab>Scala Foundation</a>
     
         - <a href =#fun>First Class Functions</a>
         - <a href=#cl>Classes and the Three Pillars</a>  
         - <a href=#obj>Objects Everywhere</a> 
         - <a href =#collect>Collections</a> 
     - <a href = #funDe>Functional Design in Scala</a> 
     
         - <a href=#for>For Expression</a>
         - <a href=#stream>Stream & Lazy Evaluation</a> 
         - <a href=#event>Event Handling</a>
         - <a href=#ober>Observer Pattern</a> 
         
     - <a href = #pppro>Parallel Programming in Scala</a> 
         - <a href =#thread>Parallel Threads</a>
         - <a href =#par>Parallel Algorithms</a>
         - <a href =#dataStr>Parallel Data Structures</a>
         - <a href =#akka>Concurrency & Actors Modes</a>
<br><br>
2. <a href=#Spark>Big Data, Distributed Analysis with Spark</a>
     - <a href = #scalaspark>Spark Streaming</a>  
         - <a href = #scalaspark>Twitter Sentiment Analysis in Real Time</a>
     - <a href =#ssql>Spark SQL</a> 
         - <a href =#ssql>Page Ranking Application</a>
     - <a href =#sml>Spark MLlib</a>  
         - <a href =#sml>Movie Recommendation System</a>
         - <a href=#otherML>Other Big Data Machine Learning Algorithms & Statistics</a>
     - <a href =#Amazon>The Clouds - AWS Ecosystem</a> 
         - <a href =#Amazon>AWS Ecosystem</a> 
         - <a href =#google>Google Clouds ML With TensorFlow</a> 
<br><br>
3. <a href =#cuda>Parallel with CUDA, C++</a>
     - <a href=#worked>Parallel Computing</a> 
     - <a href=#worked>GPU Programming</a> 
<br><br>
 

<a name = 'scala'></a>
# Functional and Object-Oriented Scala = Java + Clojure

![](https://www.scientiamobile.com/images/icons/scala.gif)


<a name = 'scalab'></a>
## Scala Foundation


The code in this section is mostly copied from Martin Odersky's course on Coursera https://www.coursera.org/learn/progfun1/

To ease the transition to parallel programming, throughout this notebook I will prefer the functional style over the imperative style whenever possible. In pure functional programming values are immutable: once set, they cannot be changed. With no shared mutable state there is nothing for threads to race on, which removes much of the locking that leads to deadlocks.

A loop and a recursion are two ways of writing the same computation: the iteration and any change of variables inside the loop can be passed along through the arguments of function calls. The Scala compiler translates `for` loops into calls of higher-order methods such as `foreach` and `map`, and compiles tail-recursive functions back into loops, so the functional style need not cost anything at run time.
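
To make the relation between a loop and a recursion concrete, here is a minimal sketch. The names `sumLoop` and `sumRec` are made up for this illustration and do not come from the course.

```scala
import scala.annotation.tailrec

// Imperative version: a mutable accumulator updated inside a loop.
def sumLoop(n: Int): Int = {
  var acc = 0
  var i = 1
  while (i <= n) { acc += i; i += 1 }
  acc
}

// Functional version: the loop state (i, acc) is threaded through the arguments.
// @tailrec makes the compiler reject the method if it cannot be compiled to a loop.
def sumRec(n: Int): Int = {
  @tailrec
  def go(i: Int, acc: Int): Int =
    if (i > n) acc else go(i + 1, acc + i)
  go(1, 0)
}

println(sumLoop(100))  // 5050
println(sumRec(100))   // 5050
```

Both versions use constant stack space; the second one simply has no variable that is ever reassigned.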

<a name='fun'></a>
### First Class Functions
 

In [1]:
// This is fine: the body of a def is evaluated only when it is used, not when it is defined
def loop: Int = loop 

// Do not run: a val is evaluated at its definition, so this would loop forever
//val l = loop
    
def square(x: Int) = x*x

def squareFirstEle(x: Int, y: => Int) = square(x)
// => makes y call-by-name, so passing in loop is okay
squareFirstEle(2,loop)

4
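
The difference between by-value and by-name parameters can be observed by counting evaluations. This is a small sketch; the counter `evaluations` and the helper names are hypothetical, and the mutable `var` is used only to make the evaluation order visible.

```scala
var evaluations = 0
def tick(): Int = { evaluations += 1; 42 }

def byValue(x: Int): Int = x + x       // argument evaluated once, before the call
def byName(x: => Int): Int = x + x     // argument re-evaluated at every use
def unused(x: => Int): Int = 0         // argument never evaluated

evaluations = 0; byValue(tick()); println(evaluations)  // 1
evaluations = 0; byName(tick());  println(evaluations)  // 2
evaluations = 0; unused(tick());  println(evaluations)  // 0
```

The last line is why `squareFirstEle(2, loop)` terminates: the by-name argument is never touched.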

In [2]:
// Newton's method (a fixed-point iteration) for finding a square root

def sqrt(x: Double) = {
        
    def sqrtIter(guess: Double): Double =
        if (math.abs(guess*guess-x) / x < 0.01) guess
        else sqrtIter((guess + x / guess) / 2) 

    sqrtIter(1.0)
}

sqrt(4.5)

2.1224976448422046

In [3]:
//  Higher-order function  

def sum_v1(f: Int => Int, a: Int, b: Int): Int = {
    if (a > b) 0
    else sum_v1(f, a+1, b) + f(a)
}


// find the sum of the squares from 1 to 5 
sum_v1(square, 1, 5)

55

In [4]:
// rewrite the above as tail recursion, so that it does not build up the stack
    
def sum_v2(f: Int => Int, a: Int, b: Int): Int = {
    def loop(a: Int, acc: Int): Int = {
        if (a > b) acc
        else loop(a+1, acc + f(a))
    }
    loop(a, 0)
}

sum_v2(square, 1, 5)

55

In [5]:
// sum_v3 is curried: it takes f and returns a function of the bounds (a, b)
    
def sum_v3(f: Int => Int): (Int, Int) => Int =  {
        def sumF(a: Int, b: Int): Int = { 
            if (a>b) 0
            else sumF(a+1,b) + f(a)
        }
        sumF
}

// passing in an anonymous function
sum_v3(x => x*x)(1, 5) 

55
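
Scala also has a shorter notation for functions that return functions: multiple parameter lists. The sketch below restates `sum_v3` in that form; the names `sum` and `sumCubes` are chosen here for illustration.

```scala
// Multiple parameter lists: sum(f) is itself a function (Int, Int) => Int.
def sum(f: Int => Int)(a: Int, b: Int): Int =
  if (a > b) 0 else f(a) + sum(f)(a + 1, b)

// Partial application: fix f, leave the bounds open.
val sumCubes: (Int, Int) => Int = sum(x => x * x * x)

println(sum(x => x * x)(1, 5))  // 55
println(sumCubes(1, 3))         // 36
```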

<a name='cl'></a>
### Classes and the Three Pillars 

In [6]:
//
// Encapsulation
//

class Rational(x: Int, y: Int)  {
    require( y != 0 , "denominator not to be zero")

    // auxiliary constructor
    def this(x: Int) = this(x, 1)

    // Euclid's algorithm
    private def gcd(a:Int, b:Int) : Int =
        if (b==0) a
        else gcd(b, a%b)
    private val g = math.abs(gcd(x, y))

    val numer = x / g
    val denom = y / g


    override def toString =
        if (denom != 1) this.numer + "/" + this.denom   // test the reduced denominator, not the raw y
        else (this.numer).toString

    // + is not a legal identifier in most languages, but it is in Scala
    def + (that: Rational) =
        new Rational(this.numer*that.denom+this.denom*that.numer, this.denom*that.denom)

    def unary_- = new Rational(-numer, denom)

}

val r = new Rational(1,2)
println("r rational: " + r.toString)


val s = new Rational(3,4)

// prefix minus on an object is defined by the method unary_-
println("negative of r rational: "+(-r).toString)

//this is same as (r.+(s)).toString
println("sum of rationals r & s: " + (r + s).toString)

val t = new Rational(2)
println("integer t: "+t.toString)

r rational: 1/2
negative of r rational: -1/2
sum of rationals r & s: 5/4
integer t: 2


In [7]:
//
//  inheritance
//

abstract class Node{
    def insert(x: Int) : Node
    def search(x: Int) : Boolean
}

// Like Java, Scala allows only one superclass. For multiple inheritance, mix in traits using `with`

// the primary constructor takes 3 arguments
class NonEmpty(elem: Int, left: Node, right: Node)  extends Node{


    def insert(x: Int) : Node = {
        if ( x < elem ) new NonEmpty(elem, left insert x , right)
        else if (x > elem) new NonEmpty(elem, left, right insert x)
        else this
    }

    def search(x: Int) : Boolean = {
        if ( x < elem ) left search x
        else if ( x > elem ) right search x
        else true
    }
    
    // left and right are Nodes, so string concatenation calls their overridden toString (Empty prints "-")
    override def toString = "{" + left + elem + right + "}"
}    

class Empty extends Node{

    def insert(x: Int) : Node = new NonEmpty(x, new Empty, new Empty)
    def search(x: Int) : Boolean = false
    override def toString = "-"
}

val tree1 = new NonEmpty(3, new Empty, new  Empty)

// a copy of tree1 with 4 inserted
val tree2 = tree1 insert 4  

println(tree1.toString)
println(tree2.toString)

{-3-}
{-3{-4-}}


In [8]:
//
//  Polymorphism
//

import java.util.NoSuchElementException 

//type parameter
trait Node[T]{
    def isEmpty: Boolean
    def data: T
    def pt: Node[T]
}


// the primary constructor takes 2 arguments;
// marking them val also defines the 2 corresponding fields of the class,
// which implement the abstract members data and pt
class Cons[T](val data: T, val pt: Node[T]) extends Node[T]{
    def isEmpty = false
}

class Nil[T] extends Node[T]{
    def isEmpty = true
    def data : Nothing = throw new NoSuchElementException("Nil.data")
    def pt: Nothing = throw new NoSuchElementException("Nil.pt")
}

def singleton[T](elem:T) = new Cons[T](elem, new Nil[T])

println(singleton[Double](1.1).data)


// this will throw an exception
// println(singleton[Double](1.1).pt.data)


// the type parameters are inferred, so writing them out is redundant
val ll = new Cons(3, new Cons(2, new Cons(1, new Nil)))

println(ll.data)
println(ll.pt.data)
println(ll.pt.pt.data) 

1.1
3
2
1


<a name ='ooscala'></a> 
## Object-Oriented Aspect of Scala


<a name ='obj'></a>
### Objects Everywhere

In [9]:
/*
    this cell can only be run once, because it defines a singleton object
*/

// This shows that natural numbers (unsigned Int, a primitive type) can be represented as class objects.
// It is the same idea as in set theory, where every natural number is constructed
// starting from the empty set

abstract class Nat {
    def isZero: Boolean
    def predecessor: Nat
    def successor: Nat
    def + (that: Nat): Nat
    def - (that: Nat): Nat 
    override def toString: String
}


class Succ(n: Nat) extends Nat{
    def isZero = false
    def predecessor = n
    def successor = new Succ(this)
    def +(that: Nat) = new Succ(n + that)
    def -(that: Nat) = if (that.isZero) this else n - that.predecessor

    override def toString: String = "I" + n
}

// Zero is a singleton object, hence unique
object Zero extends Nat{
    def isZero = true
    def predecessor = throw new Error("0 has no pred")
    def successor = new Succ(this)
    def +(that: Nat) = that
    def -(that: Nat) = if (that.isZero) this else throw new Error("negative number")

    override def toString: String = ""
}


def NextNat = new Succ(Zero)
val one = NextNat
val two = one.successor
val three = two.successor
println(three)
println(three-one)

III
II


In [10]:
//
// Functions as objects too
//

// a function of 1 argument, with type parameters A (argument) and B (result)
trait Function1[A,B]{
    def apply(x : A) : B
}


def f =  {
    // the anonymous function
    //     (x: Int) => x*x
    // expands to the following; the name AnonFun exists only inside this block
    class AnonFun extends Function1[Int, Int] {
        def apply(x: Int) = x*x
    }
    new AnonFun
}

f(2)

4
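
The same expansion can be seen with the standard library's own `scala.Function1`, without defining a trait by hand. A minimal sketch; the value names are made up for the example.

```scala
// An anonymous function literal ...
val squareLit: Int => Int = x => x * x

// ... is shorthand for an instance of the standard trait scala.Function1.
val squareObj: scala.Function1[Int, Int] = new scala.Function1[Int, Int] {
  def apply(x: Int): Int = x * x
}

// f(2) is sugar for f.apply(2)
println(squareLit(2))        // 4
println(squareObj.apply(2))  // 4

// Being objects, functions have methods of their own
val plusOneThenSquare = squareObj.compose((x: Int) => x + 1)
println(plusOneThenSquare(2))  // (2 + 1) * (2 + 1) = 9
```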

#### Covariance, Invariance and Contravariance


In [11]:

/*

Write "type A is derived from (is a subtype of) type B" as

    A < B

Let f be a type transformation, e.g. A -> Array[A], or for a generic class A -> List<A>.

Covariance

    A < B => f(A) < f(B)

Contravariance

    A < B => f(A) > f(B)

Invariance

    Neither of the above.

In Java, arrays are covariant and generics are invariant.

In Scala, both Array and generic classes are invariant by default.

In Java, wildcards make a generic type co- or contravariant at the use site.


That is, in Java

    B[] array_b = new A[1];

is allowed, but

    ArrayList<B> list_b = new ArrayList<A>();

is not. array_b really is an A[] viewed as a B[], and this can cause runtime errors.


For plain references, both assignments are fine:

    B b = new A();

    b = new B();


But if you put them in an array

    B[] array_b = new A[1];

    array_b[0] = new B();     // this passes the compiler but throws ArrayStoreException at runtime

    // what one should do is

    array_b[0] = new A();

    // or, with an explicit (redundant) upcast

    array_b[0] = (B) new A();


Wildcards let the compiler catch the problem

    ArrayList<? extends B> list_b = new ArrayList<A>();

    ArrayList<? super A> list_a = new ArrayList<B>();


Now the compiler knows that

    the elements of list_b belong to some subclass of B, hence any method of B

can be safely invoked on elements read from list_b, and that

    the element type of list_a is some superclass of A, hence an A (or any subclass of A)

can be safely added to list_a.


So unlike before

    list_b.add(new B()); // rejected by the compiler; in fact nothing except null can be added

    // the legitimate way is to copy

    // first create something
    ArrayList<A> someList = new ArrayList<A>();
    someList.add(new A());

    // then
    ArrayList<? extends B> list_b = new ArrayList<A>(someList);
    list_b.get(0); // statically typed B; the object itself is an A

    // For the super list
    ArrayList<? super A> list_a = new ArrayList<B>();
    list_a.add(new A());

*/

// In Scala, annotate the type parameter with + (covariant) or - (contravariant); otherwise it is invariant
    

class Covariance_List[+T]{}
class Controvariace_List[-T]{} 

class B {}
class A extends B {} 

object main extends App{
    val list_B : Covariance_List[B] = new Covariance_List[A] 
    val list_A : Controvariace_List[A] = new Controvariace_List[B]  
}
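
A slightly more concrete sketch of the same idea, with members that actually produce or consume `T`. The classes `Animal`, `Cat`, `Box` and `Printer` are hypothetical and exist only for this example.

```scala
class Animal { def name: String = "animal" }
class Cat extends Animal { override def name: String = "cat" }

// Covariant container: it only produces T, so a Box[Cat] can stand in for a Box[Animal].
class Box[+T](val content: T)

// Contravariant consumer: it only accepts T, so a Printer[Animal] can stand in for a Printer[Cat].
trait Printer[-T] { def show(x: T): String }

val animalBox: Box[Animal] = new Box[Cat](new Cat)
val animalPrinter: Printer[Animal] = new Printer[Animal] {
  def show(x: Animal): String = "a " + x.name
}
val catPrinter: Printer[Cat] = animalPrinter

println(animalBox.content.name)    // cat
println(catPrinter.show(new Cat))  // a cat

// Functions follow the same rule: Function1[-A, +B]
val describe: Animal => String = a => a.name.toUpperCase
val describeCat: Cat => String = describe
println(describeCat(new Cat))      // CAT
```

The rule of thumb this illustrates: a type parameter that only appears in results can be `+T`, one that only appears in arguments can be `-T`.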



To use covariance, invariance and contravariance correctly, one needs to know the type hierarchy.<br>
Ref http://docs.scala-lang.org/tutorials/tour/unified-types.html
![](http://docs.scala-lang.org/resources/images/classhierarchy.img_assist_custom.png)

<a name ='collect'></a>
## Collections

Immutable Collection
![](http://docs.scala-lang.org/resources/images/collections.immutable.png)
<br>
Mutable Collection
![](http://docs.scala-lang.org/resources/images/collections.mutable.png)

In [12]:
//
// List, Immutable
//

val list1 = List(1,2,3)
val list2 = List(4,5,6) 
println(list1.length)
println(list1.take(2))
println(list1(0))

// same as list1 ++ list2
println(list1 ::: list2)

// :: prepends an element. Operators ending in ':' are methods of their right operand,
// so x :: xs is xs.::(x); here the whole of list2 is prepended to list1 as a single element
println(list1.::( list2) )

3
List(1, 2)
1
List(1, 2, 3, 4, 5, 6)
List(List(4, 5, 6), 1, 2, 3)
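
The right-associativity of operators ending in `:` is easy to get wrong, so here is a short sketch of the common cases. The value names are arbitrary.

```scala
val xs = List(1, 2, 3)

// Operators ending in ':' are methods of the right operand and associate to the right
val a = 0 :: xs             // same as xs.::(0)
val b = 1 :: 2 :: 3 :: Nil  // same as Nil.::(3).::(2).::(1)
val c = xs :+ 4             // append; takes time proportional to the length of xs
val d = xs ::: List(4, 5)   // same as List(4, 5).:::(xs)

println(a)  // List(0, 1, 2, 3)
println(b)  // List(1, 2, 3)
println(c)  // List(1, 2, 3, 4)
println(d)  // List(1, 2, 3, 4, 5)
```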


In [13]:
//
// implement concatenation

def concat[T](xs: List[T], ys: List[T]): List[T] = xs match {
    case List() => ys
    case z :: zs => z :: concat(zs, ys)    
}

concat(list1, list2)

List(1, 2, 3, 4, 5, 6)

In [14]:
// implement removeAt

def removeAt[T](n: Int, x: List[T]) = {
    (x take n) ::: (x drop n+1)
}

removeAt(1, list1)

List(1, 3)

In [15]:
// insertion sort


def isort(xs: List[Int]): List[Int] = xs match {
    case List() => List()
    case y :: ys => {
        def insert(x1: Int, xs1: List[Int]): List[Int] = xs1 match {
            case List() => List(x1)
            case y1 :: ys1 => if (x1 < y1) x1 :: xs1 else y1 :: insert(x1, ys1)
        }
        insert(y, isort(ys))
    }
}

isort(List(5, 30, 10, 1, 15))

List(1, 5, 10, 15, 30)

In [16]:
//
// merge sort
//
 
def msort(xs: List[Int]) : List[Int] = {

    val n = xs.length / 2
    if (n==0) xs
    else {
        def merge(xs: List[Int], ys: List[Int]): List[Int] = (xs, ys) match {
            case (Nil, ys) => ys
            case (xs, Nil) => xs
            case (x :: xs1, y:: ys1) => { 
                
                if (x < y) x :: merge(xs1, ys)
                else y :: merge(xs, ys1)
            }
             
        } 
        val (fst , snd ) = xs splitAt n
        merge(msort(fst), msort(snd))
    }
}

msort(List(5, 30, 10, 1, 15))



List(1, 5, 10, 15, 30)

In [17]:
// merge sort with type parameter, user-defined comparison


def msort[T](xs: List[T])(lt: (T,T) => Boolean) : List[T] = {

        val n = xs.length / 2
        if (n==0) xs
        else {
            def merge(xs: List[T], ys: List[T]): List[T] = (xs, ys) match {
                case (Nil, _) => ys
                case (_, Nil) => xs
                case (x :: xs1, y:: ys1) => {

                    if (lt(x,y)) x :: merge(xs1, ys)
                    else y :: merge(xs, ys1)
                }

            }

            val (fst , snd ) = xs splitAt n
            merge(msort(fst)(lt), msort(snd)(lt))
        }
    }


msort(List(5, 30, 10, 1, 15))((x, y) => x < y)

List(1, 5, 10, 15, 30)

In [18]:
// merge sort with type parameter, default ordering comparison
// adding implicit and ask compiler to fill in the missing part

import math.Ordering 

def msort[T](xs: List[T])(implicit ord: Ordering[T]) : List[T] = {

    val n = xs.length / 2
    if (n==0) xs
    else {
        def merge(xs: List[T], ys: List[T]) : List[T] = (xs, ys) match {
            case (Nil, _) => ys
            case (_, Nil) => xs
            case (x :: xs1, y:: ys1) => {

                if (ord.lt(x,y)) x :: merge(xs1, ys)
                else y :: merge(xs, ys1)
            }

        }

        val (fst , snd ) = xs splitAt n
        merge(msort(fst), msort(snd)) 
        //merge(msort(fst)(ord), msort(snd)(ord)) 
    }
}


//without implicit, we need to
//msort(List(5, 30, 10, 1, 15))(Ordering.Int) 

msort(List(5, 30, 10, 1, 15))

List(1, 5, 10, 15, 30)
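
The implicit parameter is what lets the caller swap in a different ordering when needed. The sketch below assumes a hypothetical `Person` class and a small helper `smallest` written in the same style as `msort`; neither is part of the course code.

```scala
import scala.math.Ordering

case class Person(name: String, age: Int)

// A hypothetical helper with an implicit Ordering parameter, in the style of msort
def smallest[T](xs: List[T])(implicit ord: Ordering[T]): T =
  xs.reduceLeft((a, b) => if (ord.lt(b, a)) b else a)

val people = List(Person("Ann", 31), Person("Bob", 25), Person("Cy", 40))

// No Ordering[Person] exists by default, so we supply one explicitly ...
val byAge: Ordering[Person] = Ordering.by((p: Person) => p.age)
println(smallest(people)(byAge).name)          // Bob
println(smallest(people)(byAge.reverse).name)  // Cy

// ... or put one in implicit scope and let the compiler find it
implicit val byName: Ordering[Person] = Ordering.by((p: Person) => p.name)
println(smallest(people).name)                 // Ann
println(smallest(List(5, 30, 10, 1, 15)))      // 1
```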

In [19]:
// Functions on lists: map, filter, ... (called transformers)
// Later we will talk about transformations vs actions in Spark
// Using them wisely is important, because we do not want to write explicit loops.


def rescale(xs: List[Int], factor :Int) = {
    xs map (x => x * factor)   // List.map runs iteratively inside the library, so long lists do not overflow the stack
}

val l = List(1, 3, 5, 2, 4)
println(rescale(l, 5))

// or

println(l map (x => x * 3)) 

println(l filter (x => x < 3))
println(l filterNot (x => x < 3))
println(l partition (x => x <3))  // partition returns (filter, filterNot) in one pass
  

println(l takeWhile (x => x< 3))  // the longest prefix whose elements satisfy the predicate
println(l dropWhile (x => x< 3)) 
println(l span (x => x< 3))       // span returns (takeWhile, dropWhile)

List(5, 15, 25, 10, 20)
List(3, 9, 15, 6, 12)
List(1, 2)
List(3, 5, 4)
(List(1, 2),List(3, 5, 4))
List(1)
List(3, 5, 2, 4)
(List(1),List(3, 5, 2, 4))


In [20]:
// implement encode. No loops

def encode [T](xs: List[T]) = {

        def pack(ls: List[T]) : List[List[T]] =  ls match{

            case List() => List()
            case x :: xs1 => {
                val (fst, scn) = ls span (y => y == x)
                fst :: pack(scn)
            }
        }
        pack(xs) map ( x => (x.head, x.length))
    }

 

val data = List("a", "a", "a", "b", "b", "c", "a")

encode(data)

List((a,3), (b,2), (c,1), (a,1))

In [21]:
// reduceLeft combines the elements from left to right,
// so the call below computes the following (it is implemented iteratively, via foldLeft)
//      
//     (( 1 + 2 ) + 3 ) +4
//
List(1,2,3,4).reduceLeft( _ + _ ) //  _ + _  is the same as (x, y) => x + y 

10

In [22]:
// reduceRight combines the elements from right to left,
// so below is the same as
//      
//    1 + ( 2 + ( 3 + 4 ) )
//
List(1,2,3,4).reduceRight( _ + _ )

10

In [23]:
// foldLeft

val z = 0

println((List(1,2,3,4) foldLeft z )( _ + _ )) 

// foldLeft uses z as the initial accumulator; z itself is a val and is not modified

println(z)

10
0
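
With addition the direction of the fold is invisible. A sketch with a non-commutative operator, plus a typical use of `foldLeft` with a list as accumulator (the function name `reverse` is chosen here for illustration):

```scala
val xs = List(1, 2, 3, 4)

// With a non-commutative operator the direction matters
println(xs.foldLeft(0)(_ - _))   // (((0 - 1) - 2) - 3) - 4 = -10
println(xs.foldRight(0)(_ - _))  // 1 - (2 - (3 - (4 - 0))) = -2

// foldLeft with a list accumulator: reverse in one pass
def reverse[T](xs: List[T]): List[T] =
  xs.foldLeft(List.empty[T])((acc, x) => x :: acc)

println(reverse(xs))             // List(4, 3, 2, 1)
```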


In [24]:

// List has sorted 

val fruit = List("apple","pineapple", "pear", "banana")

println(fruit sortWith(_.length < _.length))
println(fruit sorted)

List(pear, apple, banana, pineapple)
List(apple, banana, pear, pineapple)


In [25]:
// iterable
//   sequence
//     List -> linked list
//     Vectors -> very shallow tree, node has 32 children 
//     array
//     string
//     range
//   set 
//   map
()

In [26]:
// example dot product

val v1 = Vector(1,2,3,4,5)
val v2 = Vector(1,2,3,4,5)

def dotProd(v1: Vector[Int], v2: Vector[Int]) = {
    (v1 zip v2).map(xy => xy._1 * xy._2).sum 
}

dotProd(v1, v2)

55

In [27]:
// range is seq

def isPrime(n: Int)  = (2 until n) forall (d => n%d != 0)

isPrime(19)

true

In [28]:

// double loops in scala

((1 until 4) map (i => (1 until 4 ) map (j => (i,j)))).flatten

Vector((1,1), (1,2), (1,3), (2,1), (2,2), (2,3), (3,1), (3,2), (3,3))

In [29]:
// or 

(1 until 4) flatMap (i => (1 until 4 ) map (j => (i,j)))

Vector((1,1), (1,2), (1,3), (2,1), (2,2), (2,3), (3,1), (3,2), (3,3))

In [30]:
// a simpler way is to use a for-expression

// find pairs summing to a prime
 
for {                 // curly braces allow one generator per line without semicolons
    i <- 1 until 4    //generator
    j <- 1 until i    
    if isPrime(i+j)   //filter i, j 
} yield (i,j)

Vector((2,1), (3,2))

In [31]:
// same as above, but much more convoluted

                                        //  the case is pattern matching, 
                                        //   the full expression is  filter (e=>e match { case ...
(1 until 4) flatMap (i => (1 until i ) map (j => (i,j))) filter ( {case (x,y)=> isPrime(x+y)} )


Vector((2,1), (3,2))
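
For reference, the translation the compiler performs is close to the following sketch: `flatMap` for every generator but the last, `withFilter` for the guard, and `map` for the last generator. The `isPrime` here is redefined so the block is self-contained, with an extra `n > 1` check that the version above does not have.

```scala
def isPrime(n: Int): Boolean = n > 1 && (2 until n).forall(d => n % d != 0)

val viaFor = for {
  i <- 1 until 4
  j <- 1 until i
  if isPrime(i + j)
} yield (i, j)

// Roughly what the compiler generates: the guard is attached to the
// generator it follows, and is evaluated lazily by withFilter
val viaCalls =
  (1 until 4).flatMap(i =>
    (1 until i).withFilter(j => isPrime(i + j)).map(j => (i, j)))

println(viaFor == viaCalls)  // true
```

Unlike `filter`, `withFilter` does not build an intermediate collection, which is why the compiler prefers it.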

In [32]:
// rewrite the dot product using a for-expression

val v1 = Vector(1,2,3,4,5)
val v2 = Vector(1,2,3,4,5)

def dotProd(v1: Vector[Int], v2: Vector[Int]) = {
    (for ( (i,j) <- (v1 zip v2)) yield i*j).sum
}

dotProd(v1, v2)

55

In [33]:
 
//
// Set: not a Seq; unordered, no duplicates
//

// 8-queens

// the following code is copied from Martin Odersky's course
// Compare it to my code http://nbviewer.jupyter.org/github/ronnnwu/codetheInterviewExercises/blob/master/Ch8.ipynb
// problem 8.12 

// Scala's immutable List makes the recursion very natural, and the higher-order list functions
// and for-expressions help too

// In Python I had to make copies and write loops over loops. Scala is faster too.

def queens(n: Int) : Set[List[Int]] ={ 
    
    //place queen at row k
    def placeQueens(k: Int) : Set[List[Int]] = {
        if (k==0) Set(List())
        else {
            for {
                queens <- placeQueens(k-1)
                col <- 0 until n
                if isSafe(col, queens)

            } yield col :: queens  
            //output e.g. List(1,5,3) means queens at row 0 col 3, row 1 col 5, row 2 col 1 
            //hence as k increases it appends queens to the front
        }
    }
    
    def isSafe(col: Int, queens: List[Int]): Boolean = {
        val row  = queens.length
        // pair each placed queen with its row; queens lists the most recent row first
        val queensWithRow = (row - 1 to 0 by -1) zip queens
        queensWithRow forall {
            case (r,c) => col != c && math.abs(col-c) != row - r // check no col used, no diagonal match
        }
    }
    
    placeQueens(n)
}

 
println(queens(8).size) 

def prettyShow(queens: List[Int]) = {
    val lines = {
        for (col <- queens.reverse)
            yield Vector.fill(queens.length)("* ").updated(col, "X ").mkString

    }
    print("\n\n" + (lines mkString "\n"))
}

queens(8) take 3 map prettyShow

92


* * * * * X * * 
* * * X * * * * 
* X * * * * * * 
* * * * * * * X 
* * * * X * * * 
* * * * * * X * 
X * * * * * * * 
* * X * * * * * 

* * * * X * * * 
* * * * * * X * 
* X * * * * * * 
* * * X * * * * 
* * * * * * * X 
X * * * * * * * 
* * X * * * * * 
* * * * * X * * 

* * * * * X * * 
* * X * * * * * 
* * * * * * X * 
* * * X * * * * 
X * * * * * * * 
* * * * * * * X 
* X * * * * * * 
* * * * X * * * 

Set(())

In [34]:
//
// Map
//

val Capital = Map("US"->"DC", "France"->"Paris")

println(Capital get "US")

println(Capital + ("US"->"NYC")) 

// + on a Map replaces the value of an existing key, which is the same as

println(Capital ++ Map("US"->"NYC")) 

Some(DC)
Map(US -> NYC, France -> Paris)
Map(US -> NYC, France -> Paris)
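
Note that `get` returned `Some(DC)` rather than `DC`. A short sketch of the usual ways to deal with the `Option` result; the helper `show` and the key `"Spain"` are made up for the example.

```scala
val capital = Map("US" -> "DC", "France" -> "Paris")

// get returns an Option, so a missing key is a value, not an exception
println(capital.get("US"))     // Some(DC)
println(capital.get("Spain"))  // None

// Pattern matching decomposes the Option
def show(country: String): String = capital.get(country) match {
  case Some(city) => city
  case None       => "unknown"
}
println(show("France"))                         // Paris
println(show("Spain"))                          // unknown
println(capital.getOrElse("Spain", "unknown"))  // unknown

// withDefaultValue turns the map into a total function
val total = capital.withDefaultValue("unknown")
println(total("Spain"))                         // unknown
```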


In [35]:

// List has groupBy
// returns a Map

val fruit = List("apple","pineapple", "orange", "banana")

fruit groupBy(_.length)
 

Map(5 -> List(apple), 9 -> List(pineapple), 6 -> List(orange, banana))

In [36]:
// example polynomial class

                              // deg, coeff
class Poly (val terms0: Map[Int, Double])  {
    
    val terms = terms0 withDefaultValue 0.0  //this allows terms(deg) in addTerm to return 0
    
    def + (other: Poly) = new Poly((other.terms foldLeft terms)(addTerm))
    
    def addTerm(terms: Map[Int,Double], term: (Int, Double)) = {
        val (deg, coeff) = term
        terms + (deg -> (coeff + terms(deg))) //use terms as initial accumulator and update its value
    }
    override def toString =
        (for ( (deg, coeff) <- terms.toList.sorted.reverse )
            yield coeff + " * x ^ " + deg) mkString(" + ")
}

val p1 = new Poly(Map(1->1.1, 2->2.2))
val p2 = new Poly(Map(1->1.1, 3->3.3))

p1 + p2

3.3 * x ^ 3 + 2.2 * x ^ 2 + 2.2 * x ^ 1

In [37]:
// an improved version

class Poly (val terms0: Map[Int, Double])  {
    
    // an auxiliary constructor takes a sequence of (Int, Double) pairs of unspecified length
    // and passes them to the primary constructor
    def this(bindings: (Int, Double)*) = this(bindings.toMap)
    
    val terms = terms0 withDefaultValue 0.0

    def + (other: Poly) = new Poly((other.terms foldLeft terms)(addTerm))
    def addTerm(terms: Map[Int,Double], term: (Int, Double)) = {
        val (deg, coeff) = term
        terms + (deg -> (coeff + terms(deg)))
    }
    override def toString =
        (for ( (deg, coeff) <- terms.toList.sorted.reverse )
            yield coeff + "*x^" + deg) mkString("+")
}

val p1 = new Poly(1->1.1, 2->2.2)
val p2 = new Poly(1->1.1, 3->3.3)

p1 + p2
 

3.3*x^3+2.2*x^2+2.2*x^1

In [38]:
// example: translating a phone number into phrases made of words

import scala.io.Source

val in = Source.fromURL("http://lamp.epfl.ch/files/content/sites/lamp/files/teaching/progfun/linuxwords.txt")

//get common English words, drop hyphenated words, symbols etc
val words = in.getLines.toList filter (word => word forall (chr => chr.isLetter)) 
println(words.length)   //  45382 words

// telephone keypad
val mnem = Map('2' -> "ABC", '3' -> "DEF", '4' -> "GHI",'5' -> "JKL",
    '6' -> "MNO", '7' -> "PQRS", '8' -> "TUV", '9' -> "WXYZ")

// reverse Map of mnem
val charCode: Map[Char, Char] =  for ((digit, str) <- mnem; ltr <- str) yield (ltr -> digit)

// groupBy returns a map from each digit string to those of the 45382 words that encode to it
val wordsForNum: Map[String, List[String]] = {
    
    def wordCode(word: String) =  word.toUpperCase map charCode
    words groupBy wordCode withDefaultValue List()  //not found default is an empty List()
}

def encode(number: String): Set[List[String]] = {
    if (number.isEmpty) Set(List())
    else {
        for {
            split <- 1 to number.length
            word <- wordsForNum(number take split)  
            // if wordsForNum returns an empty List, this generator yields nothing
            // and that split is skipped
            
            rest <- encode(number drop split) // each rest is an element of the Set, hence a List
        } yield word :: rest // prepend the word to the list of remaining words
    }.toSet
}


def translate(number: String): Set[String] =
    encode(number) map ( _ mkString " " )

translate("7225247386")

45382


Set(sack air fun, pack ah re to, pack bird to, Scala ire to, Scala is fun, rack ah re to, pack air fun, sack bird to, rack bird to, sack ah re to, rack air fun)

<a name = 'funDe'></a>
## Functional Design in Scala

The code in this section is mostly copied from Martin Odersky's course on Coursera https://www.coursera.org/learn/progfun2/


<a name='for'></a>
### For Expression

In [39]:
// Scala with JSON

/*
    run this cell only once 
*/

abstract class JSON
case class JSeq(elem: List[JSON]) extends JSON
case class JObj(bindings: Map[String, JSON]) extends JSON
case class JNum(num: Int) extends JSON
case class JStr(str: String) extends JSON
case class JBool(b: Boolean) extends JSON
case object JNull extends JSON

val data = JObj(Map(
    "firstName" -> JStr("John"),
    "lastName"->JStr("Smith"),
    "address"->JObj(Map(
        "streeAddress"->JStr("21 2nd Street"),
        "state"->JStr("NY"),
        "postslCode"->JNum(10021)
    )),
    "phoneNumer"->JSeq(List(
        JObj(Map(
            "type"->JStr("home"),
            "number"->JStr("212 344 1345")
        )),
        JObj(Map(
            "type"->JStr("cell"),
            "number"->JStr("212 542 1345")
        ))
    ))
))

def prettyJSON(json: JSON) : String = json match {
    case JSeq(elem) =>
        "[" + (elem map prettyJSON mkString ", ") + "]"
    case JObj(bindings) =>
        val assoc = bindings map {
            case (key, value) => "\"" + key + "\": " + prettyJSON(value)
        }
        "{" + (assoc mkString ", ") + "}"
    case JNum(num) => num.toString
    case JStr(str) => "\"" + str + "\""
    case JBool(b) => b.toString
    case JNull => "null"
}

prettyJSON(data)


{"firstName": "John", "lastName": "Smith", "address": {"streeAddress": "21 2nd Street", "state": "NY", "postslCode": 10021}, "phoneNumer": [{"type": "home", "number": "212 344 1345"}, {"type": "cell", "number": "212 542 1345"}]}

In [40]:
// use a for-expression as a query over JSON
// same idea as LINQ in C#

val data_collect = List(data) // suppose we have more than one piece of data

    for {
        JObj(bindings) <- data_collect
        JSeq(phones) = bindings("phoneNumer")
        JObj(phone) <- phones
        JStr(digits) = phone("number")
        if digits contains "344"
    } yield (bindings("firstName"), bindings("lastName"), digits)


List((JStr(John),JStr(Smith),212 344 1345))
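
The query relies on a detail of Scala 2 for-expressions: a pattern on the left of `<-` acts as a filter, so elements that do not match are skipped instead of raising a `MatchError`. A minimal sketch with `Option`, assuming Scala 2 as used in this notebook:

```scala
val values: List[Option[Int]] = List(Some(1), None, Some(3), None, Some(5))

// The pattern Some(n) on the left of <- silently skips the None elements
val present = for (Some(n) <- values) yield n * 10
println(present)  // List(10, 30, 50)
```

This is why `JObj(phone) <- phones` would simply ignore a phone entry that is not a `JObj`.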

In [41]:
// for-expressions as database queries

case class Book(title: String, author: String)

val books: Set[Book] =  Set(  

    // using a Set avoids duplicate items and also duplicate results in the query below
    
    Book( "Harry Potter and the Deathly Hallows", "J. K. Rowling"),
    Book( "Harry Potter and the Philosopher's Stone", "J. K. Rowling"), 
    Book( "Harry Potter And The Cursed Child", "J. K. Rowling"),  
    Book( "Angels & Demons", "Dan Brown"),
    Book( "The Audacity of Hope", "Barack Obama")
)


for {
    b1 <- books
    b2 <- books
    if b1.title < b2.title
    if b1.author == b2.author  //author has more than 1 book
} yield  b1.author


Set(J. K. Rowling)

In [42]:
// For as random generator

trait Generator[+T] {
    self =>
    def generate: T

    // a for-expression only needs map and flatMap to exist, so we define them by hand
    def map[S](f: T => S): Generator[S] = new Generator[S] {
        def generate = f(self.generate)
    }

    def flatMap[S](f: T => Generator[S]): Generator[S] = new Generator[S] {
        def generate = f(self.generate).generate
    }
}

val integers = new Generator[Int] {
    val ran = new java.util.Random
    def generate = ran.nextInt()
}
println(integers.generate)

//for other types, .nextBoolean(), 
//val booleans = new Generator[ Boolean] {
//    def generate = integers.generate > 0
//}

// or
val booleans = for( x <- integers) yield (x > 0)

//that is because
//
// for ( x <- e1 ) yield e2
//
// is actually
//
//     e1.map(x => e2)
//
// which translates to
//
//     val booleans = integers.map(_>0)
//
println(booleans.generate)


def pair[U,V](u: Generator[U], v: Generator[V]) = for {
    x <- u
    y <- v
} yield (x,y)

//that is because
//
// for {
//      x <- e1
//      y <- e2
// } yield e3
//
// is actually
//
//     e1.flatMap(x => ( e2.map( y => e3)) )
//
// which translates to
//
//     def pair[U,V](u: Generator[U], v: Generator[V]) =
//             u flatMap (x => v map (y => (x,y)))
//
println(pair(booleans, booleans).generate)

-1855451955
false
(false,false)
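
One use of such generators is random testing of properties. The sketch below is self-contained, so it repeats the `Generator` trait; the helper `check`, the name `pairs` and the fixed seed are assumptions made for this example and not part of the course code.

```scala
trait Generator[+T] { self =>
  def generate: T
  def map[S](f: T => S): Generator[S] = new Generator[S] {
    def generate: S = f(self.generate)
  }
  def flatMap[S](f: T => Generator[S]): Generator[S] = new Generator[S] {
    def generate: S = f(self.generate).generate
  }
}

// Seeded so that a run is reproducible (the seed 42 is arbitrary)
val integers: Generator[Int] = new Generator[Int] {
  private val rand = new java.util.Random(42L)
  def generate: Int = rand.nextInt()
}

def pairs[T, U](t: Generator[T], u: Generator[U]): Generator[(T, U)] =
  for { x <- t; y <- u } yield (x, y)

// Hypothetical helper: returns the first counterexample found, if any
def check[T](g: Generator[T], runs: Int = 100)(property: T => Boolean): Option[T] =
  Iterator.fill(runs)(g.generate).find(x => !property(x))

// Holds for all Ints: addition is commutative, so no counterexample exists
println(check(pairs(integers, integers)) { case (a, b) => a + b == b + a })  // None

// Does not hold: Int addition can overflow, so a counterexample is very likely to be found
println(check(pairs(integers, integers)) { case (a, b) =>
  a.toLong + b.toLong == (a + b).toLong })
```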


In [43]:
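// get a random number from lo (inclusive) to hi (exclusive), e.g. 1 up to 9

def choose(lo: Int, hi: Int) : Generator[Int] =
        // floorMod is never negative, unlike math.abs(x) % n, which fails for x == Int.MinValue
        for (x <- integers) yield lo + java.lang.Math.floorMod(x, hi - lo)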

choose(1, 10).generate

9

In [44]:
// return a random element

def oneOf[T](xs: T*): Generator[T] ={
        for (idx <- choose (0, xs.length)) yield xs(idx)
    }

oneOf("a","b","c","d","e","f","g").generate

c

In [45]:
// random list of arbitrary length

def lists: Generator[List[Int]] = {
    
    def single[T] (x: T)  = new Generator[T] {
        def generate = x
    }

    def nonEmptyList = for {
        head <- choose(1, 11)
        tail <- lists
    } yield head :: tail
    
    for {
        isEmpty <- choose(1, 11)
        list <- if (isEmpty >= 10) single(Nil) else nonEmptyList
        // Nil must be wrapped in a Generator because generate will be called on it
    } yield list

}

lists.generate

List()

<a name='stream'></a>
### Stream & Lazy Evaluation

In [46]:
//
// Stream is like List, but its tail is evaluated only when someone asks for it

(1 to 100) 

Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

In [47]:
(1 to 100).toStream 

Stream(1, ?)
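
The `?` in the output stands for the part of the stream that has not been computed yet. A sketch that makes this visible by counting evaluations, assuming the Scala 2 `Stream` used throughout this notebook; the counter `evaluated` and the function `expensive` are hypothetical.

```scala
var evaluated = 0
def expensive(n: Int): Int = { evaluated += 1; n * n }

// Stream.map is lazy in the tail: only the head is computed up front
val squares = (1 to 1000).toStream.map(expensive)
println(evaluated)               // 1

// Taking three elements forces exactly three evaluations
println(squares.take(3).toList)  // List(1, 4, 9)
println(evaluated)               // 3

// Elements are memoized: asking again does not recompute them
println(squares.take(3).toList)  // List(1, 4, 9)
println(evaluated)               // 3
```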

In [48]:
// a lazy val is evaluated on first use, and the result is then stored.
// let us compare by-name (def), by-value (val), and lazy val

def expr = {
    def x = { print("x"); 1 }  // evaluated every time it is used
    
    lazy val y = { print("y"); 1 } // not evaluated until first use;
                                   // after that the stored value is reused

    val z = { print("z") ; 1 }  // evaluated once, when execution reaches this line,
                                // and the value is stored

    x + x + y + y + z + z   //this is evaluated from left to right
}

expr

zxxy

6
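
The same comparison can be made without reading the printed letters, by counting evaluations. This is only a sketch; the three counters are hypothetical names introduced for the illustration.

```scala
var defRuns, lazyRuns, valRuns = 0 // hypothetical counters

def byName: Int = { defRuns += 1; 1 }        // re-evaluated on every use
lazy val cached: Int = { lazyRuns += 1; 1 }  // evaluated on first use, then stored
val eager: Int = { valRuns += 1; 1 }         // evaluated once, at definition

val runsBeforeUse = (defRuns, lazyRuns, valRuns)
val total = byName + byName + cached + cached + eager + eager
val runsAfterUse = (defRuns, lazyRuns, valRuns)

println(runsBeforeUse) // only the val has run so far
println(runsAfterUse)  // the def ran once per use, the lazy val once in total
```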

In [49]:
//with Stream one can play with an infinite list

//this creates an infinite Stream
def from(n: Int) : Stream[Int] = n #:: from(n+1)

// #:: is the Stream cons operator: it prepends an element and evaluates the tail lazily
def sieve(s: Stream[Int]): Stream[Int] =
    s.head #:: sieve(s.tail filter ( _ % s.head != 0)) // drop every multiple of the current prime

val primes = sieve(from(2))

//this returns the first 100 primes
primes.take(100).toList  // take is lazy: only the first 100 primes are ever computed


List(2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541)
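
Because `primes` is infinite, an operation that has to reach the end of the stream (for example `filter(_ < 30).toList`) would never terminate. `takeWhile` stops at the first element that fails the predicate. A short sketch, which repeats `from` and `sieve` so that it runs on its own:

```scala
def from(n: Int): Stream[Int] = n #:: from(n + 1)

def sieve(s: Stream[Int]): Stream[Int] =
  s.head #:: sieve(s.tail.filter(_ % s.head != 0))

val primes = sieve(from(2))

// takeWhile stops as soon as it sees a prime that is not below 30
val smallPrimes = primes.takeWhile(_ < 30).toList
println(smallPrimes)
```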

In [50]:
// for convergence problems, the sequence of guesses can be infinite: only as many guesses as needed are evaluated

def sqrtStream(x : Double) : Stream[Double] = {
    def improve (guess: Double) = (guess + x / guess) / 2
    lazy val guesses: Stream[Double] = 1 #:: (guesses map improve) 
    guesses
}
def isGoodEnough(x: Double, n: Double)={
    math.abs(x*x-n) < 0.0001
}
sqrtStream(4).filter(x=>isGoodEnough(x,4)).take(1).toList

List(2.0000000929222947)
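
A variation on the stopping rule: instead of testing the residual `x*x - n` against a fixed threshold, one can stop when two consecutive guesses agree to a relative tolerance. This is a sketch under that assumption; `sqrtGuesses`, `sqrtWithin` and `eps` are names chosen for the illustration.

```scala
def sqrtGuesses(x: Double): Stream[Double] = {
  def improve(guess: Double): Double = (guess + x / guess) / 2
  lazy val guesses: Stream[Double] = 1.0 #:: guesses.map(improve)
  guesses
}

// stop when two consecutive guesses agree to a relative tolerance eps
def sqrtWithin(x: Double, eps: Double): Double = {
  val gs = sqrtGuesses(x)
  gs.zip(gs.tail)
    .collectFirst { case (prev, next) if math.abs(next - prev) <= eps * next => next }
    .get // Newton's iteration converges for x > 0, so a match is found
}

println(sqrtWithin(2.0, 1e-9))
```

A relative tolerance behaves more evenly than an absolute one when the input is very small or very large.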

In [51]:
//
// N water jugs problem
//
//
class Pouring(capacity: Vector[Int]){  //capacity is the max amount of water each glass can take 
    
    val glasses = 0 until capacity.length //label glasses 0 to n-1
 
    type State = Vector[Int] 
    val initialState = capacity map ( x => 0)  //create a new Vector with all zeros
   
    // all possible moves
    val moves =
        (for (g <- glasses) yield Empty(g)) ++
        (for (g <- glasses) yield Fill(g)) ++
        (for (from <- glasses; to <- glasses if from != to) yield Pour(from, to))

 
    trait Move {
        def change(state: State) : State //state: amount of water in each glass
    }
    case class Empty(glass: Int) extends Move{
        def change(state: State)  =  state updated (glass, 0)
    }
    case class Fill(glass: Int) extends Move{
        def change(state: State)  =  state updated (glass, capacity(glass))
    }
    case class Pour(from: Int, to : Int) extends Move{
        def change(state: State)  = {
            val amount = state(from) min (capacity(to) - state(to))
            state updated(from, state(from) - amount) updated(to, state(to) + amount)
        }
    }

      
    class Path(history: List[Move], val endState: State){
        //def endState: State = trackState(history)
        //private def trackState(xs: List[Move]) : State = xs match {
        //    case Nil => initialState
        //    case move :: xs1 => move change trackState(xs1)
        //}

        //accomplished the same as above
        //def endState: State = (history foldRight initialState) ( _ change _ )

        //or avoid recursive compute endState, update the endState when new moves are added
        //new moves are added in front
        def extend(move: Move) = new Path(move::history, move change endState)
        
        override def toString = (history.reverse mkString " " ) + " --> " + endState
    }

    val initialPath = new Path(Nil, initialState)

    def from(paths: Set[Path], explored: Set[State]): Stream[Set[Path]] =
        if (paths.isEmpty) Stream.empty
        else {
            val more = for {
                path <- paths
                next <- moves map path.extend  // extend every path by every possible move
                if !(explored contains next.endState) // skip paths whose end state has already been explored
            } yield next
            paths #:: from(more, explored ++ (more map (_.endState))) // lazily generate the paths, one set per path length
        }                                                             

    val pathSets = from(Set(initialPath), Set(initialState)) 

    def solution (target: Int) : Stream[Path] =
        for {
            pathSet <- pathSets   // because from returns a Stream, paths are generated only when they are needed
            path <- pathSet
            if path.endState contains target
        } yield path
}


// works for any number of glasses.
// the target is the amount that must end up in any one of the glasses
val problem = new Pouring(Vector(4, 5, 9))
problem.solution(7).take(1).toList

List(Fill(0) Pour(0,1) Fill(0) Pour(0,2) Fill(0) Pour(0,2) Pour(2,1) --> Vector(0, 5, 7))
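
The search in `from` is a breadth-first search whose layers are produced lazily. The same idea can be sketched independently of the glasses, on a toy state space. The function `layers`, the successor function `step` and the bound of 10 are assumptions made for this example only.

```scala
// Lazily produce BFS layers: layer k holds the states first reached after k steps.
// `next` is a hypothetical successor function supplied by the caller.
def layers[S](frontier: Set[S], explored: Set[S], next: S => Set[S]): Stream[Set[S]] =
  if (frontier.isEmpty) Stream.empty
  else {
    val more = frontier.flatMap(next) -- explored
    frontier #:: layers(more, explored ++ more, next)
  }

// toy state space: from n, go to n + 1 or 2 * n, never above 10
def step(n: Int): Set[Int] = Set(n + 1, n * 2).filter(_ <= 10)

val bfs = layers(Set(1), Set(1), step)

// index of the first layer that contains 10 = length of a shortest path from 1 to 10
val depthOfTen = bfs.indexWhere(_.contains(10))
println(depthOfTen)
```

Since the layers are a `Stream`, `indexWhere` forces only as many layers as it needs, just as `solution` does with `pathSets`.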

<a name='event'></a>
### Event Handling

In [52]:
/*
    The following code cannot run in a notebook cell. It has to be saved to files and compiled.
    
    
    It contains three abstract classes, one trait and one test object. It is better to save them in five separate files.
    
    The program simulates a digital half adder built from inverters, AND gates and OR gates,
    
    and produces
    
    
            At time = 0, sum wire = 0
            At time = 0, carry wire = 0
            *** Simulation started, time = 0 ***
            At time = 2, sum wire = 1
            *** Simulation started, time = 2 ***
            At time = 3, carry wire = 1
            At time = 5, sum wire = 0
            *** Simulation started, time = 5 ***
            At time = 6, carry wire = 0
            At time = 8, sum wire = 1
 

*/


abstract class Simulation {
    type Action = () => Unit

    case class Event(time: Int, action: Action) // an action scheduled to run at a given time

    private var curTime = 0
    def currentTime: Int = curTime //time getter

    private var agenda : List[Event] = List()

    private def insert(ag: List[Event], item: Event) : List[Event] = ag match{
        //keep the agenda sorted by time; among events with equal times, the later insertion goes last
        case first:: rest if first.time <= item.time =>
            first :: insert(rest, item)
        case _ =>
            item :: ag
    }

    def afterDelay(delay: Int)(block : => Unit): Unit = {
        //schedule block to run delay time units from now.
        //Each gate schedules its output change this way, so the time stamp models the propagation delay.
        //Without it, outputs would change instantaneously, so the delay makes the simulation 
        //more realistic. 
        //From the time stamps one can draw the signal graph, as on an oscilloscope
        val item = Event(curTime + delay, () => block)
        agenda = insert (agenda, item)
    }
    def run(): Unit = {
        afterDelay(0){
            println("*** Simulation started, time = " + currentTime + " ***")
        }
        loop() //run every event on the agenda, in time order
    }
    private def loop(): Unit = agenda match {
        case first :: rest =>
            agenda = rest
            curTime = first.time
            first.action()
            loop()
        case Nil =>
    }
}

trait Parameters{

    def InverterDelay = 1  // delay time in seconds
    def AndGateDelay = 1
    def OrGateDelay = 1
}

abstract class Gates extends Simulation{

    def InverterDelay : Int
    def AndGateDelay : Int
    def OrGateDelay : Int

    class Wire{

        private var sigVal = false
        private var actions: List[Action] = List()  //connect wire to gates

        def getSignal: Boolean = sigVal

        def setSignal(s: Boolean) : Unit ={
            if (s != sigVal ) {
                sigVal = s

                // when signal is changed,
                // action is triggered
                actions foreach (_())
            }
        }
        def addAction(a: Action) : Unit ={
            actions = a :: actions
            // run the new action once immediately, so the output reflects the current signal
            a()
        }

    }
    
    def inverter(input: Wire, output: Wire): Unit = {
        def invertAction(): Unit = {
            val inputSig = input.getSignal
            afterDelay(InverterDelay){
                output setSignal !inputSig
            }
        }
        input addAction invertAction
    }
    def andGate(input1: Wire, input2: Wire, output: Wire ): Unit ={
        def andAction(): Unit = {
            val in1Sig = input1.getSignal
            val in2Sig = input2.getSignal
            afterDelay(AndGateDelay){
                output setSignal (in1Sig & in2Sig)
            }
        }

        input1 addAction andAction
        input2 addAction andAction
    }

    def orGate(input1: Wire, input2: Wire, output: Wire ): Unit = {
        def orAction(): Unit = {
            val in1Sig = input1.getSignal
            val in2Sig = input2.getSignal
            afterDelay(OrGateDelay){
                output setSignal (in1Sig | in2Sig)
            }
        }

        input1 addAction orAction
        input2 addAction orAction
    }


    def probe(name: String, wire: Wire): Unit = {
        def probeAction(): Unit = {
            println(s"At time = $currentTime, $name wire = ${if (wire.getSignal) 1 else 0}")
        }
        wire addAction probeAction
    }
}

abstract class Circuits extends Gates{

    def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire): Unit ={
        //s: sum = a XOR b: 0 when a=b=0 or a=b=1, else 1
        //c: carry = a AND b: 1 when a=1 and b=1, else 0
        val d, e = new Wire
        orGate(a,b,d)
        andGate(a,b,c)
        inverter(c,e)
        andGate(d,e,s)
    }

    def fullAdder(a: Wire, b: Wire, cin: Wire, sum: Wire, cout: Wire): Unit ={
        // sum = (a+b+cin) % 2
        // cout = (a+b+cin) / 2
        val s, c1, c2 = new Wire
        halfAdder(b, cin, s, c1)
        halfAdder(a, s, sum, c2)
        orGate(c1,c2, cout)
    }
}

object main extends App{
    object sim extends Circuits with Parameters
    import sim._

    val in1, in2, sum, carry = new Wire

    halfAdder(in1, in2, sum, carry)
    probe("sum", sum)
    probe("carry", carry)

    in1 setSignal true

    run()

    in2 setSignal true
    run()


    in1 setSignal false
    run()
}
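
The simulation depends on `insert` keeping the agenda sorted by time while preserving insertion order among events with the same time. A standalone sketch of that invariant follows; here an event is reduced to a hypothetical `(time, label)` pair instead of `Event(time, action)`.

```scala
// (time, label) stands in for Event(time, action); the labels are hypothetical
type Item = (Int, String)

def insert(agenda: List[Item], item: Item): List[Item] = agenda match {
  // walk past every event that is due no later than the new one
  case first :: rest if first._1 <= item._1 => first :: insert(rest, item)
  case _ => item :: agenda
}

val scheduled = List((3, "c"), (1, "a"), (2, "b1"), (2, "b2"))
val agenda = scheduled.foldLeft(List.empty[Item])(insert)
println(agenda)
```

Using `<=` rather than `<` in the guard is what makes events with equal times run in the order in which they were scheduled.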

<a name ='ober'></a>
### Observer Pattern

In [53]:
/*
    The following code is copied from Martin Odersky's course
     

*/

object main extends App {
    
    trait Subscriber {
        def handler(pub: Publisher): Unit
    }

    trait Publisher{
        private var subscribers: Set[Subscriber] = Set()

        def subscribe(sub: Subscriber): Unit = {
            subscribers += sub
        }

        def unsubscribe(sub: Subscriber): Unit = {
            subscribers -= sub
        }

        def publish():  Unit = {
            subscribers.foreach(_.handler(this))
        }

    }



    class BankAccount extends Publisher {
        private var balance = 0  // Mutable States
        def currentBalance = balance // getter, only read , no write
        def deposit(amount: Int)  = {
            if ( amount > 0 ) {
                balance = balance + amount  
                publish()
            }

        }
        def withdraw (amount: Int) ={
            if (0 < amount && amount <= balance) {
                balance = balance - amount
                publish()
            }else throw new Error("insufficient funds")
        }
    }

    class Consolidator(observed: List[BankAccount]) extends Subscriber{

        observed.foreach(_.subscribe(this))  //subscribe this Consolidator to every
                                             //bank account in the list (the publishers)

        private var total: Int = _ // default-initialized
        compute() //call compute() to set the initial total

        def totalBalance = total

        private def compute() =
            total = observed.map(_.currentBalance).sum 

        def handler(pub: Publisher) = compute()


    }


    val a, b = new BankAccount
    val c = new Consolidator(List(a, b))

    println(c.totalBalance)
    a deposit 20 
    b deposit 30
    println(c.totalBalance)

}

In [54]:
/*

The Observer pattern has a lot of shortcomings, 

so instead of writing our own observer pattern, a newer approach is to use a reactive programming library.

See the book released in October 2016

    Reactive Programming with RxJava
    Creating Asynchronous, Event-Based Applications
    By Tomasz Nurkiewicz, Ben Christensen
    

    https://www.youtube.com/watch?v=-UVeBTeyDB0



The scala.rx library  

Created by Li Haoyi 

https://github.com/lihaoyi/scala.rx 

See also the ReactiveX libraries (RxJava was originally developed at Netflix) http://reactivex.io/

 

object main extends App {
    import rx._

    class BankAccount {
        val balance  = Var(0)

        def deposit(amount: Int)  = {
            if ( amount > 0 ) {
                balance() = balance.now + amount
            }

        }
        def withdraw (amount: Int) ={
            if (0 < amount && amount <= balance.now) {
                balance() = balance.now - amount
            }else throw new Error("insufficient funds")
        }
    }


    val a1, b1 = new BankAccount
    val c = Rx{a1.balance() + b1.balance()}

    println(c.now)
    a1 deposit 50
    println(c.now)

}

*/
()

<a name = 'pppro'></a>
## Parallel Programming in Scala

The code in this section is mostly copied from the <br>Viktor Kuncak, Aleksandar Prokopec course on Coursera https://www.coursera.org/learn/parprog1/<br>
Aleksandar Prokopec course code:  https://github.com/axel22/parprog-snippets

<a name = 'thread'></a>
### Parallel Threads

In [55]:
class HelloWorld extends Thread{
    override def run(): Unit = {
        Thread.sleep(2000)
        println("hello world!")
    }
}

val t = new HelloWorld
t.start()
t.join() //This tells the main thread to wait till t completes. 
         //Without it, main will go on to the next line
println("I am from Main")

hello world!
I am from Main


In [56]:
// When two parallel threads modify the same variable uidCount, 
// the output is not deterministic: uidCount += 1 is a read-modify-write, not a single atomic step 

var uidCount = 0L

def getUniqueID = {
    Thread.sleep(100)
    uidCount += 1
    uidCount
}

def startThread()={
    val t = new Thread{
        override def run() = {
            val uids = for ( x <- 0 until 10 ) yield getUniqueID
            println(uids)
        }
    }
    t.start()
}

startThread()
startThread()

In [57]:
//
// one can guard the update with synchronized on a shared lock object; 
// the JVM then allows only one thread at a time to execute the block

val x = new AnyRef {}
var uidCount = 0

def getUniqueID = x.synchronized {
    Thread.sleep(100)
    uidCount += 1
    uidCount
}

def startThread()={
    val t = new Thread{
        override def run() = {
            val uids = for ( x <- 0 until 10 ) yield getUniqueID
            println(uids)
        }
    }
    t.start()
}

startThread()
startThread()
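
For a plain counter, an alternative to `synchronized` is an atomic variable from `java.util.concurrent.atomic`. The sketch below is one possible variant, not the approach used in the course; the object `AtomicIds` and its method names are hypothetical. It is wrapped in an object so that the worker threads do not depend on the enclosing cell.

```scala
import java.util.concurrent.atomic.AtomicLong

object AtomicIds {
  private val counter = new AtomicLong(0L)

  // incrementAndGet performs the read-modify-write as one atomic step
  def nextId(): Long = counter.incrementAndGet()

  // start `threads` workers that each draw `perThread` IDs, then return all IDs
  def drawConcurrently(threads: Int, perThread: Int): Vector[Long] = {
    val results = Array.fill(threads)(Vector.empty[Long])
    val workers = (0 until threads).map { t =>
      new Thread {
        override def run(): Unit =
          results(t) = Vector.fill(perThread)(nextId()) // each worker writes only its own slot
      }
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // join makes the workers' writes visible to this thread
    results.toVector.flatten
  }
}

val ids = AtomicIds.drawConcurrently(threads = 2, perThread = 1000)
println(ids.distinct.size == ids.size) // checks that no ID was handed out twice
```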


In [58]:
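// this can deadlock: the two threads acquire the same two locks in opposite order

class Account(private var amount: Int = 0){
    def transfer(target: Account, n: Int): Unit = {
        this.synchronized {        // first lock: the source account
            target.synchronized {  // second lock: the target account, which the other thread may already hold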
                this.amount -= n
                target.amount += n
            }
        }

    }
}

def StartThread(from: Account, to : Account, n: Int) = {
    val t = new Thread {
        override def run () = {
            for (i <- 0 until n ) {
                from.transfer(to, 1)
            }
        }
    }
    t.start()
    t
}

val a = new Account(500)
val b = new Account(500)

val t = StartThread(a, b, 100)
val s = StartThread(b, a, 100)

t.join()
s.join()

In [59]:
// resolve the deadlock by always acquiring the two locks in the same order (by uid)


val x = new AnyRef {}
var uidCount = 0

def getUniqueID = x.synchronized {
    uidCount += 1
    uidCount
}


class Account(var amount: Int = 0){

    val uid = getUniqueID //each account gets a unique ID

    def balance: Int = amount

    private def lockTransfer(target: Account, n: Int): Unit = {
        this.synchronized {
            target.synchronized {
                this.amount -= n
                target.amount += n
            }
        }
    }

    def transfer(target: Account, n: Int): Unit = {
        if (this.uid < target.uid)
            this.lockTransfer(target, n)   
        else
            target.lockTransfer(this, -n)  //swap source and target and transfer a negative amount
    }


}

def StartThread(from: Account, to : Account, n: Int) = {
        val t = new Thread {
            override def run () = {
                for (i <- 0 until n ) {
                    from.transfer(to, 1)
                }
            }
        }
        t.start()
        t
    }

val a = new Account(500)
val b = new Account(500)

val t = StartThread(a, b, 100)
val s = StartThread(b, a, 200)

t.join()
s.join()

a.uid
 

1

In [60]:
//
// copied from https://github.com/axel22/parprog-snippets/blob/master/src/main/scala/common/package.scala
// this defines the parallel function that will be used later
// 
object para {
    import java.util.concurrent._
    import scala.util.DynamicVariable

    val forkJoinPool = new ForkJoinPool

    val scheduler =
        new DynamicVariable[TaskScheduler](new DefaultTaskScheduler)


    def task[T](body: => T): ForkJoinTask[T] = {
        scheduler.value.schedule(body)
    }

    abstract class TaskScheduler {
        def schedule[T](body: => T): ForkJoinTask[T]
    }

    class DefaultTaskScheduler extends TaskScheduler {
        def schedule[T](body: => T): ForkJoinTask[T] = {
            val t = new RecursiveTask[T] {
                def compute = body
            }
            forkJoinPool.execute(t)
            t
        }
    }

    def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
        val right = task {
            taskB
        }
        val left = taskA
        (left, right.join())
    }
}

In [61]:
//
// Monte Carlo Pi in 4 parallel tasks
//


import scala.util.Random

def mcCount(iter: Int) : Int = {
    val randomX = new Random()
    val randomY = new Random()
    var hits = 0
    for ( i <- 0 until iter ) {
        val x = randomX.nextDouble()
        val y = randomY.nextDouble()
        if (x*x + y*y < 1) hits += 1
    }
    hits
}

def monteCarloPiPar(iter: Int): Double = {
    val smalliter = iter/4
    val ((pi1,pi2),(pi3,pi4)) = para.parallel(
        para.parallel(mcCount(smalliter), mcCount(smalliter)),
        para.parallel(mcCount(smalliter), mcCount(iter-3*smalliter)))

    4.0*(pi1+pi2+pi3+pi4)/iter
}

monteCarloPiPar(40000)


3.1412
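
The same split into four independent counts can also be sketched with the standard library's `Future` instead of the `para.parallel` helper. This is an alternative formulation under stated assumptions: the object `PiWithFutures`, the use of the global execution context and the one-minute timeout are choices made for this example.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PiWithFutures {
  // count random points of the unit square that fall inside the quarter circle
  def countHits(samples: Int): Int = {
    val rnd = ThreadLocalRandom.current() // one generator per thread, no contention
    var hits = 0
    var i = 0
    while (i < samples) {
      val x = rnd.nextDouble()
      val y = rnd.nextDouble()
      if (x * x + y * y < 1) hits += 1
      i += 1
    }
    hits
  }

  def estimatePi(samples: Int, tasks: Int = 4): Double = {
    val perTask = samples / tasks
    // each Future keeps its own local counter; the counts are combined only at the end
    val counts = Future.sequence(List.fill(tasks)(Future(countHits(perTask))))
    val hits = Await.result(counts, 1.minute).sum
    4.0 * hits / (perTask * tasks)
  }
}

println(PiWithFutures.estimatePi(400000))
```

Because every task counts into a local variable, no shared mutable state is involved and no locking is needed.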

<a name = 'par'></a>
### Parallel Algorithms


In [62]:
/*
    Because of JIT compilation, run this cell more than once before trusting the timings
*/

//
// parallel merge sort
//

val rand = new Random()
val leng = 10000

//unlike List, elements of Array are mutable
val xs =  (for (i <- 0 until leng ) yield rand.nextInt(10000)).toArray
val ys = new Array[Int](leng)
 
def merge(src: Array[Int], dst: Array[Int], from: Int, mid: Int, to: Int): Unit = {
    var left = from
    var right = mid
    var i = from
    while (left < mid && right < to) {
        while (left < mid && src(left) <= src(right)) {
            dst(i) = src(left)
            i += 1
            left += 1
        }
        while (right < to && src(right) <= src(left)) {
            dst(i) = src(right)
            i += 1
            right += 1
        }
    }
    while (left < mid) {
        dst(i) = src(left)
        i += 1
        left += 1
    }
    while (right < to) {
        dst(i) = src(right)
        i += 1
        right += 1
    }
}

val maxDepth = 4 //parallel depth

def mergeSort(xs: Array[Int], from: Int, to: Int, depth: Int = 0): Unit= {
    if (depth == maxDepth) {
        java.util.Arrays.sort(xs, from, to) //in-place sort from the Java library (dual-pivot quicksort for Int arrays) 
        if (maxDepth == 0)
            println("Sorted" + xs.toList.take(10))
    }
    else{
        val mid = (from + to)/2
        //sort the two halves in parallel; they work on disjoint index ranges
        para.parallel(mergeSort(xs,  from, mid, depth + 1), mergeSort(xs,  mid, to, depth + 1))
        val flip = (maxDepth - depth) % 2 == 0
        val src = if (flip) ys else xs
        val dst = if (flip) xs else ys
        merge(src, dst, from, mid, to)
        if (depth == 0)
            println("Sorted (first 10 items): " + dst.toList.take(10))
    }
}


println("Array (first 10 items): " + xs.toList.take(10))

val t0 = System.nanoTime()
mergeSort(xs,  0 , leng) 
println("\ntime for parallel merge sort: "+(System.nanoTime()-t0)/1e9+" seconds")


val t1 = System.nanoTime()
java.util.Arrays.sort(xs,  0 , leng) //note: xs is already sorted at this point, which favours this timing
println("time for sequential quicksort: "+(System.nanoTime()-t1)/1e9+" seconds")



Array (first 10 items): List(9909, 917, 4866, 8537, 2226, 9163, 9208, 4529, 5129, 3310)
Sorted (first 10 items): List(0, 0, 0, 3, 4, 4, 7, 7, 8, 8)

time for parallel merge sort: 0.108155505 seconds
time for sequential quicksort: 0.095088366 seconds


In [63]:
// Parallelize
//    map, reduce/fold, scan on collections 
// 
println(List(1,2,3,4).map(2*_))
println(List(1,2,3,4).reduce(_+_))
println(List(1,2,3,4).fold(100)(_+_))
println(List(1,2,3,4).scan(100)(_+_))

List(2, 4, 6, 8)
10
110
List(100, 101, 103, 106, 110)
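
A parallel `reduce` or `fold` is free to group the elements differently from a left-to-right traversal, so it agrees with the sequential result only when the operator is associative. A small sequential check of that property, using the two extreme groupings:

```scala
val nums = List(1, 2, 3, 4)

// addition is associative: the grouping does not matter
val sumLeft  = nums.reduceLeft(_ + _)   // ((1 + 2) + 3) + 4
val sumRight = nums.reduceRight(_ + _)  // 1 + (2 + (3 + 4))

// subtraction is not associative: the two groupings disagree
val diffLeft  = nums.reduceLeft(_ - _)  // ((1 - 2) - 3) - 4
val diffRight = nums.reduceRight(_ - _) // 1 - (2 - (3 - 4))

println((sumLeft, sumRight, diffLeft, diffRight))
```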


In [64]:
//
// parallel operations on collections: map on array
//


val leng = 1000000

val threshold = leng/10

val xs =  (for (i <- 0 until leng ) yield i).toArray
val ys = new Array[Int](leng)

def f(x:Int) = x*x


def mapSeqSeq[A, B](in: Array[A], out: Array[B], f: A => B, left: Int, right: Int): Unit = {
    var i = left
    while  ( i < right){
        out(i) = f(in(i))
        i += 1
    }
}


def mapSeqPar[A, B](in: Array[A], out: Array[B], f: A => B, left: Int, right: Int): Unit = {
    if (right-left < threshold  ) mapSeqSeq(in, out, f, left, right)
    else {
        val mid = left + (right - left) / 2
        para.parallel(mapSeqPar(in, out, f, left, mid), mapSeqPar(in, out, f, mid, right))
    }
}



val t0 = System.nanoTime()
mapSeqSeq(xs, ys, f, 0 , leng)
println("time for sequential map: "+(System.nanoTime()-t0)/1e9+" seconds")

val t1 = System.nanoTime()
mapSeqPar(xs, ys, f, 0 , leng)
println("time for parallel map: "+(System.nanoTime()-t1)/1e9+" seconds")

println("results: "+ys.toList.take(10))


time for sequential map: 0.134020695 seconds
time for parallel map: 0.17011698 seconds
results: List(0, 1, 4, 9, 16, 25, 36, 49, 64, 81)


In [65]:
//
// parallel operations on collections:  map on tree
//

sealed abstract class aTree[A] {val size: Int}
case class aLeaf[A](a: Array[A]) extends aTree[A]{  // leaf has no children nodes
    override val size: Int = a.size               //  a is the data
    override def toString: String = a.toList.toString
}
case class aNode[A](left: aTree[A], right: aTree[A]) extends aTree[A]{  //node has l/r children
    override val size: Int = left.size + right.size                 //but has no data
    override def toString: String = "{ "+left.toString +" , "+ right.toString + " }"
}


//trees are immutable, hence return a mapped copy of the tree
def mapTreePar(t: aTree[Int], f: Int => Int): aTree[Int] = t match {
    case aNode(l, r) => {
        val (lb, rb) = para.parallel(mapTreePar(l,f), mapTreePar(r,f))
        aNode(lb, rb)
    }
    case aLeaf(a) => {
        val len = a.length
        val b = new Array[Int](len)
        for (i <- 0 until len) b(i) = f(a(i))
        aLeaf(b)
    }
}

var count = 1
val leafSize = 4

def makeTree(n: Int): aTree[Int] = {
    if (n <= leafSize){
        val a = new Array[Int](leafSize)
        for (i <- 0 until n) {
            a(i) = count
            count += 1
        }
        aLeaf(a)
    }
    else
        aNode(makeTree(n/2) , makeTree(n-n/2))
}


val num = 20
print( mapTreePar(makeTree(num), (x:Int) => x) )



{ { { List(1, 2, 0, 0) , List(3, 4, 5, 0) } , { List(6, 7, 0, 0) , List(8, 9, 10, 0) } } , { { List(11, 12, 0, 0) , List(13, 14, 15, 0) } , { List(16, 17, 0, 0) , List(18, 19, 20, 0) } } }

In [66]:
//
// parallel operations on collections:  reduce / fold on tree
//


sealed abstract class Tree[A]

case class Leaf[A](value: A) extends Tree[A] {
    // leaf has no children nodes, only store value
    override def toString: String = value.toString
}
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]{  //node has l/r children

    override def toString: String = "( "+left.toString +" - "+ right.toString + " )"
}

// reduce a tree to a single element of type A
def reduceTreePar[A](t: Tree[A], f: (A, A) => A): A = t match {
    case Node(l, r) => {
        val (lb, rb) = para.parallel(reduceTreePar(l,f), reduceTreePar(r,f))
        f(lb, rb)
    }
    case Leaf(a) => a
}

val num = 10
var count = 0

def makeTree(n: Int): Tree[Int] = {
    if (n <= 1){
        count += 1
        Leaf(count)
    }
    else
        Node(makeTree(n/2) , makeTree(n-n/2))
}

val tt = makeTree(num)
val tt_reduce = reduceTreePar(tt, (x:Int,y:Int)=> x - y)
println(tt)
println(s"apply reducer at tree level, result = $tt_reduce")

( ( ( 22 - 23 ) - ( 24 - ( 25 - 26 ) ) ) - ( ( 27 - 28 ) - ( 29 - ( 30 - 31 ) ) ) )
apply reducer at tree level, result = 5


In [67]:
//
// parallel operations on collections:  scan on array  
//   the operation has to be associative, so that the array can be converted to a (balanced) reduction tree
//  if the operation is not associative, a left/right scan is essentially a one-sided tree,
//  so parallelism brings no advantage
//
//  O(log n) depth, given enough parallel workers
//

sealed abstract class TreeRes[A]{var res: A}

case class LeafRes[A](override var res: A) extends TreeRes[A] {
    // leaf has no children nodes, only store value
    override def toString: String = res.toString
}
case class NodeRes[A](left: TreeRes[A], override var res: A , right: TreeRes[A]) extends TreeRes[A]{  //node has l/r children

    //override def toString: String =  "{"+ left.toString +", "+res+", "+ right.toString +"}"
    override def toString: String =  left.toString +", "+ right.toString
} 

val in = (for (i <- 1 to 10) yield i).toArray
val a = 100 //fold accumulator
val out = new Array[Int](in.length+1)
out(0) = a

//use the intermediate tree created by makeTree: push the accumulator a down the tree to produce the scan output
def downsweep(t: TreeRes[Int], a: Int, f:(Int, Int)=>Int ,left: Int, right: Int): Unit = t match {
    case  LeafRes(v) => {
         out(left+1) = f(a, v)
    }
    case NodeRes(l, v, r) => {
        val mid = left + ( right - left ) / 2
         para.parallel(downsweep(l, a, f, left, mid), downsweep(r,f(a, l.res),f, mid, right))
    }
}

//create a temporary tree: apply f bottom-up and store each intermediate result in its node
def makeTree(in: Array[Int],  f:(Int,Int) => Int, left: Int, right: Int): TreeRes[Int] = {
    if (left == right - 1){
        LeafRes(in(left))
    }
    else{
        val mid = left + ( right - left ) / 2
        val (tl, tr) =  para.parallel(makeTree(in, f, left, mid) , makeTree(in, f, mid, right))
        NodeRes(tl,f(tl.res, tr.res) , tr)
    }
}

def scanLeft(in: Array[Int], a: Int, f:(Int,Int) => Int) = {
    val t1 = makeTree(in,   f, 0 , in.length) 
    downsweep(t1, a, f, 0 , in.length)
}



println (in.toList)

scanLeft(in, a,  (x:Int, y:Int) => x+y)

println (out.toList)


List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


Name: Compile Error
Message: <console>:69: error: type mismatch;
 found   : Account
 required: Int
       scanLeft(in, a,  (x:Int, y:Int) => x+y)
                    ^
StackTrace: 
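
For an associative operator, a tree-based scan should produce exactly what the standard library's sequential `scanLeft` produces, which makes `scanLeft` a convenient reference when testing a parallel version. A minimal check with the same input and initial value as the cell above (the names `input`, `init` and `expected` are chosen for this sketch):

```scala
val input = (1 to 10).toArray
val init = 100

// sequential reference result: init followed by all running sums
val expected = input.scanLeft(init)(_ + _)
println(expected.toList)
```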

<a name ='dataStr'></a>
### Parallel Data Structures

Scala Collections and Parallel Collections Hierarchy
![](http://docs.scala-lang.org/resources/images/parallel-collections-hierarchy.png)

In [68]:
//
// Par 
//

for (i <- (1 to 20).par)  yield i

ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

In [69]:
import scala.collection.mutable.ListBuffer
var ll = new ListBuffer[Int]()
for (i <- (1 to 20).par) {
    ll += i  
}
println(ll)  // the elements are processed in parallel, so the order is not deterministic;
             // ListBuffer is also not thread-safe, so elements can be lost (16 is missing in the output)

ListBuffer(11, 8, 18, 3, 9, 12, 17, 19, 4, 10, 20, 5, 13, 14, 15, 7, 6, 2, 1)
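
One way to avoid losing elements is to let only one thread at a time append to the buffer. The sketch below uses plain threads and `synchronized` rather than a parallel collection; the object `SafeAppend` and its parameters are hypothetical names for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object SafeAppend {
  // worker w appends the numbers w * perWorker + 1 .. w * perWorker + perWorker
  def collect(workers: Int, perWorker: Int): List[Int] = {
    val buffer = new ListBuffer[Int]()
    val threads = (0 until workers).map { w =>
      new Thread {
        override def run(): Unit = {
          var i = 1
          while (i <= perWorker) {
            buffer.synchronized { buffer += (w * perWorker + i) } // one writer at a time
            i += 1
          }
        }
      }
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    buffer.toList
  }
}

val collected = SafeAppend.collect(workers = 4, perWorker = 5)
println(collected.size)   // every element arrives
println(collected.sorted) // the arrival order may vary, the contents do not
```

The lock removes the data race but also serializes the appends; building the result with `yield` or `map`, as in the previous cell, avoids shared mutable state altogether.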


In [70]:
//
// Caution: updates such as hits += 1 are not atomic
// 
// Calculate Pi using Par

import scala.util.Random
 
val randomX = new Random()
val randomY = new Random()
var hits = 0
var nonhits = 0

for ( i <- (0 until 2000000).par ) {
    val x = randomX.nextDouble()
    val y = randomY.nextDouble()
    if (x*x + y*y < 1) hits += 1   // += reads and then writes the shared counter,
    else nonhits += 1              // so concurrent increments can overwrite each other
}

println("estimate pi : " + (4.0*hits)/(hits+nonhits))

estimate pi : 3.1363150640927047


In [71]:
//
// let us check the time 
//

val t0 = System.nanoTime()
(1 to 100000000).par.count(i => i%3==0)
println("time for parallel: "+(System.nanoTime()-t0)/1e9+" seconds")

val t1 = System.nanoTime()
(1 to 100000000).count(i => i%3==0)
println("time for sequential: "+(System.nanoTime()-t1)/1e9+" seconds")

time for parallel: 0.460319134 seconds
time for sequential: 1.46092184 seconds


In [72]:
// the count method above processes each split of the data either in parallel or sequentially, 
// and it is based on the sum method

// So how does a parallel sum work?

// it first splits the data, then applies the parallel fold discussed before, 
// or a sequential fold when a split is small enough.


def sum(xs: Array[Int]): Int = xs.par.fold(0)(_+_)

// a similar idea applies to max, because it is also an associative operation;
// the neutral element for max is Int.MinValue

def max(xs: Array[Int]): Int = xs.par.fold(Int.MinValue)(math.max)



In [73]:
// fold is more restrictive because it requires the accumulator and the elements to have the same type

// Use aggregate instead of fold

// a sequence of elements of type A, an accumulator of type B

// def aggregate[B](z: ⇒ B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B

Array("Hello","How","are","you").par.
    aggregate(0)((count, c)=> if (c.length == 3) count + 1 else count, _+_)

3
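
Conceptually, `aggregate` folds each chunk of the collection with `seqop`, starting from `z`, and then merges the partial results with `combop`. The sequential sketch below imitates these two phases. The helper `aggregateChunks` and its fixed chunk size are hypothetical; the real parallel collections decide how to split on their own.

```scala
// z must be a neutral element for combop, because it is used once per chunk
def aggregateChunks[A, B](xs: Seq[A], chunkSize: Int)(z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
  xs.grouped(chunkSize)                     // split into chunks
    .map(chunk => chunk.foldLeft(z)(seqop)) // fold each chunk on its own
    .foldLeft(z)(combop)                    // merge the partial results

val words = Seq("Hello", "How", "are", "you")
val threeLetterWords =
  aggregateChunks(words, chunkSize = 2)(0)(
    (count, w) => if (w.length == 3) count + 1 else count,
    _ + _
  )
println(threeLetterWords)
```

The result should not depend on the chunk size, which is the property that lets the library pick the split freely.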

 Finally, how does par work? It first splits the collection (splitter), processes the parts, and then combines the results (combiner). 
 Both splitting and combining should run in O(log N) time.
 
 Ref: http://docs.scala-lang.org/overviews/parallel-collections/custom-parallel-collections.html
  
 

In [74]:

val A = Iterator(1,2,3,4, 5)   //every collection can be traversed through an Iterator
while (A.hasNext) println(A.next()) 
A // an Iterator is a one-pass cursor: once it has been traversed, it is empty

1
2
3
4
5


empty iterator

In [75]:
// sequence splitter starts on line 542 
// https://github.com/scala/scala/blob/v2.12.0-RC1/src/library/scala/collection/parallel/RemainsIterator.scala#L542

trait Iterator[A]{
    def next() : A
    def hasNext: Boolean
}

trait Splitter[A] extends Iterator[A]{
    def split: Seq[Splitter[A]]
    def remaining: Int
}

trait Builder[A, B] {  //A type of element of collection, B type of the collection, 
    def +=(elem: A) : Builder[A, B]
    def result: B
}

trait Combiner[A, B] extends Builder[A,B]{
    def combine(that: Combiner[A,B]): Combiner[A,B]
}
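
As an illustration of the splitter contract, here is a sketch of a splitter over an array range. It uses a simplified, hypothetical trait `SimpleSplitter` (so that it does not clash with the library's own `Iterator` and `Splitter`), and `ArraySplitter` is an example written for this note, not the library's implementation.

```scala
// Hypothetical, simplified version of the Splitter trait above
trait SimpleSplitter[A] {
  def hasNext: Boolean
  def next(): A
  def remaining: Int
  def split: Seq[SimpleSplitter[A]]
}

// iterates over the index range [from, until) of an array
class ArraySplitter[A](data: Array[A], private var from: Int, until: Int)
    extends SimpleSplitter[A] {
  def hasNext: Boolean = from < until
  def next(): A = { val x = data(from); from += 1; x }
  def remaining: Int = until - from
  // constant-time split: the halves share the array and differ only in their bounds
  def split: Seq[SimpleSplitter[A]] =
    if (remaining < 2) Seq(this)
    else {
      val mid = from + remaining / 2
      Seq(new ArraySplitter(data, from, mid), new ArraySplitter(data, mid, until))
    }
}

// traverse one splitter sequentially
def sumAll(s: SimpleSplitter[Int]): Int = {
  var acc = 0
  while (s.hasNext) acc += s.next()
  acc
}

val parts = new ArraySplitter(Array(1, 2, 3, 4, 5), 0, 5).split
println(parts.map(_.remaining)) // sizes of the two halves
println(parts.map(sumAll).sum)  // each half could be summed by a different task
```

Arrays split cheaply because no element is copied, which is one reason they parallelize well.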

<a name='akka'></a>
### Concurrency & Actors Model

The actor model is a message-oriented programming model: actors share no memory and instead communicate by passing messages, as in MPI. Akka is a library for the JVM (Scala and Java) that implements it:

http://akka.io


In [76]:
/*
    Run this cell in an IDE; it will print "action received" 
    
*/


object main extends App {
    import akka.actor._

    class Action extends Actor {
        def receive = {
            case "action" => println("action received")
            case _ => println("something else")
        }
    }

    val A = ActorSystem("Action")
    val act = A.actorOf(Props[Action], "Action")

    act ! "action"   //the String "action" is sent asynchronously; the actor processes it on a separate thread

} 

<a name='Spark'></a>
# Big Data, Distributed Analysis with Spark

Spark is often preferred over Hadoop MapReduce: it runs on Hadoop clusters, provides in-memory caching and near-real-time data processing, and is commonly reported to be around 10 times faster for workloads that fit in memory. 

It has four main components:
 - Spark SQL (similar to Hive on Hadoop)
 - Spark Streaming (similar to Flink)
 - GraphX
 - MLlib
 

![](https://databricks.com/wp-content/uploads/2016/06/spark-logo-trademark.png)
 


<a name = 'scalaspark'></a>  
## Spark Streaming - Twitter Sentiment Analysis in Real Time

The following app processes Twitter feeds in real time and produces the happiest hashtags from the last 60 seconds.



Copied from https://github.com/apache/bahir/blob/master/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterHashTagJoinSentiments.scala
   

Berkeley AMP Lab http://ampcamp.berkeley.edu/3/exercises/realtime-processing-with-spark-streaming.html

Databricks https://github.com/databricks/spark-training/blob/master/streaming/scala/TutorialHelper.scala

In [77]:
/*

object main extends App {
 

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

 
    //   OAuth credentials: replace the placeholders with the keys of your own Twitter app
    val consumerKey = "<your consumer key>"
    val consumerSecret = "<your consumer secret>"
    val accessToken = "<your access token>"
    val accessTokenSecret = "<your access token secret>"
       
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("HStreet").setMaster("local[2]") 
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None)

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Read in the word-sentiment list and create a static RDD from it
    //save https://raw.githubusercontent.com/apache/bahir/master/streaming-twitter/examples/data/AFINN-111.txt
    val wordSentimentFilePath = "AFINN-111.txt"
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
        val Array(word, happinessValue) = line.split("\t")
        (word, happinessValue.toInt)
    }.cache() 

    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
    // with the static RDD inside the transform() method and then multiplying
    // the frequency of the hash tag by its sentiment value
    val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
            .reduceByKeyAndWindow(_ + _, Seconds(60))
            .transform{topicCount => wordSentiments.join(topicCount)}
            .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
            .map{case (topic, happinessValue) => (happinessValue, topic)}
            .transform(_.sortByKey(false))

    // Print hash tags with the most positive sentiment values
    happiest60.foreachRDD(rdd => {
        val topList = rdd.take(10)
        println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
        topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
    })
 
    ssc.start()
    ssc.awaitTermination()

}
*/
()
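
The scoring logic of the streaming job can be tried on a single in-memory "window" without Spark or Twitter credentials. This is a minimal sketch with plain Scala collections; the sample tweets and the three-word sentiment map are made up for illustration, and `happiest` is a hypothetical name.

```scala
// Hypothetical sample data; the real job reads AFINN-111.txt and a live tweet stream.
val wordSentiments: Map[String, Int] = Map("love" -> 3, "happy" -> 3, "bad" -> -3)

val tweets = Seq(
  "so #happy today",
  "#love this #happy song",
  "#bad traffic again",
  "#spark is fast"
)

// Same steps as the streaming job, applied to one in-memory window.
val hashTags: Seq[String] =
  tweets.flatMap(_.split(" ").filter(_.startsWith("#")))

// Count each tag, dropping the leading '#'.
val topicCounts: Map[String, Int] =
  hashTags.map(_.tail).groupBy(identity).map { case (tag, hits) => (tag, hits.size) }

// Inner join with the sentiment list, then score = sentiment * count,
// sorted from happiest to least happy.
val happiest: Seq[(Int, String)] =
  topicCounts.toSeq
    .collect { case (tag, count) if wordSentiments.contains(tag) =>
      (wordSentiments(tag) * count, tag)
    }
    .sortBy { case (score, _) => -score }

happiest.foreach { case (score, tag) => println(s"$tag ($score happiness)") }
```

Tags with no entry in the sentiment list (here `spark`) drop out at the join, which is also what the inner `join` in the streaming version does.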

<a name = "ssql"></a> 
## Spark GraphX - Page Ranking Application

http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html

Data: https://github.com/databricks/spark-training/tree/master/data/graphx
There are two columns in graphx-wiki-vertices.txt: one is the id, the other is an English key phrase. Each line in graphx-wiki-edges.txt is a pair of ids connecting two vertices. GraphX stores every edge as directed, from a source id to a destination id.

E.g. if "A" is connected to "B", "C" and "D" in the graph (first-degree connections), then a web page that contains the words "B", "C" and "D" should rank higher for the search keyword "A".

For details http://www.math.cornell.edu/~mec/Winter2009/RalucaRemus/Lecture3/lecture3.html


In [78]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexArray = Array(
  (1L, ("Alice", 28)),
  (2L, ("Bob", 27)),
  (3L, ("Charlie", 65)),
  (4L, ("David", 42)),
  (5L, ("Ed", 55)),
  (6L, ("Fran", 50))
  )
val edgeArray = Array(
  Edge(2L, 1L, 7),  // the weight indicates how much one person likes the other
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 6L, 3),
  Edge(4L, 1L, 1),
  Edge(5L, 2L, 2),
  Edge(5L, 3L, 8),
  Edge(5L, 6L, 3)
  )

val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

In [79]:
// display the edges to show who likes whom

for (triplet <- graph.triplets.collect) {
  println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
}

Bob likes Alice
Bob likes David
Charlie likes Bob
Charlie likes Fran
David likes Alice
Ed likes Bob
Ed likes Charlie
Ed likes Fran
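
As a minimal sketch of the ranking idea, here is a textbook PageRank iteration over the same "likes" edges in plain Scala, ignoring the edge weights. The damping factor 0.85, the 50 iterations, and the names `pageRanks` and `step` are assumptions. GraphX's built-in `pageRank` method may differ in normalization and in how it treats vertices without outgoing edges.

```scala
val names: Map[Long, String] = Map(
  1L -> "Alice", 2L -> "Bob", 3L -> "Charlie", 4L -> "David", 5L -> "Ed", 6L -> "Fran")

// (source, destination) pairs, the same edges as in the graph above.
val edges: Seq[(Long, Long)] = Seq(
  (2L, 1L), (2L, 4L), (3L, 2L), (3L, 6L), (4L, 1L), (5L, 2L), (5L, 3L), (5L, 6L))

val damping = 0.85   // assumed, the conventional choice
val iterations = 50  // assumed, plenty for a graph this small

val vertices: Seq[Long] = names.keys.toSeq.sorted
val n = vertices.size
val outDegree: Map[Long, Int] =
  edges.groupBy(_._1).map { case (src, es) => (src, es.size) }

def step(ranks: Map[Long, Double]): Map[Long, Double] = {
  // Rank held by vertices with no outgoing edge is spread over all vertices.
  val danglingMass = vertices.filterNot(outDegree.contains).map(ranks).sum
  // Each edge passes an equal share of its source's rank to its destination.
  val received: Map[Long, Double] = edges
    .map { case (src, dst) => (dst, ranks(src) / outDegree(src)) }
    .groupBy(_._1)
    .map { case (dst, shares) => (dst, shares.map(_._2).sum) }
  vertices.map { v =>
    v -> ((1 - damping) / n + damping * (received.getOrElse(v, 0.0) + danglingMass / n))
  }.toMap
}

// Start from a uniform distribution and iterate.
val initial: Map[Long, Double] = vertices.map(v => v -> 1.0 / n).toMap
val pageRanks: Map[Long, Double] =
  (1 to iterations).foldLeft(initial)((r, _) => step(r))

pageRanks.toSeq.sortBy(-_._2).foreach { case (id, rank) =>
  println(f"${names(id)}%-8s $rank%.3f")
}
```

The ranks always sum to 1. Alice, who is liked by Bob and David and likes no one in return, ends with the largest share, while Ed, whom nobody likes, ends with the smallest.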


<a name = "sml"></a> 
## Spark MLlib 

### ALS - Movie Recommendation System 

The following program builds a movie recommender using alternating least squares (ALS) matrix factorization.

It predicts what a user may want to watch from the ratings of users with similar movie preferences.
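
In its textbook form, ALS can be written as follows. This is a sketch, and MLlib's implementation may differ in details such as how the regularization term is scaled. The symbols are assumptions introduced here: $r_{ui}$ is the rating of movie $i$ by user $u$, $\Omega$ is the set of observed (user, movie) pairs, $x_u, y_i \in \mathbb{R}^k$ are the user and movie factors, $k$ is the rank, and $\lambda$ is the regularization weight.

$$
\min_{X,\,Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - x_u^\top y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

With the movie factors fixed, each user factor has a closed-form least-squares solution over $\Omega_u$, the movies rated by $u$:

$$
x_u = \left( \sum_{i \in \Omega_u} y_i y_i^\top + \lambda I \right)^{-1} \sum_{i \in \Omega_u} r_{ui}\, y_i
$$

The update for $y_i$ is symmetric with the user factors fixed, and the algorithm alternates between the two for a given number of iterations. The rank, $\lambda$, and the iteration count are the three hyperparameters that get tuned.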


Three data files: 

users.dat contains about 6,000 active users (each user has rated at least 20 movies)

    UserID::Gender::Age::Occupation::Zip-code

movies.dat contains about 4,000 movies

    MovieID::Title::Genres
    
ratings.dat contains about 1 million ratings on a scale of 1-5

    UserID::MovieID::Rating::Timestamp

<br>

Data: https://github.com/databricks/spark-training/tree/master/data/movielens/medium


Explanation: http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html



In [80]:
//modified from https://github.com/databricks/spark-training/blob/master/machine-learning/scala/solution/MovieLensALS.scala

import java.io.File
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

//load ratings
val ratings = sc.textFile("ratings.dat").map { line =>
        val fields = line.split("::") 
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

val numRatings = ratings.count()
val numUsers = ratings.map(_._2.user).distinct().count()
val numMovies = ratings.map(_._2.product).distinct().count()

println("We have " + numRatings + " number of ratings from "
    + numUsers + " users on " + numMovies + " movies.")

We have 1000209 number of ratings from 6040 users on 3706 movies.


In [81]:
//load movies

val movies = sc.textFile("movies.dat").map { line =>
        val fields = line.split("::") 
        (fields(0).toInt, fields(1)) }.collect().toMap

In [82]:
//split data into training, validation and test sets, and cache the RDDs

val numPartitions = 4
val training = ratings.filter(x => x._1 < 6).values.repartition(numPartitions).cache()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).cache()
val test = ratings.filter(x => x._1 >= 8).values.cache()

val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()
    
println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

Training: 602241, validation: 198919, test: 199049
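
The split relies on the last digit of each timestamp as a pseudo-random key: digits 0-5 go to training, 6-7 to validation, and 8-9 to test, which gives roughly 60/20/20 as the counts above show. A minimal sketch of the same rule on a handful of illustrative rows in the ratings.dat layout (the rows, the `Rated` case class, and the `...Rows` names are hypothetical):

```scala
// Illustrative rows in the layout UserID::MovieID::Rating::Timestamp
val lines = Seq(
  "1::1193::5::978300760",
  "1::661::3::978302109",
  "2::1357::5::978298709",
  "2::3068::4::978299000",
  "3::3421::4::978298147",
  "3::1641::2::978298430"
)

case class Rated(user: Int, movie: Int, rating: Double)

// Key each rating by the last digit of its timestamp.
val keyed: Seq[(Long, Rated)] = lines.map { line =>
  val fields = line.split("::")
  (fields(3).toLong % 10, Rated(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}

// Same thresholds as the Spark cell above.
val trainingRows   = keyed.collect { case (k, r) if k < 6 => r }
val validationRows = keyed.collect { case (k, r) if k >= 6 && k < 8 => r }
val testRows       = keyed.collect { case (k, r) if k >= 8 => r }

println(s"Training: ${trainingRows.size}, validation: ${validationRows.size}, test: ${testRows.size}")
```

Because the key depends only on the data, the split is reproducible from run to run without fixing a random seed.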


In [83]:
// ALS algorithm
// spark.apache.org/docs/0.8.1/api/mllib/org/apache/spark/mllib/recommendation/package.html


//hyperparameters
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)

var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1

def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)).join(
        data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}


val t1 = System.nanoTime()
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) 
{
    val model = ALS.train(training, rank, numIter, lambda)  //training
    val validationRmse = computeRmse(model, validation, numValidation) //error on validation set
    
    println("RMSE = " + f"$validationRmse%1.3f" + " for the model trained with rank = " 
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
    if (validationRmse < bestValidationRmse) 
    {
            bestModel = Some(model)
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lambda
            bestNumIter = numIter
    }
}
println("\ntime for training: "+(System.nanoTime()-t1)/1e9+" seconds")

RMSE = 0.880 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE = 0.872 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE = 3.756 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE = 3.756 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE = 0.877 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE = 0.871 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE = 3.756 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE = 3.756 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.

time for training: 32.996611969 seconds
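
The RMSE used to compare models is the square root of the mean squared difference between predicted and actual ratings, matched on the (user, movie) key. A minimal sketch with plain Scala maps and made-up numbers (`actual`, `predicted`, and `rmse` are hypothetical names):

```scala
// Hypothetical (user, movie) -> rating pairs.
val actual: Map[(Int, Int), Double] =
  Map((1, 10) -> 4.0, (1, 20) -> 2.0, (2, 10) -> 5.0)
val predicted: Map[(Int, Int), Double] =
  Map((1, 10) -> 3.0, (1, 20) -> 2.0, (2, 10) -> 3.0, (2, 30) -> 1.0)

// Inner join on the key, the role RDD.join plays in computeRmse.
val pairs: Seq[(Double, Double)] =
  actual.toSeq.collect { case (key, rating) if predicted.contains(key) =>
    (predicted(key), rating)
  }

// Squared errors are 1, 0 and 4, so the mean is 5/3.
val rmse = math.sqrt(pairs.map { case (p, r) => (p - r) * (p - r) }.sum / pairs.size)

println(f"RMSE = $rmse%1.3f")
```

The sketch divides by the number of joined pairs; the cell above divides by `n`, which is the same value whenever every rated pair receives a prediction.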


In [84]:
// evaluate the best model on the test set

val testRmse = computeRmse(bestModel.get, test, numTest)
println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
    + ", and numIter = " + bestNumIter + "\nThe RMSE on the test set is " + f"$testRmse%1.3f" + ".")


The best model was trained with rank = 12 and lambda = 0.1, and numIter = 20
The RMSE on the test set is 0.869.


<a name='otherML'></a>
### Other Big Data Machine Learning Algorithms & Big Data Statistics Tools

Import Statistics

    org.apache.spark.mllib.stat.Statistics.colStats

Import Optimization

    org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    
Classification

    org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    org.apache.spark.mllib.classification.{LogisticRegressionModel, SVMModel, NaiveBayes}
    
Regression

    org.apache.spark.mllib.regression.{LassoWithSGD, RidgeRegressionWithSGD}
    
Clustering

    org.apache.spark.mllib.clustering.{KMeans, LDA}

Dimensionality reduction

    mat.computePrincipalComponents
    mat.computeSVD
    
Big Data TimeSeries

    Spark-ts(Cloudera) https://github.com/sryza/spark-timeseries
    
Big Data Neural Net
    
    TensorFrames https://github.com/databricks/tensorframes
    

<a name ='Amazon'></a>  
## The Clouds

### AWS Ecosystem


Courtesy of Keith Steward's slides, Big Data Architectural Patterns and Best Practices on AWS http://www.slideshare.net/AmazonWebServices/big-data-architectural-patterns-and-best-practices-on-aws-67650658

![](https://raw.githubusercontent.com/ronnnwu/DistributedDeepLearning/master/arch.png)


Get cost estimates

https://calculator.s3.amazonaws.com/index.html

Hot versus cold data refers to data transfer latency and throughput, and to whether the storage is used mainly for adding new data or for rewriting old data. The main services are EC2 (Elastic Compute Cloud: virtual servers with auto scaling), S3 (Simple Storage Service: massive, redundant storage), and Kinesis (streaming: keeps live data for up to 7 days).

<a name ='google'></a> 
### Google Clouds ML With TensorFlow


Reference: 

Kaz Sato, Google Cloud Platform Empowers TensorFlow and Machine Learning
http://www.slideshare.net/HadoopSummit/google-cloud-platform-empowers-tensorflow-and-machine-learning

https://research.googleblog.com/2016/03/machine-learning-in-cloud-with.html

Jeff Dean Talk on Google Cloud https://www.youtube.com/watch?v=ud2Ipnq0pTU


<a name ='cuda'></a>
# Parallel with CUDA, C++

![](http://images.anandtech.com/doci/6839/nvidia-cuda2.png)