# Partitioning
    - large operation을 효율적으로 사용하기 위해서 데이터를 partitioning을 하여 분산처리
        - large operation: join(), reduceByKey(), groupByKey()
    - executor를 더 많이 가져가는 방법
    - 너무 많이 가져가도 비효율적
        - 너무 많이 쪼개고, 붙이면 오히려 비효율적
    - spark 작동 원리
        1) shuffle data
        2) individual tasks로 구분
        3) distributed to each node(executor) of the cluster


- groupByKey() 전에 moviePairs를 partitioning함

```scala
val moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(new HashPartitioner(100))
```

---

# 13 예제에서 partitioning을 적용한 예제

In [1]:
import org.apache.spark._
import scala.io.{Codec, Source}
import java.nio.charset.CodingErrorAction

Intitializing Scala interpreter ...

Spark Web UI available at http://163.152.---.---:----
SparkContext available as 'sc' (version = 2.3.1, master = local[*], app id = local-1554289479030)
SparkSession available as 'spark'


import org.apache.spark._
import scala.io.{Codec, Source}
import java.nio.charset.CodingErrorAction


In [2]:
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

codec: scala.io.Codec = UTF-8
res0: codec.type = UTF-8


---

# load

In [3]:
val lines = Source.fromFile("data/u.item").getLines()

lines: Iterator[String] = non-empty iterator


In [4]:
val tmp = lines.toList.take(1)(0)

tmp: String = 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0


In [5]:
tmp.split("\\|")

res1: Array[String] = Array(1, Toy Story (1995), 01-Jan-1995, "", http://us.imdb.com/M/title-exact?Toy%20Story%20(1995), 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)


In [6]:
/** Builds a movie-id -> title lookup table from the pipe-delimited `data/u.item` file.
  *
  * Only the first two fields of every line are used: field 0 is parsed as the
  * integer movie id and field 1 is kept verbatim as the title. The underlying
  * file handle is always closed before returning, even when a line fails to parse.
  *
  * @return an immutable map keyed by movie id; when an id occurs more than once
  *         the last line read wins
  * @throws NumberFormatException if field 0 of a line is not an integer
  * @throws ArrayIndexOutOfBoundsException if a line has fewer than two fields
  */
def loadMovieNames() : Map[Int, String] = {
    val source = Source.fromFile("data/u.item")
    try {
        // toMap is strict, so every line is consumed here, before `finally` closes the file.
        source.getLines().map { line =>
            val fields = line.split("\\|")
            fields(0).toInt -> fields(1)
        }.toMap
    } finally {
        source.close()
    }
}


loadMovieNames: ()Map[Int,String]


In [7]:
val movieNames = loadMovieNames()

movieNames: Map[Int,String] = Map(645 -> Paris Is Burning (1990), 892 -> Flubber (1997), 69 -> Forrest Gump (1994), 1322 -> Metisse (Caf� au Lait) (1993), 1665 -> Brother's Kiss, A (1997), 1036 -> Drop Dead Fred (1991), 1586 -> Lashou shentan (1992), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 809 -> Rising Sun (1993), 1337 -> Larger Than Life (1996), 1411 -> Barbarella (1968), 629 -> Victor/Victoria (1982), 1024 -> Mrs. Dalloway (1997), 1469 -> Tom and Huck (1995), 365 -> Powder (1995), 1369 -> Forbidden Christ, The (Cristo proibito, Il) (1950), 138 -> D3: The Mighty Ducks (1996), 1190 -> That Old Feeling (1997), 1168 -> Little Buddha (1993), 760 -> Screamers (1995), 101 -> Heavy Metal (1981), 1454 -> Angel and the Badman (1947), 1633 -> � k�ldum klaka (Cold Fever) (1...

---

# parseLine

In [8]:
val data = sc.textFile("data/u.data")

data: org.apache.spark.rdd.RDD[String] = data/u.data MapPartitionsRDD[1] at textFile at <console>:32


- 1 col: User ID
- 2 col: Movie ID
- 3 col: Rating
- 4 col: Timestamp

In [9]:
data.take(1)(0).split("\t")

res2: Array[String] = Array(196, 242, 3, 881250949)


In [10]:
// A single rating: (movie id, score).
type Rating = (Int, Double)

/** Parses one tab-separated line into (user id, (movie id, score)).
  *
  * Columns 0, 1 and 2 are read; any further columns are ignored.
  *
  * @param line tab-separated record
  * @return the user id paired with that user's rating
  */
def parseLine(line: String): (Int, Rating) = {
    val columns = line.split("\t")
    val userId = columns(0).toInt
    val movieId = columns(1).toInt
    val score = columns(2).toDouble
    userId -> (movieId -> score)
}

val ratings = data.map(parseLine)

defined type alias Rating
parseLine: (line: String)(Int, Rating)
ratings: org.apache.spark.rdd.RDD[(Int, Rating)] = MapPartitionsRDD[2] at map at <console>:41


In [11]:
ratings

res3: org.apache.spark.rdd.RDD[(Int, Rating)] = MapPartitionsRDD[2] at map at <console>:41


In [12]:
ratings.take(10).foreach(println)

(196,(242,3.0))
(186,(302,3.0))
(22,(377,1.0))
(244,(51,2.0))
(166,(346,1.0))
(298,(474,4.0))
(115,(265,2.0))
(253,(465,5.0))
(305,(451,3.0))
(6,(86,3.0))


---

# join

In [13]:
// Small (key, value) RDD with two values per key, used to demonstrate a self-join.
val shipMap = sc.parallelize(Array(
  1 -> "Enterprise",
  1 -> "Enterprise-D",
  2 -> "Deep Space Nine",
  2 -> "Voyager"
))

shipMap: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:32


- 자기 자신을 join할 경우 같은 key(user)에 대한 모든 조합의 경우의 수(두개의 영화)로 concat됨

In [14]:
val joined_shipMap = shipMap.join(shipMap)

joined_shipMap: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[6] at join at <console>:34


In [15]:
joined_shipMap.foreach(println)

(1,(Enterprise,Enterprise))
(2,(Deep Space Nine,Deep Space Nine))
(1,(Enterprise,Enterprise-D))
(2,(Deep Space Nine,Voyager))
(1,(Enterprise-D,Enterprise))
(2,(Voyager,Deep Space Nine))
(1,(Enterprise-D,Enterprise-D))
(2,(Voyager,Voyager))


In [16]:
val joinedRatings = ratings.join(ratings)

joinedRatings: org.apache.spark.rdd.RDD[(Int, (Rating, Rating))] = MapPartitionsRDD[9] at join at <console>:38


In [17]:
joinedRatings.take(5).foreach(println)

(778,((94,2.0),(94,2.0)))
(778,((94,2.0),(78,1.0)))
(778,((94,2.0),(7,4.0)))
(778,((94,2.0),(1273,3.0)))
(778,((94,2.0),(265,4.0)))


---

# unique filter

In [18]:
val ratingPair = joinedRatings.take(2)(1)

ratingPair: (Int, (Rating, Rating)) = (778,((94,2.0),(78,1.0)))


In [19]:
val firstRating = ratingPair._2._1
val secondRating = ratingPair._2._2

firstRating: Rating = (94,2.0)
secondRating: Rating = (78,1.0)


In [20]:
firstRating._1

res7: Int = 94


In [21]:
secondRating._2

res8: Double = 1.0


In [22]:
// Compare the two movie ids (element _1 of each rating), not a movie id against a
// rating value; this is the same ordering test filterDuplicates applies.
firstRating._1 < secondRating._1

res9: Boolean = false


In [23]:
 /** Keeps a self-joined rating pair only when the first movie id is strictly
   * smaller than the second, which drops (a, a) pairs and one of (a, b) / (b, a).
   *
   * @param ratingPair (user id, (first rating, second rating))
   * @return true when the first rating's movie id is less than the second's
   */
 def filterDuplicates(ratingPair: (Int, (Rating, Rating))): Boolean = {
    val leftMovie = ratingPair._2._1._1
    val rightMovie = ratingPair._2._2._1
    leftMovie < rightMovie
  }

val uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

filterDuplicates: (ratingPair: (Int, (Rating, Rating)))Boolean
uniqueJoinedRatings: org.apache.spark.rdd.RDD[(Int, (Rating, Rating))] = MapPartitionsRDD[10] at filter at <console>:53


In [24]:
uniqueJoinedRatings.take(10).foreach(println)

(778,((94,2.0),(1273,3.0)))
(778,((94,2.0),(265,4.0)))
(778,((94,2.0),(239,4.0)))
(778,((94,2.0),(193,4.0)))
(778,((94,2.0),(1035,1.0)))
(778,((94,2.0),(616,4.0)))
(778,((94,2.0),(230,2.0)))
(778,((94,2.0),(582,1.0)))
(778,((94,2.0),(262,4.0)))
(778,((94,2.0),(238,3.0)))


---

# makePairs

In [25]:
val _ratings = uniqueJoinedRatings.take(1)(0)

_ratings: (Int, (Rating, Rating)) = (778,((94,2.0),(1273,3.0)))


In [26]:
val _firstRating = _ratings._2._1
val _secondRating = _ratings._2._2

_firstRating: Rating = (94,2.0)
_secondRating: Rating = (1273,3.0)


In [27]:
(_firstRating._1, _secondRating._1) -> (_firstRating._2, _secondRating._2)

res11: ((Int, Int), (Double, Double)) = ((94,1273),(2.0,3.0))


In [28]:
/** Re-keys a joined rating pair by its two movie ids, discarding the user id.
  *
  * @param ratings (user id, (first rating, second rating))
  * @return ((first movie id, second movie id), (first score, second score))
  */
def makePairs(ratings: (Int, (Rating, Rating))): ((Int, Int), (Double, Double)) = {
    val left = ratings._2._1
    val right = ratings._2._2
    val movieIds = (left._1, right._1)
    val scores = (left._2, right._2)
    (movieIds, scores)
}

// Re-key every joined rating by its movie-id pair, then hash-partition the result
// into 100 partitions before the grouping step. partitionBy performs the shuffle
// here (the result is a ShuffledRDD); the later groupByKey on this RDD shows up as
// a MapPartitionsRDD rather than a second ShuffledRDD.
val moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(new HashPartitioner(100))

makePairs: (ratings: (Int, (Rating, Rating)))((Int, Int), (Double, Double))
moviePairs: org.apache.spark.rdd.RDD[((Int, Int), (Double, Double))] = ShuffledRDD[12] at partitionBy at <console>:55


In [29]:
// ((movie_id1, movie_id2),(rating1, rating2))
moviePairs.take(10).foreach(println)

((7,405),(4.0,3.0))
((239,496),(4.0,1.0))
((193,262),(4.0,4.0))
((262,268),(4.0,2.0))
((262,712),(4.0,3.0))
((121,209),(3.0,4.0))
((204,738),(4.0,1.0))
((234,629),(3.0,2.0))
((180,197),(4.0,4.0))
((11,367),(5.0,5.0))


In [30]:
val moviePairsRatings = moviePairs.groupByKey()

moviePairsRatings: org.apache.spark.rdd.RDD[((Int, Int), Iterable[(Double, Double)])] = MapPartitionsRDD[13] at groupByKey at <console>:51


---

#  mapvalues
    - value에 대해서만 연산하여 (key, value)출력

In [31]:
// Small (key, value) RDD with two values per key, used to contrast map and mapValues.
val shipMap = sc.parallelize(Array(
  1 -> "Enterprise",
  1 -> "Enterprise-D",
  2 -> "Deep Space Nine",
  2 -> "Voyager"
))

shipMap: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:32


- x $\rightarrow$ tuple(1, "Enterprise")

In [32]:
shipMap.map(x => x._1).collect()

res13: Array[Int] = Array(1, 1, 2, 2)


- For each key, x $\rightarrow$ value("Enterprise")

In [33]:
shipMap.mapValues(x => x(0)).collect()

res14: Array[(Int, Char)] = Array((1,E), (1,E), (2,D), (2,V))


---

# groupby

In [34]:
val things =sc.parallelize(List(("animal", "bear"),
             ("animal", "duck"),
             ("plant", "cactus"), 
             ("vehicle", "speed boat"),
             ("vehicle", "school bus")))

things: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:32


In [35]:
things.groupByKey().foreach(println)

(plant,CompactBuffer(cactus))
(animal,CompactBuffer(bear, duck))
(vehicle,CompactBuffer(speed boat, school bus))


---

# Cosine similarity

In [36]:
var numPairs: Int = 0
var sum_xx: Double = 0.0
var sum_yy: Double = 0.0
var sum_xy: Double = 0.0

numPairs: Int = 0
sum_xx: Double = 0.0
sum_yy: Double = 0.0
sum_xy: Double = 0.0


In [37]:
// iterator = (key, value)
// moviePairsRatings = (key, iterator)
moviePairsRatings

res16: org.apache.spark.rdd.RDD[((Int, Int), Iterable[(Double, Double)])] = MapPartitionsRDD[13] at groupByKey at <console>:51


- for example

In [38]:
moviePairs

res17: org.apache.spark.rdd.RDD[((Int, Int), (Double, Double))] = ShuffledRDD[12] at partitionBy at <console>:55


In [39]:
val tmp : (Int, Int) = (94,1273)
val ratingPairs = moviePairs.filter(x =>  x._1 == tmp).map(x=>x._2).collect()

tmp: (Int, Int) = (94,1273)
ratingPairs: Array[(Double, Double)] = Array((2.0,3.0), (5.0,2.0), (2.0,2.0), (4.0,3.0), (2.0,2.0), (3.0,2.0), (2.0,2.0), (4.0,2.0), (4.0,3.0))


In [40]:
ratingPairs.foreach(println)

(2.0,3.0)
(5.0,2.0)
(2.0,2.0)
(4.0,3.0)
(2.0,2.0)
(3.0,2.0)
(2.0,2.0)
(4.0,2.0)
(4.0,3.0)


In [41]:
val pair = ratingPairs(0)

pair: (Double, Double) = (2.0,3.0)


In [42]:
val ratingX = pair._1
val ratingY = pair._2

ratingX: Double = 2.0
ratingY: Double = 3.0


In [43]:
sum_xx += ratingX * ratingX
sum_yy += ratingY * ratingY
sum_xy += ratingX * ratingY

In [44]:
numPairs += 1

In [45]:
ratingPairs(0)

res21: (Double, Double) = (2.0,3.0)


In [46]:
// Reset the running totals, then accumulate them over every (x, y) rating pair.
var numPairs: Int = 0
var sum_xx: Double = 0.0
var sum_yy: Double = 0.0
var sum_xy: Double = 0.0

ratingPairs.foreach { rating =>
  val x = rating._1
  val y = rating._2

  sum_xx += x * x   // squared magnitude of the first movie's ratings
  sum_yy += y * y   // squared magnitude of the second movie's ratings
  sum_xy += x * y   // dot product of the two rating vectors
  numPairs += 1
}

numPairs: Int = 9
sum_xx: Double = 98.0
sum_yy: Double = 51.0
sum_xy: Double = 66.0


In [47]:
val numerator: Double = sum_xy
val denominator = Math.sqrt(sum_xx) * Math.sqrt(sum_yy)

numerator: Double = 66.0
denominator: Double = 70.69653456853455


In [48]:
// A zero denominator means one of the rating vectors has zero length; use 0.0 then.
var score: Double = if (denominator != 0) numerator / denominator else 0.0

score: Double = 0.9335676833780072


In [49]:
(score, numPairs)

res24: (Double, Int) = (0.9335676833780072,9)


- cache(): reuse the results

In [50]:
/** Computes the cosine similarity between two movies from their co-rated scores.
  *
  * Each element of `ratingPairs` is (score for the first movie, score for the
  * second movie). The input is traversed once, in iteration order.
  *
  * @param ratingPairs co-rated scores for one movie pair
  * @return (similarity, number of rating pairs); the similarity is 0.0 when the
  *         product of the two vector norms is zero, which includes empty input
  */
def computeCosineSimilarity(ratingPairs: Iterable[(Double, Double)]): (Double, Int) = {
    // Accumulator layout: (sum of x*x, sum of y*y, sum of x*y, pair count).
    val (sumXX, sumYY, sumXY, pairCount) =
      ratingPairs.foldLeft((0.0, 0.0, 0.0, 0)) { case ((xx, yy, xy, count), rating) =>
        val x = rating._1
        val y = rating._2
        (xx + x * x, yy + y * y, xy + x * y, count + 1)
      }

    val norm = Math.sqrt(sumXX) * Math.sqrt(sumYY)
    val similarity = if (norm != 0) sumXY / norm else 0.0

    (similarity, pairCount)
}
val moviePairSimilarities = moviePairsRatings.mapValues(computeCosineSimilarity).cache() //reuse results

computeCosineSimilarity: (ratingPairs: Iterable[(Double, Double)])(Double, Int)
moviePairSimilarities: org.apache.spark.rdd.RDD[((Int, Int), (Double, Int))] = MapPartitionsRDD[21] at mapValues at <console>:97


In [51]:
// Quality gates: minimum similarity score and minimum number of co-ratings.
val scoreThreshold = 0.60
val coOccurenceThreshold = 50.0
// Movie whose nearest neighbours are being looked up.
var movieId = 94 

// Keep only pairs that contain movieId and clear both thresholds.
// Element shape: ((movie_id1, movie_id2), (similarity, number of co-ratings)).
val filteredResults = moviePairSimilarities.filter {
    case ((firstMovie, secondMovie), (similarity, coRatings)) =>
      (firstMovie == movieId || secondMovie == movieId) &&
        similarity > scoreThreshold && coRatings > coOccurenceThreshold
}

// Flip to (similarity info, movie pair) so sorting by key ranks by similarity, then take the 10 best.
val results = filteredResults.map(_.swap).sortByKey(ascending = false).take(10)


scoreThreshold: Double = 0.6
coOccurenceThreshold: Double = 50.0
movieId: Int = 94
filteredResults: org.apache.spark.rdd.RDD[((Int, Int), (Double, Int))] = MapPartitionsRDD[22] at filter at <console>:77
results: Array[((Double, Int), (Int, Int))] = Array(((0.9586049379734621,66),(73,94)), ((0.9540093745210545,124),(94,204)), ((0.953888417198362,117),(94,174)), ((0.9514502380030145,67),(94,742)), ((0.9513610928045688,80),(94,208)), ((0.9512871292224748,105),(79,94)), ((0.9511736470315294,56),(94,232)), ((0.9511281290554398,65),(94,692)), ((0.9487881988287913,66),(94,258)), ((0.9481276017281098,119),(94,210)))


- movieName = Map(idx => name)

In [52]:
println("\nTop similar movies (max of 10) for " + movieNames(movieId))


Top similar movies (max of 10) for Home Alone (1990)


In [53]:
//(sim, (movie_id1, movie_id2))
results

res26: Array[((Double, Int), (Int, Int))] = Array(((0.9586049379734621,66),(73,94)), ((0.9540093745210545,124),(94,204)), ((0.953888417198362,117),(94,174)), ((0.9514502380030145,67),(94,742)), ((0.9513610928045688,80),(94,208)), ((0.9512871292224748,105),(79,94)), ((0.9511736470315294,56),(94,232)), ((0.9511281290554398,65),(94,692)), ((0.9487881988287913,66),(94,258)), ((0.9481276017281098,119),(94,210)))


In [54]:
val result = results(0)

result: ((Double, Int), (Int, Int)) = ((0.9586049379734621,66),(73,94))


In [55]:
val _sim = result._1
val _pair = result._2

_sim: (Double, Int) = (0.9586049379734621,66)
_pair: (Int, Int) = (73,94)


In [56]:
var similarMovieId = _pair._1

similarMovieId: Int = 73


In [57]:
movieId

res27: Int = 94


In [58]:
similarMovieId == movieId

res28: Boolean = false


In [None]:
// Print one line per result: title of the *other* movie in the pair, its score and strength.
results.foreach { entry =>
    val similarity = entry._1
    val moviePair = entry._2

    // The query movie may sit on either side of the pair; pick the opposite side.
    val similarMovieId = if (moviePair._1 == movieId) moviePair._2 else moviePair._1
    println(movieNames(similarMovieId) + "\tscore: " + similarity._1 + "\tstrength: " + similarity._2)
}