## 2.鍵值(key/value)轉換操作

### partitionBy / mapValues / flatMapValues

In [1]:
// Build a pair RDD of (Int, String) elements; the second argument (2) asks for 2 partitions.
var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)

In [2]:
// Show which elements of rdd1 live in which partition: every non-empty
// partition yields one ("part_<index>", elements) pair.
// `::` prepends (see p.71), so each list ends up in reverse iteration order,
// e.g. List((2,B), (1,A)).
rdd1.mapPartitionsWithIndex{
    (partIndex, iter) => {
        // The label depends only on the partition index, so build it once.
        val part_name = "part_" + partIndex
        val part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
        for(elem <- iter){
            // getOrElse supplies Nil for the first element, when the map has
            // no entry for this partition yet.
            part_map(part_name) = elem :: part_map.getOrElse(part_name, Nil)
        }
        // An empty partition leaves the map empty and therefore emits nothing.
        part_map.iterator
    }
}.collect

Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))

In [3]:
// Repartition rdd1 by key with partitionBy, using a HashPartitioner with 2 partitions.
var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))

In [4]:
// Show which elements of rdd2 live in which partition: every non-empty
// partition yields one ("part_<index>", elements) pair.
// `::` prepends, so each list ends up in reverse iteration order,
// e.g. List((4,D), (2,B)).
rdd2.mapPartitionsWithIndex{
    (partIndex, iter) => {
        // The label depends only on the partition index, so build it once.
        val part_name = "part_" + partIndex
        val part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
        for(elem <- iter){
            // getOrElse supplies Nil for the first element, when the map has
            // no entry for this partition yet.
            part_map(part_name) = elem :: part_map.getOrElse(part_name, Nil)
        }
        // An empty partition leaves the map empty and therefore emits nothing.
        part_map.iterator
    }
}.collect

Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))

In [5]:
// Re-create the (Int, String) pair RDD, again requesting 2 partitions.
var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)

In [6]:
// mapValues applies a map function to the V of each [K,V] pair; the keys are left unchanged.
rdd1.mapValues(x => "@" + x + "_").collect

Array((1,@A_), (2,@B_), (3,@C_), (4,@D_))

In [7]:
// flatMapValues applies a flatMap function to the V of each [K,V] pair.
// The function here returns a String, which is flattened into its individual
// characters, so every key is paired in turn with '@', the value's character and '_'.
rdd1.flatMapValues(x => "@" + x + "_").collect

Array((1,@), (1,A), (1,_), (2,@), (2,B), (2,_), (3,@), (3,C), (3,_), (4,@), (4,D), (4,_))

### combineByKey / foldByKey

In [8]:
// Pair RDD of (String, Int) in which keys "A" and "B" each occur twice; no partition count is passed.
var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

In [10]:
// combineByKey converts an RDD[K,V] into an RDD[K,C]; the V and C types may be the same or different.
// createCombiner: combiner function that turns a V into a C; its input is the V of RDD[K,V], its output a C.
// mergeValue: merge-value function that merges a C and a V into a C; its input is (C,V), its output a C.
// mergeCombiners: merge-combiners function that merges two C values into one C; its input is (C,C), its output a C.
// createCombiner: (V) => C
// (v : Int) => v + "_" // append the character _ to each V value, returning a C (String)
// mergeValue: (C, V) => C
// (c : String, v : Int) => c + "@" + v // merge a C and a V with the character @ in between, returning a C (String)
// mergeCombiners: (C, C) => C
// (c1 : String, c2 : String) => c1 + "$" + c2 // merge two C values with $ in between, returning a C (String); the other parameters keep their defaults.
// The combineByKey result here differs from spark-shell; see combineByKey.png.

Name: Syntax Error.
Message: 
StackTrace: 

In [11]:
// Build one String per key with combineByKey:
//   1st function (createCombiner): turns a value into a String by appending "_"
//   2nd function (mergeValue): appends "@" and a further value to that String
//   3rd function (mergeCombiners): joins two partial Strings with "$"
// No "$" appears in the output of this run.
rdd1.combineByKey(
    (v : Int) => v + "_",
    (c : String, v : Int) => c + "@" + v,
    (c1 : String, c2 : String) => c1 + "$" + c2
).collect

Array((B,1_@2), (A,1_@2), (C,1_))

In [12]:
// Pair RDD of (String, Int); key "A" now carries the values 0 and 2.
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

In [13]:
// Sum the V values of each key in rdd1. zeroValue = 0 is the initial value the
// fold function (+) starts from; because 0 is neutral for +, ("A",0), ("A",2)
// simply yields (A,0+2), i.e. (A,2).
// NOTE(review): how many times zeroValue enters the result is not fixed by this
// code (presumably once per key per partition) — confirm against the Spark docs.
rdd1.foldByKey(0)(_+_).collect

Array((B,3), (A,2), (C,1))

In [15]:
// With the non-neutral zeroValue 2, the result depends on how often it is folded in:
// the output of this run adds 2 once per key (A: 2+0+2 = 4, B: 2+1+2 = 5, C: 2+1 = 3).
// The foldByKey result differs from spark-shell; see foldByKey.png.
rdd1.foldByKey(2)(_+_).collect

Array((B,5), (A,4), (C,3))

In [16]:
// Multiply the values of each key starting from 0: every product is 0.
rdd1.foldByKey(0)(_*_).collect

Array((B,0), (A,0), (C,0))

In [17]:
// Multiply the values of each key starting from 1, which is neutral for *:
// A = 0*2 = 0, B = 1*2 = 2, C = 1.
rdd1.foldByKey(1)(_*_).collect

Array((B,2), (A,0), (C,1))

### groupByKey / reduceByKey / reduceByKeyLocally

In [18]:
// Pair RDD of (String, Int) used by the grouping and reducing examples.
var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))

In [19]:
// groupByKey gathers the V values of each K in RDD[K,V] into one collection, an Iterable[V].
rdd1.groupByKey.collect

Array((B,CompactBuffer(1, 2)), (A,CompactBuffer(0, 2)), (C,CompactBuffer(1)))

In [20]:
// reduceByKey combines the V values of each K in RDD[K,V] with the given function (here +).
rdd1.reduceByKey(_+_).collect

Array((B,3), (A,2), (C,1))

In [21]:
// reduceByKey can also repartition: this overload takes a partitioner (a HashPartitioner
// with 2 partitions) in addition to the reduce function.
var rdd2 = rdd1.reduceByKey(new org.apache.spark.HashPartitioner(2),_+_)

In [22]:
// Materialize rdd2 as an Array of (key, sum) pairs.
rdd2.collect

Array((B,3), (A,2), (C,1))

In [23]:
// reduceByKeyLocally combines the V values of each K in RDD[K,V] with the given function,
// but returns the result as a Map[K,V] rather than an RDD[K,V].
rdd1.reduceByKeyLocally(_+_)

Map(A -> 2, B -> 3, C -> 1)

### cogroup 

In [24]:
// First cogroup input: (String, Int) pairs with keys A, B, C in 2 partitions.
var rdd1 = sc.makeRDD(Array(("A",1),("B",2),("C",3)),2)

In [25]:
// Second cogroup input: (String, String) pairs with keys A, D, C in 2 partitions.
var rdd2 = sc.makeRDD(Array(("A","a"),("D","d"),("C","c")),2)

In [26]:
// Third cogroup input: (String, String) pairs with keys A, E in 2 partitions.
var rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2)

In [27]:
// cogroup is comparable to a SQL full outer join: the keys of all the RDDs are returned,
// and an RDD that has no record for a key contributes an empty collection.
// Here rdd1 is cogrouped with rdd2 and rdd3 in one call.
var rdd4 = rdd1.cogroup(rdd2,rdd3)

In [28]:
// Number of partitions of the cogrouped RDD.
rdd4.partitions.size

2

In [29]:
// Each key maps to a triple of value collections, one per input RDD (rdd1, rdd2, rdd3);
// a key that is missing from an input gets an empty buffer in that position.
rdd4.collect

Array((B,(CompactBuffer(2),CompactBuffer(),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))), (C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer())), (E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))

### join / leftOuterJoin / rightOuterJoin / subtractByKey

In [30]:
// Left-hand input of the join examples: keys A, B, C with String values, in 2 partitions.
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)

In [31]:
// Right-hand input of the join examples: keys A, C, D with String values, in 2 partitions.
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)

In [32]:
// Inner join on key: only keys present in both RDDs (A and C) appear in the result.
rdd1.join(rdd2).collect

Array((A,(1,a)), (C,(3,c)))

In [33]:
// Keeps every key of rdd1; the rdd2 side is an Option: Some(value) on a match, None otherwise (key B).
rdd1.leftOuterJoin(rdd2).collect

Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))

In [34]:
// Keeps every key of rdd2; the rdd1 side is an Option: Some(value) on a match, None otherwise (key D).
rdd1.rightOuterJoin(rdd2).collect

Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

In [35]:
// Keeps the pairs of rdd1 whose key does not occur in rdd2 (only B here).
rdd1.subtractByKey(rdd2).collect

Array((B,2))