<h1 id="tocheading">Table of Contents</h1>
<div id="toc"></div>

In [3]:
%%javascript
$.getScript('https://kmahelona.github.io/ipython_notebook_goodies/ipython_notebook_toc.js')

# Chapter 4: Working with Key/Value Pairs

## Pair RDDs
* Spark provides special operations on RDDs containing key/value pairs - these RDDs are called pair RDDs
* We can use the <code>map()</code> function to create a pair RDD as shown below
* Pair RDDs are still RDDs (of Tuple2 objects in Java/Scala or of Python tuples) 

### Example: Creating a pair RDD using the first word as the key

In [27]:
val lines = sc.textFile("name_example.txt")
val rddpair = lines.map(x => (x.split(" ")(0),x))


println("Results from map: ")
rddpair.collect().foreach(println)

println("\nResults from lookup: ")
rddpair.lookup("her").foreach(println)

Results from map: 
(his,his name is pat)
(his,his name is peter)
(his,his name is olaf)
(her,her name is Joanne)
(her,her name is Therese)
(they,they work at algebraix in Encintas )
(they,they like to eat yogurt )

Results from lookup: 
her name is Joanne
her name is Therese


## Transformations on Pair RDDs

### Transformation on one pair RDD
* Pair RDDs are allowed to use all the transformations available to standard RDDs
* The rules from "Passing Functions to Spark" in Chapter 3 still apply
    - Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements
* <code>reduceByKey(func)</code>: Combine values with the same key 
    - Calling <code>reduceByKey()</code> or <code>foldByKey()</code> automatically combines values locally on each machine before computing the global totals for each key, so the user does not need to specify a combiner
    - The more general <code>combineByKey</code> interface allows you to customize combining behavior
    
* <code>groupByKey()</code>: Group values with the same key
* <code>combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)</code>: Combine values with the same key using a different result type
    - <code>combineByKey()</code> is the most general of the per-key aggregation functions
    - Most of the other per-key combiners are implemented using it
    - Like <code>aggregate()</code>, <code>combineByKey()</code> allows the user to return values that are not the same type as the input data
    - Understanding <code>combineByKey()</code> in depth:<br>
    As <code>combineByKey()</code> goes through the elements in a partition, each element either has a key it hasn't seen before or has the same key as a previous element:
        - If it's a new key, <code>combineByKey()</code> uses a function we provide, called <code>createCombiner()</code>, to create the initial value for the accumulator on that key (**Note: This happens the first time a key is found in each partition rather than only the first time the key is found in the RDD**)
        - If it is a key we have seen before while processing that partition, it will instead use the provided function, <code>mergeValue()</code>, with the current value for the accumulator for that key and the new value
        - <code>mergeCombiners()</code>: merges the accumulators of the same key across all partitions
* <code>mapValues(func)</code>: Apply a function to each value of a pair RDD without changing the key
* <code>flatMapValues(func)</code>: Apply a function that returns an iterator to each value of a pair RDD, and for each element returned produce a key/value entry with the old key. Often used for tokenization
* <code>keys()</code>: Return an RDD of just the keys
* <code>values()</code>: Return an RDD of just the values
* <code>sortByKey()</code>: Return an RDD sorted by the key

### Transformation on two pair RDDs
* <code>rdd.subtractByKey(other)</code>: Remove elements with a key present in the other RDD
* <code>rdd.join(other)</code>: Perform an inner join between two RDDs
* <code>rdd.rightOuterJoin(other)</code>: Perform a join between two RDDs where the key must be present in the other RDD
* <code>rdd.leftOuterJoin(other)</code>: Perform a join between two RDDs where the key must be present in the first RDD
* <code> rdd.cogroup(other)</code>: Group data from both RDDs sharing the same key. 
    - <code>cogroup()</code> over two RDDs sharing the same key type, K, with the respective value types V and W returns <code>RDD[(K, (Iterable[V], Iterable[W]))]</code>
    - If one of the RDDs doesn't have elements for a given key that is present in the other RDD, the corresponding <code>Iterable</code> is simply empty
    - Can be used to implement intersection by key
    - Can work on three or more RDDs at once
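
To make the differences between these operations concrete, here is a minimal sketch on two tiny RDDs. It assumes the notebook's existing `sc` (SparkContext); the sample data and the names `left` and `right` are made up for illustration. Results are converted to sets because Spark does not guarantee the order of collected elements.

```scala
// Assumes `sc` is the SparkContext already provided by the notebook.
val left  = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
val right = sc.parallelize(List((3, 9)))

// Keep only pairs whose key does NOT appear in `right`
val subtracted = left.subtractByKey(right).collect().toSet
// Set((1,2))

// Inner join: only keys present in both RDDs
val inner = left.join(right).collect().toSet
// Set((3,(4,9)), (3,(6,9)))

// Left outer join: every key of `left`; the value from `right` becomes an Option
val leftOuter = left.leftOuterJoin(right).collect().toSet
// Set((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

// Right outer join: every key of `right`; the value from `left` becomes an Option
val rightOuter = left.rightOuterJoin(right).collect().toSet
// Set((3,(Some(4),9)), (3,(Some(6),9)))

// cogroup: one entry per key found in either RDD, holding both groups of values
val grouped = left.cogroup(right)
  .mapValues { case (vs, ws) => (vs.toList.sorted, ws.toList.sorted) }
  .collect().toSet
// Set((1,(List(2),List())), (3,(List(4, 6),List(9))))
```

Note how key `1` survives only in the operations that keep unmatched keys from `left` (`subtractByKey`, `leftOuterJoin`, `cogroup`).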


### Example on one pair RDD

In [121]:
val rdd = sc.parallelize(List((3,2),(1,4),(3,6),(3,1)))
println("rdd: " + rdd.collect().mkString(","))

//add values with the same key 
println("reduceByKey example: " + rdd.reduceByKey(_ + _).collect().mkString(","))

//groupByKey() returns an iterable in the values, the mapValues function below converts the iterable to a list
println("groupByKey example: " + rdd.groupByKey().mapValues(x => x.toList).collect().mkString(","))

//mapValues example
println("mapValues example: " + rdd.mapValues(_ + 1).collect().mkString(","))

//flatMapValues example: each value x is expanded into the range (x to 3),
//so values greater than 3 yield an empty range and produce no output pairs
println("flatMapValues example:")
println("\tbefore flatMapValues:" + rdd.mapValues(x=>(x to 3)).collect().mkString(","))
println("\tafter flatMapValues: " + rdd.flatMapValues(x=>(x to 3)).collect().mkString(","))
println("keys example: " + rdd.keys.collect().mkString(","))
println("values example: " + rdd.values.collect().mkString(","))
println("sortByKey example: " + rdd.sortByKey().collect().mkString(","))


rdd: (3,2),(1,4),(3,6),(3,1)
reduceByKey example: (1,4),(3,9)
groupByKey example: (1,List(4)),(3,List(2, 6, 1))
mapValues example: (3,3),(1,5),(3,7),(3,2)
flatMapValues example:
	before flatMapValues:(3,Range(2, 3)),(1,Range()),(3,Range()),(3,Range(1, 2, 3))
	after flatMapValues: (3,2),(3,3),(3,1),(3,2),(3,3)
keys example: 3,1,3,3
values example: 2,4,6,1
sortByKey example: (1,4),(3,2),(3,6),(3,1)


## Actions Available on Pair RDDs
* <code>countByKey()</code>: Count the number of elements for each key
* <code>collectAsMap()</code>: Collect the result as a map to provide easy lookup
* <code>lookup(key)</code>: Return all values associated with the provided key

### Examples of Actions

In [36]:
val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
println("example rdd: " + rdd.collect().mkString(","))
println("countByKey result: " + rdd.countByKey())
println("lookup result: " + rdd.lookup(3))


example rdd: (1,2),(3,4),(3,6)
countByKey result: Map(1 -> 1, 3 -> 2)
lookup result: WrappedArray(4, 6)


### combineByKey Example

<code>combineByKey(createCombiner, mergeValue,mergeCombiners,partitioner)</code>: Combine values with the same key using a different result type


In [214]:
val lines = sc.textFile("rddpair_vehicles.txt")
println("rddpair_vehicles.txt file content: ")
lines.collect().foreach(println)

val rddpair = lines.map(x => (x.split("\t")(0),x.split("\t")(1).toInt))
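val result = rddpair.combineByKey(
    (x: Int) => (x, 1),                                           // createCombiner: (sum, count) for the first value of a key in a partition
    (acc: (Int, Int), y: Int) => (acc._1 + y, acc._2 + 1),        // mergeValue: fold another value into the partition's accumulator
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge accumulators from different partitions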
println("results:")
result.collectAsMap()

rddpair_vehicles.txt file content: 
car	1
car	2
car	100
taxi	1
taxi	47
bus	250
bus	26
uber	10
uber	17
results:


Map(taxi -> (48,2), uber -> (27,2), bus -> (276,2), car -> (103,3))

### reduceByKey Example

In [115]:
val lines2 = sc.textFile("rddpair_vehicles2.txt")
lines2.collect().foreach(println)
val rddpair = lines2.map(x => (x.split("\t")(0),x.split("\t")(1).toInt))

println("\nresults:")
rddpair.reduceByKey((x,y) => x+y).collectAsMap()

car	1
car	2
car	3
car	4
taxi	1
taxi	47
bus	250
uber	10
uber	17

results:


Map(taxi -> 48, uber -> 27, bus -> 250, car -> 10)

### Per-key average with <code>reduceByKey()</code> and <code>mapValues()</code> Example
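<b>Note:</b> In Chapter 3 it was pointed out that the return type of <code>map()</code> does not have to be the same as the input type; the same holds for <code>mapValues()</code>, which is used here to turn each value into a <code>(sum, count)</code> pair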

In [211]:
val rdd = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
val inter = rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2))
println("intermediate result:" + inter.collect().mkString(",")) 
println("final result: " + inter.mapValues(x => x._1.toDouble/x._2).collectAsMap())

intermediate result:(pirate,(3,1)),(panda,(1,2)),(pink,(7,2))
final result: Map(pirate -> 3.0, pink -> 3.5, panda -> 0.5)


## Word Count Example
The word count example below takes the following steps:
1. Split each line from the file into words
2. Map each word to a tuple containing the word and an initial count of 1 
3. Sum up the count for each word


In [216]:
val rdd = sc.textFile("README_spark.md")
val result = rdd.flatMap(x=>x.split(" ")).map(x=> (x,1)).reduceByKey((x,y) => (x+y))
println("number of times Spark appears in README_spark.md: " + result.lookup("Spark").mkString(","))

number of times Spark appears in README_spark.md: 13


## Data Partitioning (Advanced)
* In a distributed program, communication is very expensive, so laying out the data to minimize network traffic can greatly improve performance
* Partitioning is only useful when a dataset is reused <b>multiple</b> times in key-oriented operations such as joins
* Spark's partitioning is available on all RDDs of key/value pairs and causes the system to group elements based on a function of each key
* Spark does not give explicit control over which worker node each key goes to, but partitioning ensures that a set of keys will appear together on some node
* Example:
    - You might choose to hash-partition an RDD to 100 partitions so that keys that have the same hash value modulo 100 appear on the same node
    - You might range-partition the RDD into sorted ranges of keys so that the elements with keys in the same range appear on the same node
* Many other Spark operations automatically result in an RDD with known partitioning information and many operations other than <code>join()</code> will take advantage of this information
    - For example, <code>sortByKey()</code> and <code>groupByKey()</code> will result in range-partitioned and hash-partitioned RDDs, respectively
    - On the other hand, operations like <code>map()</code> cause the new RDD to forget the parent's partitioning information because such operations could theoretically modify the key of each record
    - Spark's Java and Python APIs benefit from partitioning in the same way as the Scala API. However, in Python you pass the desired number of partitions directly instead of a partitioner object (e.g. <code>rdd.partitionBy(100)</code>)
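
The hash- and range-partitioning described above can be sketched as follows. This is an illustration only: it assumes the notebook's existing `sc` and a Spark release where the partitioner classes live in `org.apache.spark`; the data and the names `pairs`, `hashed`, and `ranged` are made up. `RangePartitioner` samples the RDD to choose its range boundaries, so the exact split it produces is not asserted here.

```scala
import org.apache.spark.{HashPartitioner, RangePartitioner}

// Assumes `sc` is the SparkContext already provided by the notebook.
val pairs = sc.parallelize(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")))

// Hash partitioning: a key goes to partition (key.hashCode modulo 2)
val hashed = pairs.partitionBy(new HashPartitioner(2)).persist()

// Range partitioning: keys are split into sorted, contiguous ranges
val ranged = pairs.partitionBy(new RangePartitioner(2, pairs)).persist()

// glom() turns each partition into an array so the layout can be inspected
hashed.glom().collect().foreach(p => println("hash partition:  " + p.mkString(",")))
ranged.glom().collect().foreach(p => println("range partition: " + p.mkString(",")))
```

With integer keys, the hash-partitioned RDD puts the even keys in one partition and the odd keys in the other, while the range-partitioned RDD keeps neighbouring keys together.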
    
### Scala Simple Application Example
* Consider an application that keeps a large table of user information in memory - say, an RDD of <code>(UserID, UserInfo)</code> pairs, where <code>UserInfo</code> contains a list of topics the user is subscribed to
* The application periodically combines this table with a smaller file representing events that happened in the past five minutes - say, a table of <code>(UserID, LinkInfo)</code> pairs for users who have clicked a link on a website in those five minutes
* We may want to count how many users visited a link that was <b>not</b> one of their subscribed topics
* Initialization code: we load the user info from a Hadoop SequenceFile on HDFS
* This distributes the elements of <code>userData</code> by the HDFS block where they are found and <b>doesn't provide Spark with any way of knowing which partition a particular <code>UserID</code> is located in</b>

        val sc = new SparkContext(...)
        val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
        
* A function is called periodically to process a logfile of events from the past five minutes
* We assume that this is a SequenceFile containing <code>(UserID, LinkInfo)</code> pairs

        def processNewLogs(logFileName: String): Unit = {
            val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
            val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
            val offTopicVisits = joined.filter {
                case (userID, (userInfo, linkInfo)) => // Expand the tuple into its components
                    !userInfo.topics.contains(linkInfo.topic)
            }.count()
            println("Number of visits to non-subscribed topics: " + offTopicVisits)
        }
        
* This code runs, but it is inefficient: the <code>join()</code> operation, which is called each time <code>processNewLogs()</code> is invoked, does not know anything about how the keys are partitioned in the datasets, so the large <code>userData</code> table is hashed and shuffled on every call even though it does not change
* By default, this operation will hash all the keys of both datasets, send elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine, as shown below

<img src='figures/fig4_4.png' alt="Drawing" style="width: 600px;"/>

* Fixing this is simple: just use the <code>partitionBy()</code> transformation on <code>userData</code> to hash-partition it at the start of the program 
* Do this by passing a <code>spark.HashPartitioner</code> object to <code>partitionBy()</code>:

        val sc = new SparkContext(...)
        val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                            .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                            .persist()

* The <code>processNewLogs()</code> method can remain unchanged
* Because we called <code>partitionBy()</code> when building <code>userData</code>, Spark will now know that it is hash-partitioned and calls to <code>join()</code> on it will take advantage of this information
    - When we call <code>userData.join(events)</code> Spark will shuffle only the events RDD, sending events with each particular <code>UserID</code> to the machine that contains the corresponding hash partition of <code>userData</code> as shown below:
    
<img src="figures/fig4_5.png" alt="Drawing" style="width: 600px;"/>

* The result is that a lot less data is communicated over the network, and the program runs significantly faster
* Note: <code>partitionBy()</code> is a transformation, so it always returns a new RDD - it does not change the original RDD in place. Therefore it is important to persist and save as <code>userData</code> the result of <code>partitionBy()</code>, not the original <code>sequenceFile()</code>
    - Failure to persist an RDD after it has been transformed with <code>partitionBy()</code> will cause subsequent uses of the RDD to repeat the partitioning of the data
    - This would negate the advantage of <code>partitionBy()</code>, resulting in repeated partitioning and shuffling of the data across the network, similar to what occurs without any specified partitioner
* In general, make the number of partitions at least as large as the number of cores in your cluster
                    

## Determining an RDD's Partitioner 
* In Scala and Java you can determine how an RDD is partitioned using its <code>partitioner</code> property (or <code>partitioner()</code> method in Java)
    - Returns a <code>scala.Option</code> object, which is a Scala class for a container that may or may not contain one item
    - Call <code>isDefined</code> on the <code>Option</code> to check whether it has a value
    - Call <code>get</code> to get this value
    - If present, the value will be a <code>spark.Partitioner</code> object 
    - This is essentially a function telling the RDD which partition each key goes into
* Example: Determining the partitioner of an RDD:

        scala> val pairs = sc.parallelize(List((1,1), (2,2), (3,3))) // create an RDD with no partitioning
        pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

        scala> pairs.partitioner
        res0: Option[spark.Partitioner] = None // no partitioner is set

        // create a new RDD with hash partitioning
        scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
                                        .persist() // persist so that later operations reuse the partitioned data
        partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

        scala> partitioned.partitioner
        res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d) // a HashPartitioner is now set
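
Because `partitioner` returns an `Option`, pattern matching is often a convenient alternative to calling `isDefined` and `get` by hand. The sketch below assumes the notebook's existing `sc` and a Spark release where the classes live in `org.apache.spark`; `describe` is a hypothetical helper written for this illustration, not part of Spark.

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

// Assumes `sc` is the SparkContext already provided by the notebook.
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist()

// Hypothetical helper (not part of Spark): summarize an RDD's partitioner
def describe(p: Option[Partitioner]): String = p match {
  case Some(part) => s"${part.numPartitions} partitions via ${part.getClass.getSimpleName}"
  case None       => "no partitioner set"
}

println(describe(pairs.partitioner))       // no partitioner set
println(describe(partitioned.partitioner)) // 2 partitions via HashPartitioner
```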


        

## Tuning the Level of Parallelism
* Spark will always try to infer a sensible default number of partitions based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance
* <code>repartition()</code>: shuffles the data across the network to create a new set of partitions
    - Note: repartitioning your data is a fairly expensive operation
* <code>coalesce()</code>: an optimized version of <code>repartition()</code> that avoids data movement, but only if you are decreasing the number of RDD partitions
    - To know whether you can safely call <code>coalesce()</code>, check the number of partitions of the RDD using <code>rdd.partitions.size</code> (Scala), <code>rdd.partitions().size()</code> (Java), or <code>rdd.getNumPartitions()</code> (Python) to make sure you are coalescing it to fewer partitions than it currently has
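
A small sketch of the difference between the two calls, assuming the notebook's existing `sc`; the data and the partition counts are arbitrary values chosen for illustration.

```scala
// Assumes `sc` is the SparkContext already provided by the notebook.
val data = sc.parallelize(1 to 1000, 8)      // explicitly request 8 partitions
println(data.partitions.size)                // 8

// Decreasing the partition count: coalesce() merges partitions without a full shuffle
val fewer = data.coalesce(2)
println(fewer.partitions.size)               // 2

// Increasing the partition count needs a shuffle, which repartition() performs
val more = data.repartition(16)
println(more.partitions.size)                // 16

// coalesce() without a shuffle (its default) cannot increase the partition count
println(data.coalesce(16).partitions.size)   // 8
```

The last line shows why the partition count is worth checking first: asking `coalesce()` for more partitions than the RDD already has leaves the RDD unchanged.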

## Operations That Benefit from Partitioning
* Operations that benefit from partitioning: <code>cogroup()</code>, <code>groupWith()</code>, <code>join()</code>, <code>leftOuterJoin()</code>, <code>rightOuterJoin()</code>, <code>groupByKey()</code>, <code>reduceByKey()</code>, <code>combineByKey()</code>, <code>lookup()</code>
    - These operations all locate data by key, so they can avoid shuffling or scanning work when the RDD is already partitioned by key
* For operations that act on a single RDD, such as <code>reduceByKey()</code>, running on a prepartitioned RDD will cause all the values for each key to be computed locally on a single machine, requiring only the final, locally reduced value to be sent from each worker node back to the master

## Operations That Affect Partitioning
* Spark knows internally how each of its operators affects partitioning and automatically sets the <code>partitioner</code> on RDDs created by operations that partition the data 
* Suppose you call <code>join()</code> to join two RDDs; because the elements with the same key have been hashed to the same machine, Spark knows that the result is hash-partitioned, and operations like <code>reduceByKey()</code> on the join result are going to be significantly faster
* Flipside: for transformations that cannot be guaranteed to produce a known partitioning, the output RDD will not have a partitioner set
    - Example: if you call <code>map()</code> on a hash-partitioned RDD of key/value pairs, the function passed to <code>map()</code> can in theory change the key of each element, so the result will not have a <code>partitioner</code> set
* Operations such as <code>mapValues()</code> and <code>flatMapValues()</code> guarantee that each tuple's key remains the same, so use them instead of <code>map()</code> if you don't intend to change the key
* Operations that result in a partitioner being set on the output RDD:
    - <code>cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort()</code>
    - If the parent has a partitioner: <code>mapValues(), flatMapValues(), filter()</code>
* For binary operations, <i>which</i> partitioner is set on the output depends on the parent RDDs' partitioners
* By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation
* If one of the parents has a <code>partitioner</code> set, it will be that partitioner
* If both parents have a <code>partitioner</code> set, it will be the partitioner of the first parent
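
The effect of different operations on the `partitioner` property can be checked directly. The following is a minimal sketch, assuming the notebook's existing `sc` and a Spark release where `HashPartitioner` lives in `org.apache.spark`; the data and variable names are made up for illustration.

```scala
import org.apache.spark.HashPartitioner

// Assumes `sc` is the SparkContext already provided by the notebook.
val base = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new HashPartitioner(4))
  .persist()

// map() could change the keys, so the partitioner is dropped
val viaMap = base.map { case (k, v) => (k, v.toUpperCase) }
println(viaMap.partitioner)                          // None

// mapValues() and filter() cannot change the keys, so the partitioner is kept
val viaMapValues = base.mapValues(_.toUpperCase)
val filtered = base.filter { case (k, _) => k > 1 }
println(viaMapValues.partitioner == base.partitioner) // true
println(filtered.partitioner == base.partitioner)     // true

// reduceByKey() on an already partitioned RDD reuses the existing partitioner
val reduced = base.reduceByKey(_ + _)
println(reduced.partitioner == base.partitioner)      // true

// join() always sets a partitioner on its result
val other = sc.parallelize(List((1, 10), (2, 20)), 2)
println(base.join(other).partitioner.isDefined)       // true
```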

In [5]:
//page rank example skipped - come back to this later
