In [1]:
# Common imports and object creation
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

# 1, Motivation

# 2, Creating Pair RDDs

A common way to create a Pair RDD is to call RDD.map() with a function that returns (key, value) tuples.

In [2]:
# Examples
lines = sc.parallelize(["hello world","hi Worriors","for the horde, bool and lighting!!!"])
# create a pair RDD by using the first word as the KEY
pairs = lines.map(lambda x: (x.split(" ")[0],x))
print(pairs.collect())

[('hello', 'hello world'), ('hi', 'hi Worriors'), ('for', 'for the horde, bool and lighting!!!')]


# 3, Transformations on Pair RDDs

Pair RDDs are still RDDs, so they support all the transformations available to standard RDDs. Since Pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.

In [3]:
# keep only the pairs whose value (the second element) is shorter than 20 characters
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
print(result.collect())

[('hello', 'hello world'), ('hi', 'hi Worriors')]


## 3.1 Aggregations

### Functions:
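1. reduceByKey(): **similar to reduce()**. It runs several parallel reduce operations, one for each key in the dataset, where each operation combines values that have the same key. It returns a new RDD consisting of each key and the reduced value for that key.
2. foldByKey(): **similar to fold()**. It takes a zero value and a combining function, and folds the values of each key starting from that zero value.
3. mapValues(): applies a function to the value of each pair and leaves the key unchanged. The picture below shows a use case.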

<img src="https://raw.githubusercontent.com/ColinsGitCode/JupyterNotebook/c0ba7cc4ceb1651a33bc65f0f3c738d68abbf25f/ipynbFiles/Materials/Usecase1.jpg" />

In [4]:
# Example from the picture
# How to create a Pair RDD
word = [('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)] # a list of tuples
Words = sc.parallelize(word)
print(Words.keys().collect())
print(Words.values().collect())
new_words = Words.mapValues(lambda x:(x,1))
print(new_words.values().collect())
NewWords = new_words.reduceByKey(lambda x,y:(x[0] + y[0], x[1] + y[1]))
print(NewWords.values().collect())
print(NewWords.collect())


['panda', 'pink', 'pirate', 'panda', 'pink']
[0, 3, 3, 1, 4]
[(0, 1), (3, 1), (3, 1), (1, 1), (4, 1)]
[(1, 2), (7, 2), (3, 1)]
[('panda', (1, 2)), ('pink', (7, 2)), ('pirate', (3, 1))]
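
The cell above uses mapValues() and reduceByKey(); foldByKey() is listed but not shown. The following is a minimal plain-Python sketch of what foldByKey() computes, not Spark's implementation. The helper name `fold_by_key` is hypothetical. In PySpark the corresponding call would be `Words.foldByKey(0, lambda x, y: x + y)`. Spark may apply the zero value once per partition, so it should be an identity element for the function (0 for addition).

```python
from collections import defaultdict
from functools import reduce


def fold_by_key(pairs, zero_value, func):
    """Plain-Python model of foldByKey() (hypothetical helper, not a Spark API)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Fold the values of each key, starting from zero_value.
    return {key: reduce(func, values, zero_value) for key, values in grouped.items()}


word = [('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)]
totals = fold_by_key(word, 0, lambda x, y: x + y)
print(totals)  # {'panda': 1, 'pink': 7, 'pirate': 3}
```

The totals match the first element of each tuple in the reduceByKey() output above.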


In [5]:
# Example: word count of a file (total number of occurrences of each word)
File_RDD = sc.textFile("/home/colin/bashrc")
Txt = File_RDD.flatMap(lambda x:x.split(" ")) # flatMap() splits every line into words and flattens them into one RDD of words
print(Txt.take(10))
result = Txt.map(lambda x:(x,1)).reduceByKey(lambda x,y: x + y)
print(result.take(10))
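Res = Txt.countByValue() # countByValue() counts each word and returns a collections.defaultdict
print(type(Res))
print(Res[0]) # the keys are words, so the integer key 0 is missing and the defaultdict returns its default value 0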


['', '#', 'see', '/usr/share/doc/bash/examples/startup-files', '(in', 'the', 'package', 'bash-doc)', '#', 'for']
[('', 598), ('#', 73), ('see', 3), ('/usr/share/doc/bash/examples/startup-files', 1), ('(in', 1), ('the', 27), ('package', 1), ('bash-doc)', 1), ('for', 9), ('examples', 1)]
<class 'collections.defaultdict'>
0


## combineByKey(): the most general of the per-key aggregation functions

**It takes three functions:**
1. createCombiner(): called the first time a key is seen in a partition, to create the initial value of the accumulator for that key
2. mergeValue(): called when the key has already been seen in that partition, to merge the new value into the key's accumulator
3. mergeCombiners(): called when two or more partitions hold an accumulator for the same key, to merge those accumulators

**Parameters**: combineByKey() takes several parameters, one for each phase of the aggregation
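
The three functions above can be modeled in plain Python to see when each one is called. This is only a sketch of the semantics, not Spark's implementation. The helper `combine_by_key` and the split of the data into two partitions are assumptions made for illustration.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python model of combineByKey() (hypothetical helper, not a Spark API)."""
    # Phase 1: inside each partition, build one accumulator per key.
    per_partition = []
    for partition in partitions:
        accumulators = {}
        for key, value in partition:
            if key not in accumulators:
                # First time this key is seen in this partition.
                accumulators[key] = create_combiner(value)
            else:
                # Key already seen in this partition.
                accumulators[key] = merge_value(accumulators[key], value)
        per_partition.append(accumulators)

    # Phase 2: merge the accumulators of the same key across partitions.
    merged = {}
    for accumulators in per_partition:
        for key, acc in accumulators.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged


# Assumed layout: the five pairs are spread over two partitions.
partitions = [
    [('panda', 0), ('pink', 3), ('pirate', 3)],
    [('panda', 1), ('pink', 4)],
]
sum_count = combine_by_key(
    partitions,
    lambda value: (value, 1),                    # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),     # mergeCombiners
)
averages = {key: total / count for key, (total, count) in sum_count.items()}
print(sum_count)  # {'panda': (1, 2), 'pink': (7, 2), 'pirate': (3, 1)}
print(averages)   # {'panda': 0.5, 'pink': 3.5, 'pirate': 3.0}
```

With this layout, createCombiner runs five times (once per key per partition) and mergeCombiners runs twice (for 'panda' and 'pink'); mergeValue would only run if a key repeated inside one partition.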

In [23]:
# Example: using combineByKey() to obtain the average of each key
print(Words.collect()) # ---> (key, value)
sumCount = Words.combineByKey((lambda x:(x,1)), # createCombiner: the first value of a key becomes (value, 1) ---> (key, (value, 1))
                              (lambda x,y:(x[0] + y, x[1] + 1)), # mergeValue, within a partition: add the value and increment the count ---> (key, (total, count))
                              (lambda x,y:(x[0] + y[0], x[1] + y[1]))) # mergeCombiners, across partitions: add totals and counts ---> (key, (total, count)) for the whole RDD
print(sumCount.collect())

# mapAverage = sumCount.map(lambda key,xy:(key,xy[0]/xy[1]))
# The line above fails: map() passes each (key, (total, count)) pair as a single argument,
# and tuple parameter unpacking was removed in Python 3, so the pair must be unpacked inside the function.
# See https://stackoverflow.com/questions/40207441/python-spark-combinebykey-average
def pri(key_vals):
    # unpack the (key, (total, count)) pair and return (key, average)
    (key,(total,count)) = key_vals
    return key,(total/count)
sumCount.map(pri).collect()

#mapAverage.collectAsMap()

[('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)]
[('panda', (1, 2)), ('pink', (7, 2)), ('pirate', (3, 1))]


[('panda', 0.5), ('pink', 3.5), ('pirate', 3.0)]

## Tuning the level of parallelism
1. **Every RDD has a fixed number of partitions that determines the degree of parallelism to use when executing operations on the RDD.**
2. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions.
3. Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases we can tune the level of parallelism to get better performance.
4. **Most of the functions in this chapter accept the *number of partitions* as an optional parameter. For example:**
```
RDD.reduceByKey(lambda x,y: x + y)    # default parallelism
RDD.reduceByKey(lambda x,y: x + y,10) # custom parallelism: 10 partitions
```
5. Spark provides the *repartition()* function, which shuffles the data across the network to create a new set of partitions, **but it is a fairly *expensive* operation**. Spark also provides an optimized version called *coalesce()*, which avoids data movement, but only when decreasing the number of partitions.
6. In Python we can use ***RDD.getNumPartitions()*** to check the current number of partitions and make sure that we are coalescing to fewer partitions.
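
To make the effect of the partition-count parameter more concrete, here is a rough plain-Python sketch of hash partitioning: each key is hashed and assigned to one of `num_partitions` buckets, so all values of a key end up in the same partition. The helper `partition_by_key` is hypothetical, and Python's built-in `hash()` is only a stand-in for the hash function Spark actually uses.

```python
def partition_by_key(pairs, num_partitions, hash_func=hash):
    """Plain-Python model of hash partitioning (hypothetical helper, not a Spark API)."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # The same key always maps to the same partition index.
        partitions[hash_func(key) % num_partitions].append((key, value))
    return partitions


word = [('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)]
parts = partition_by_key(word, 3)
for index, part in enumerate(parts):
    print(index, part)
```

The exact placement of keys varies between runs because Python randomizes string hashes per process, but the number of partitions and the rule that one key lives in one partition do not change.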


In [25]:
sumCount.getNumPartitions()
# in local mode with a single thread there is only one partition

1

## 3.2 Grouping Data
1. With keyed data a common use case is grouping our data by key.
2. If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].
3. **groupBy() works on unpaired data or data where we want to use a different condition besides equality on the current key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key.**
4. In addition to grouping data from a single RDD, we can group data sharing the same key from multiple RDDs using a function called cogroup()
5. ......
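
The grouping operations listed above can be sketched in plain Python as follows. This models only the results, not how Spark computes them across partitions. The helper names `group_by_key`, `group_by` and `cogroup` are hypothetical stand-ins for the RDD methods groupByKey(), groupBy() and cogroup().

```python
from collections import defaultdict


def group_by_key(pairs):
    """Model of groupByKey(): collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def group_by(elements, key_func):
    """Model of groupBy(): derive the key from each element, then group."""
    return group_by_key((key_func(element), element) for element in elements)


def cogroup(left, right):
    """Model of cogroup(): for every key, the values from both datasets."""
    left_groups, right_groups = group_by_key(left), group_by_key(right)
    keys = list(left_groups) + [k for k in right_groups if k not in left_groups]
    return {k: (left_groups.get(k, []), right_groups.get(k, [])) for k in keys}


word = [('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)]
by_key = group_by_key(word)
by_parity = group_by(range(6), lambda n: n % 2)
both = cogroup([('foo', 1), ('bar', 8)], [('foo', 4), ('bar', 5), ('bar', 6), ('Colin', 7)])
print(by_key)     # {'panda': [0, 1], 'pink': [3, 4], 'pirate': [3]}
print(by_parity)  # {0: [0, 2, 4], 1: [1, 3, 5]}
print(both)       # {'foo': ([1], [4]), 'bar': ([8], [5, 6]), 'Colin': ([], [7])}
```

Note that a key present in only one dataset still appears in the cogroup result, paired with an empty list for the other side.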

## 3.3 Joins

1. join() --> page 50
2. leftOuterJoin() --> page 51
3. rightOuterJoin() --> page 51

In [36]:
# Examples
rdd1 =  sc.parallelize([("foo", 1), ("kaka", 2), ("baz", 3),("bar", 8)])
rdd2 =  sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6),("Colin",7)])

print(rdd1.collect())
print(rdd2.collect())

[('foo', 1), ('kaka', 2), ('baz', 3), ('bar', 8)]
[('foo', 4), ('bar', 5), ('bar', 6), ('Colin', 7)]


In [39]:
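# join() keeps only the keys that are present in both RDDs
# When a key has multiple values, the resulting pair RDD contains every combination of the matching records from the two input RDDs
rdd3 = rdd1.join(rdd2) 
print(rdd3.collect()) 
# leftOuterJoin() keeps every key of the source RDD (rdd1); each value is a tuple of the rdd1 value and the rdd2 value, and the rdd2 slot is None when the key is missing from rdd2
rdd4 = rdd1.leftOuterJoin(rdd2)
print(rdd4.collect()) 
# rightOuterJoin() keeps every key of rdd2 instead, but ***the rdd1 value is still the first element*** of each value tuple
rdd5 = rdd1.rightOuterJoin(rdd2)
print(rdd5.collect()) 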

[('foo', (1, 4)), ('bar', (8, 5)), ('bar', (8, 6))]
[('foo', (1, 4)), ('kaka', (2, None)), ('baz', (3, None)), ('bar', (8, 5)), ('bar', (8, 6))]
[('foo', (1, 4)), ('bar', (8, 5)), ('bar', (8, 6)), ('Colin', (None, 7))]


## 3.4 Sorting Data
1. sortByKey() --> page 51
2. Example: a custom sort that orders integer keys as strings

`rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))`
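
The effect of a string `keyfunc` can be previewed without Spark. The sketch below uses Python's built-in `sorted()` as a stand-in for sortByKey(), on made-up sample pairs, to compare numeric order with string order.

```python
# Hypothetical sample data: integer keys with arbitrary values.
pairs = [(10, 'ten'), (2, 'two'), (1, 'one'), (3, 'three')]

# Default behaviour: keys are compared as integers.
numeric_order = sorted(pairs, key=lambda kv: kv[0])
# With keyfunc=str: keys are compared as strings, so '10' sorts before '2'.
string_order = sorted(pairs, key=lambda kv: str(kv[0]))

print([key for key, _ in numeric_order])  # [1, 2, 3, 10]
print([key for key, _ in string_order])   # [1, 10, 2, 3]
```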

# 4, Actions on Pair RDDs
1. page 52
2. Common actions:

<img src="https://raw.githubusercontent.com/ColinsGitCode/JupyterNotebook/acbfab7b376c184e490385919642dfcd3c7d46a1/ipynbFiles/Materials/%E5%B8%B8%E8%A7%81%E6%9C%89%E7%94%A8%E7%9A%84Pair_RDD%E8%A1%8C%E5%8A%A8%E6%93%8D%E4%BD%9C.jpg" />