**[RDD 编程](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)**

我们这一章介绍 Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)。在 Spark中，对 RDD 的操作一共三种：创建 RDD、转化已有 RDD、调用 RDD 进行求值。

Spark 会自动将 RDD 中的数据分发到集群上，并将操作并行化执行。

Spark 中的 RDD 是一个**对不可变的、分区的 elements 支持并行操作的集合**，每个 RDD 都被分为多个分区，这些分区运行在集群中的不同节点上。 RDD 可以包含 Python、Scala、Java 中任意类型的对象，包括用户自定义的对象。

为这一章开启一个 `context`:

In [1]:
import findspark
# 找到spark的安装目录
spark_path = 'C:\spark232_hadoop27'
findspark.init(spark_path,edit_rc=True)
import pyspark
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('Chen_App')
try:
    sc.stop()
except:
     print('there is no sparkcntext on running')   
sc = SparkContext(conf=conf)

there is no sparkcntext on running


Exception: Java gateway process exited before sending its port number

***
# 创建 RDD

用户可以通过两种方式来创建 RDD:

* 读取一个外部数据集；


* 在驱动程序里分发 driver program 中的对象集合(比如 `list`和`set`),在 driver program 中对数据集合进行并行化。

## 外部文件读取

实践中比较常用的方式是从外部存储中读取数据来创建 RDD。

比如下面的示例：读取文本文件来作为一个字符串 RDD 的示例。

In [6]:
lines = sc.textFile(spark_path + '\README.md')
lines

C:\spark232_hadoop27\README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

## parallelize()

`parallelize(c, numSlices=None)`:Distribute a local Python collection to form an RDD. 

* c:Python type collection,Using xrange is recommended if the input represents a range for performance;


* numSlices:the number of partitions of the new RDD

将程序中一个已有集合或者用户自己定义的对象传递给 SparkContext 中的 `parallelize()` 这种方法很适合在学习 Spark 时应用，可以快速创建 RDD 并且进行其他方法的学习和实践；但实际工程中应用的并不多，这种方式需要把整个数据集先放在一台机器的内存中，这个明显违背了 Spark 的初衷。

In [7]:
## parallelize() 方法
para_rdd = sc.parallelize(['pandas', 'i like pandas'])
para_rdd

ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:194

***
# RDD 操作类型

创建出来以后，RDD 支持两种类型的操作：**转化(trasformation)操作** 和 **行动(action)操作**。

* 转化操作是返回一个新的 RDD 的操作，比如 `map()`和 `filter()`;


* 行动操作会对 RDD 计算出一个结果，并将结果返回到 driver program，或者存储到外部存储系统(如 HDFS)中,比如 `count()`和 `first()`。

Spark 对待 trasformation 和 action 的方式不一样：

* 从返回结果来看，trasformation 返回的是 RDD，action 返回的是其他数据类型；


* 从 driver program 调用操作的角度讲，前者是惰性计算的，后者是立即计算，后面的“惰性求值”环节会具体介绍。

## Trasformation Operation

RDD 的转化操作是**返回新的 RDD 的操作**，之后会讲到转化出来的 RDD 是**惰性求值**的，只有在行动操作中才会被计算。

In [8]:
# 转化操作示例
python_lines = lines.filter(lambda x: 'Python' in x)
print(python_lines)
print(lines)

PythonRDD[3] at RDD at PythonRDD.scala:52
C:\spark232_hadoop27\README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


我们注意到 `lines`和`python_lines`是两个不同的RDD，你之后可以继续使用 `lines`这个RDD 进行其他转化操作或者行动操作。

比如我们选出包含 `Spark` 的行并且将两者通过 `union()` 方法合并。

In [9]:
spark_lines = lines.filter(lambda x:'Spark' in x)
both_lines = spark_lines.union(python_lines)

`union()` 操作的是两个 RDD，转化操作可以操作任意数量的输入 RDD。要获得上面等价的效果，更好的方法是下面这样的方式：

In [10]:
both_lines0 = lines.filter(lambda x: 'Spark' in x or 'Python' in x)

***

**谱系图(lineage graph)**

通过转化操作从已有 RDD 生成 新的 RDD 时，Spark 会使用谱系图(lineage graph)来记录不同 RDD 之间的依赖关系，Spark 根据谱系图来按需计算 RDD，另外也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据。

下图是上面的操作所对应的谱系图

<img src='figure/lineage0.png'>

## Action Operation

行动操作需要生成实际的输出，会强制执行那些求值必须用到的RDD转化操作。

In [11]:
# 进行 count() 统计计数以及通过 take()提取10个元素
print('input had Python and Spark string nums is {}'.format(both_lines.count()))
for words in both_lines.take(5):
    print(words)

input had Python and Spark string nums is 23
# Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
and Spark Streaming for stream processing.
You can find the latest Spark documentation, including a programming


## 惰性求值

RDD 的转化操作都是惰性求值的，这意味着我们对 RDD 调用转化操作(比如 `map()`)时，操作不会立即执行。相反， Spark 会在内部记录下所要求执行的操作的相关信息，形成一个指令列表，也就是我们前面提到的谱系图（把数据读取到 RDD 的操作同样是惰性的）。

所以我们也不应该把 RDD 看成是存放着特定数据的数据集，因为实际上当我们敲下转化操作对应的代码并按回车时，实际行为是在谱系图中添加了一个“转化指令”，只有当 Action Operation 发生时，才会请求进行计算。

**惰性求值的优点在于**可以把一些操作合并一起来减少计算数据的步骤。

## 持久化

默认情况下， Spark 的 RDD 是惰性求值的。如果我们希望多次使用同一个 RDD，如果简单地对RDD调用行动操作， Spark 会每次重新计算 RDD 以及其所有依赖。这个在迭代算法中消耗很大，因为迭代过程会多次使用同一组数据。

为了避免多次计算同一个 RDD，可以让 Spark 对数据持久化。当我们让 Spark 持久化存储一个 RDD 时，计算出 RDD 的节点会保存它们所求出的分区数据。如果一个持久化数据的节点发生故障， Spark 会在需要用到缓存的数据时重新计算丢失的数据分区。如果希望节点故障的情况下不拖累计算速度，也可以把数据备份到多个节点上。

出于不同的目的，可以为 RDD 选择不同的持久化级别。 

python 中可以使用 `RDD.persist(storageLevel=StorageLevel(False, True, False, False, 1))`或者 `RDD.cache()` 让 Spark 把这个 RDD 缓存下来，可以通过参数设置将数据缓存到不同的地方，可以是磁盘上或者内存中，使用结束后可以通过 `RDD.unpersist()` 释放持久化。

### 存储级别 pyspark.StorageLevel()

`class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)`:

    Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY. Since the data is always serialized on the Python side, all the constants use the serialized formats.

* DISK_ONLY = StorageLevel(True, False, False, False, 1)


* MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)


* MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)


* MEMORY_ONLY = StorageLevel(False, True, False, False, 1)


* MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)


* OFF_HEAP = StorageLevel(True, True, True, False, 1)


* 参数 replication 取 n 就可以得到备份为 n 份的存储级别，比如 MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)



***
[java 中序列化的作用和好处](https://blog.csdn.net/u013161431/article/details/73525543)

**参数解释：**

* MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够，部分数据分区将不再缓存，在每次需要用到这些数据时重新进行计算。MEMORY_ONLY是`persist()`默认的级别；


* MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够，将未缓存的数据分区存储到磁盘，在需要使用这些分区时从磁盘读取；


* MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储（每个分区为一个 byte 数组）。这种方式会比反序列化对象的方式节省很多空间，尤其是在使用 fast serializer时会节省更多的空间，但是在读取时会增加 CPU 的计算负担；


* MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ，但是溢出的分区会存储到磁盘，而不是在用到它们时重新计算；


* DISK_ONLY : 只在磁盘上缓存 RDD；


* OFF_HEAP（实验中）: 类似于 MEMORY_ONLY_SER ，但是将数据存储在 off-heap memory，这需要启动 off-heap 内存。

In [12]:
# 设置不同的存储级别
from pyspark import StorageLevel
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)

***

### 如何选择不同存储级别

Spark 存储级别的选择，核心问题是在**内存使用率和 CPU 效率之间进行权衡**。

<img src='figure/storagelevel.jpg'>

建议按下面的过程进行存储级别的选择 :

* 如果内存可以全部存储 RDD，可以使用默认存储级别 MEMORY_ONLY。默认存储级别可以最大程度提高 CPU 的效率,在 RDD 上的操作以最快的速度运行；


* 如果内存不能全部存储 RDD，可以使用 MEMORY_ONLY_SER，并挑选一个快速序列化库将对象序列化，以节省内存空间。使用这种存储级别，计算速度仍然很快。


* 除非计算该数据集代价特别高，或者需要过滤大量数据的情况下，尽量不要将溢出的数据存储到磁盘。因为重新计算这个数据分区的耗时与从磁盘读取的耗时差不多。


* 如果想快速还原故障，建议使用多副本存储级别。所有的存储级别都通过重新计算丢失的数据的方式，提供了完全容错机制，但是多副本级别在发生数据丢失时不需要重新计算对应的数据分区，可以让任务继续运行。

参考 [Spark 持久化](https://zhuanlan.zhihu.com/p/50089958)

***
**python 中的序列化**

python 中会始终序列化要持久化的数据（`deserialized`设置为 `False`），所以持久化级别默认值就是以序列化后的对象存储在 JVM 堆空间。当我们把数据写到磁盘或者堆外存储上时，也总是使用序列化后的数据。

### persist() and unpersist()

* `persist(storageLevel=StorageLevel(False, True, False, False, 1))`:

    Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
    
    
* `unpersist()`:Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

In [13]:
# 为 RDD 选择相应的存储级别
python_lines = lines.filter(lambda x: 'Python' in x)
python_lines.persist(storageLevel=MEMORY_ONLY_2)
print(python_lines.is_cached)
print(python_lines.count())
python_lines.unpersist()
print(python_lines.is_cached)

True
3
False


### cache()

`cache()`:Persist this RDD with the default storage level (MEMORY_ONLY).

注：`cache()` 不能设置存储级别，默认为 MEMORY_ONLY

 # RDD 常用转化操作   

## 针对各个元素的转化操作

### map()

`map(f, preservesPartitioning=False)`：Return a new RDD by applying a function to each element of this RDD.

* `f` 表示自定义函数；
* `preservesPartitioning`表示是否保留父RDD的partitioner分区信息。

In [14]:
# 求平方
nums = sc.parallelize([1,2,3,4])
print(nums.map(lambda x:x*x).collect())

[1, 4, 9, 16]


### filter()

`filter(f)`:Return a new RDD containing only the elements that satisfy a predicate

In [15]:
print(nums.filter(lambda x:x!=1).collect())

[2, 3, 4]


### flatMap()

`flatMap(f, preservesPartitioning=False)`:Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.



In [16]:
# 将行数据切分为单词

lines = sc.parallelize(['hello world', 'hi'])
print('flatMap result is ',lines.flatMap(lambda line:line.split(' ')).collect())
print('map result is ',lines.map(lambda line:line.split(' ')).collect())

flatMap result is  ['hello', 'world', 'hi']
map result is  [['hello', 'world'], ['hi']]


### groupBy()

`groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>)`:Return an RDD of grouped items.

In [17]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
rdd.groupBy(lambda x:x%2).mapValues(list).collect()

[(0, [2, 8]), (1, [1, 1, 3, 5])]

### glom()

`glom()`:Return an RDD created by coalescing all elements within each partition into a list.

In [18]:
sc.parallelize(range(5),2).glom().collect()

[[0, 1], [2, 3, 4]]

从上面结果可知， `glom()` 收集不同分区的元素时，结果受到上一个 RDD 分区的影响。

### sortby()

`sortBy(keyfunc, ascending=True, numPartitions=None)`:Sorts this RDD by the given keyfunc

In [19]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [20]:
sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()

[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

In [21]:
sc.parallelize(tmp).sortBy(lambda x: x[1],ascending=False, numPartitions=2).glom().collect()

[[('2', 5), ('d', 4)], [('1', 3), ('b', 2), ('a', 1)]]

## 伪集合操作

RDD 本身不是严格意义上的集合，但它也支持许多数学上的集合操作，这些操作要求 RDD 是相同的数据类型。

In [22]:
r1 = sc.parallelize(['coffee', 'coffee', 'panda', 'monkey', 'tea'])
r2 = sc.parallelize(['coffee', 'monkey', 'kitty'])

### distinct()

`distinct(numPartitions=None)`:Return a new RDD containing the distinct elements in this RDD.

* `numPartitions`:the number of partitions of the new RDD

RDD 最长缺失的集合属性是元素的唯一性，因为常常有重复的元素。`RDD.distinct()`转化操作可以生成一个只包含不同元素的 RDD。**不过注意， `distinct()` 操作的开销很大，因为它需要将所有数据通过网络进行混洗(shuffle)，以确保每个元素都只有一份。**

In [23]:
r1.distinct().collect()

['panda', 'coffee', 'monkey', 'tea']

### union()

`union(rdds)`:Build the union of a list of RDDs.

In [24]:
print(sc.union([r1, r2]).collect())
print(r1.union(r2).collect())

['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']
['coffee', 'coffee', 'panda', 'monkey', 'tea', 'coffee', 'monkey', 'kitty']


### intersection()

`intersection(other)`:Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

In [25]:
print(r1.intersection(r2).collect())

['coffee', 'monkey']


### subtract()

`subtract(other, numPartitions=None)`:Return each value in self that is not contained in other.

In [26]:
r1.subtract(r2).collect()

['panda', 'tea']

### cartesian()

`cartesian(other)`:Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in self and b is in other.

注意：大规模RDD的笛卡尔积开销巨大。

In [27]:
user_rdd = sc.parallelize(['user1','user2', 'user3'])
roomseq_rdd = sc.parallelize([1101,1102,1103])
user_rdd.cartesian(roomseq_rdd).collect()

[('user1', 1101),
 ('user1', 1102),
 ('user1', 1103),
 ('user2', 1101),
 ('user2', 1102),
 ('user2', 1103),
 ('user3', 1101),
 ('user3', 1102),
 ('user3', 1103)]

# RDD 常用行动操作

### collect()

`collect()`:Return a list that contains all of the elements in this RDD.

**Note**: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

In [28]:
r1.collect()

['coffee', 'coffee', 'panda', 'monkey', 'tea']

### take()

`take(num)`:

    Take the first num elements of the RDD.

    It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

In [29]:
sc.parallelize([1,3,4,2],numSlices=2).take(3)

[1, 3, 4]

### top()

`top(num, key=None)`:Get the top N elements from an RDD.

**Note:**
* This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
* It returns the list sorted in descending order.

In [30]:
print(sc.parallelize([10, 4, 2, 12, 3],3).top(2),sc.parallelize([10, 4, 2,6, 12, 3],3).top(3, key=str))

[12, 10] [6, 4, 3]


按照字符串的排序结果请参见 [ASCII对应表](https://baike.baidu.com/item/ASCII/309296#3)

### reduce()

`reduce(f)`:Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

In [31]:
sc.parallelize([1, 2, 3, 4, 5],4).reduce(lambda x,y:x+y)

15

### fold()

`fold(zeroValue, op)`:

    Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”
    
注：如果 `zeroValue`不是某种运算对应的单位元，则`parallelize`中参数 `numSlices`的不同会影响结果的不同，如下所示：

In [32]:
for unit in [0,1]:
    for numslices in [2,3,4]:
        result = sc.parallelize([1, 2, 3, 4, 5],numSlices=numslices).fold(unit,lambda x,y:x+y)
        print('zeroValues is {};numSlices is {}; result is {}'.format(unit, numslices, result))

zeroValues is 0;numSlices is 2; result is 15
zeroValues is 0;numSlices is 3; result is 15
zeroValues is 0;numSlices is 4; result is 15
zeroValues is 1;numSlices is 2; result is 18
zeroValues is 1;numSlices is 3; result is 19
zeroValues is 1;numSlices is 4; result is 20


### aggreage()

`aggregate(zeroValue, seqOp, combOp)`:

    Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”

    The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U

从数据类型角度看：

* zeroValue: U，在实际操作中 zeroValue 的值应该是关于某种运算的"zero value"，例如对于加法的"zero value"是0，乘法的"zero value"是1 ；否则`parallelize(c,numSlices=None)`中 numSlices 参数的不同会影响最终结果；


* seqOp: (U, T)，将RDD中每个分区中的T类型元素聚合成U类型；


* combOp: (U, U)，将之前每个分区聚合后的U类型聚合成U类型；


* 特别注意seqOp和combOp都会使用zeroValue的值，zeroValue的类型为U。

In [33]:
nums = sc.parallelize([1,2,3,4],numSlices=4)
sumcount = nums.aggregate((1,1),
                          (lambda acc,value:(acc[0]+value, acc[1]+1)), 
                          (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
sumcount[0]/float(sumcount[1])

1.6666666666666667

`seqop`是一个函数，aggregate 将这个函数以及`zeroValue`分发到各个分区,`combOp` 是一个合并函数，以`zeroValue`为初始值将`seqop`产生的结果累计求和：

<img src='figure/aggregate.png'>

result = 15/9 = 5/3

In [34]:
nums = sc.parallelize([1,2,3,4],numSlices=1)
sumcount = nums.aggregate((1,1),
                          (lambda acc,value:(acc[0]+value,acc[1]+1)), 
                          (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
sumcount[0]/float(sumcount[1])

2.0

结果为 2 的原因如下：

**step1 : `seqOp()`函数迭代**

|循环次数|	输入值	|结果|
|---    |  ---   |--- |
|初始   |	-     |(1,1)|
|第1次遍历|(1,1), 1|(2,2)|
|第2次遍历|(2,2), 2	|(4,3)|
|第3次遍历|(4,3), 3	|(7,4)|
|第4次遍历|(7,4), 4	|(11,5)|

**step2 : `combOp()`函数迭代合并**

|循环次数|	输入值	|结果|
|---    |  ---   |--- |
|初始   |	-     |(1,1)|
|第1次遍历|(1,1), (11,5)|(12,6)|

result = 12/6 = 2

下面演示当 "zero value" 分别为(0,0)和(1,1)时，参数`numSilices`变化时，aggregate 函数结果的不同

In [35]:
for zeroValue in [(0,0), (1,1)]:
    for numslices in [1,2,3]:
        nums = sc.parallelize([1,2,3,4],numSlices=numslices)
        sumcount = nums.aggregate(zeroValue,
                                  (lambda acc,value:(acc[0]+value,acc[1]+1)), 
                                  (lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])))
        result = sumcount[0]/float(sumcount[1])
        print('zeroValue is {};numsilices is {};result is {}'.format(zeroValue, numslices, result))

zeroValue is (0, 0);numsilices is 1;result is 2.5
zeroValue is (0, 0);numsilices is 2;result is 2.5
zeroValue is (0, 0);numsilices is 3;result is 2.5
zeroValue is (1, 1);numsilices is 1;result is 2.0
zeroValue is (1, 1);numsilices is 2;result is 1.8571428571428572
zeroValue is (1, 1);numsilices is 3;result is 1.75


从结果可知，**当 `zeroValue` 是关于运算本身的单位元时，`numSlices` 的变化不会影响结果；否则会不同**

In [36]:
# zeroValue 的维度也是可以自己调整的
nums = sc.parallelize([1,2,3,4],numSlices=1)
sumcount = nums.aggregate((1,2,1),
                          (lambda acc,value:(acc[0]+value, 0,acc[2]+1)), 
                          (lambda acc1,acc2:(acc1[0]+acc2[0],0,acc1[2]+acc2[2])))
print('sumcount is {};mean is {}'.format(sumcount,sumcount[0]/float(sumcount[2])))

sumcount is (12, 0, 6);mean is 2.0


### count()

`count()`:Return the number of elements in this RDD.

In [37]:
sc.parallelize([2, 3, 4])

ParallelCollectionRDD[107] at parallelize at PythonRDD.scala:194

### countByValue():

`countByValue()`:Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [38]:
rdd=sc.parallelize([1, 2, 1, 2, 2], 2).countByValue()
rdd

defaultdict(int, {1: 2, 2: 3})

In [39]:
rdd[1]

2

### foreach()

`foreach(f)`:Applies a function to all elements of this RDD.

In [3]:
def f(x):print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

注意:

* 如果对RDD执行foreach，只会在Executor端有效，而并不是Driver端。比如：`rdd.foreach(f)`只会在Executor的stdout中打印出来，Driver端是看不到的;


* 不要使用foreach或者map方法打印数据。在一台机器上，这个操作是没有问题的。但是如果在集群上，不一定会打印出全部的数据。可以使用collect方法将RDD放到调用节点上。所以`rdd.collect().foreach(println)`是可以打印出数据的，但是可能数据量过大，会导致OOM。所以最好的方式还是使用take方法：`rdd.take(100).foreach(println)`。

# 键值对 RDD 概念及其创建

键值对 RDD 是 Spark 中常见的数据类型，通常用来进行聚合计算。一般要通过一些初始 ETL(extract、trasformation、load)操作来将数据转化为键值对形式。

我们也会讨论 RDD 在各节点上的高级特性：分区。有时候使用可控的分区方式把常被一起访问的数据放到同一个节点上，可以大大减少应用的通信开销，这会带来明显的性能提升。为分布式数据集选择正确的分区方式类似于为本地数据集选择合适的数据结构——在这两种情况下，数据分布都会及其明显地影响程序的性能表现。

我们之后称 键值对RDD 为 pair RDD。

在 Spark 中有许多种创建 RDD 的方式：

* 存储键值对的数据格式在读取时直接返回由其键值对数据组成的 pair RDD；


* 可以调用 `map()` 函数将普通的 RDD 转化为 一个由二元组组成的 pair RDD；


* 在内存中数据集创建 pair RDD 时，只需要对这个由 二元组组成的集合调用 `sc.parallelize()` 方法就可以。

In [40]:
# 重新开启一个 sparkcontext
import findspark
# 找到spark的安装目录
spark_path = 'C:\spark232_hadoop27'
findspark.init(spark_path,edit_rc=True)
import pyspark
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('Chen_App')
try:
    sc.stop()
except:
     print('there is no sparkcntext on running')   
sc = SparkContext(conf=conf)

In [41]:
# python 中使用第一个单词作为 key 来创建出一个 pair RDD
lines = sc.parallelize(['hello world', 'hi'])
pairs = lines.map(lambda x:(x.split(" ")[0], x))

In [42]:
print(pairs.top(3))
print(pairs.count())

[('hi', 'hi'), ('hello', 'hello world')]
2


# Pair RDD 的转化操作 

Pair RDD 可以使用所有标准 RDD 上的可用转化操作，因为 pair RDD 包含二元组，所有如果需要传递函数时应当操作二元组而不是独立的元素。

***
## 聚合操作

当数据集以键值对的形式组织在一起的时候，聚合具有相同键的元素进行一些统计是很常见的操作，这些操作返回 RDD，所以它们是转化操作而不是行动操作。

### mapValues()

`mapValues(f)`:Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

In [43]:
x_pair = sc.parallelize([('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)])

In [44]:
x_pair.mapValues(lambda x:(x,1)).collect()

[('panda', (0, 1)),
 ('pink', (3, 1)),
 ('pirate', (3, 1)),
 ('panda', (1, 1)),
 ('pink', (4, 1))]

### combineByKey()

`combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)`:

    Generic function to combine the elements for each key using a custom set of aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

* createCombiner, which turns a V into a C (e.g., creates a one-element list)
* mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
* mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

Note:

    To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

    In addition, users can control the partitioning of the output RDD.
    
     V and C can be different – for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

`combineByKey()` 会遍历分区中的所有元素，因此每个元素的键要么还没遇到过，要么就和之前的某个元素键相同。如果是新元素，`combineByKey()` 会使用一个叫作 `createCombiner()` 的函数来创建那个键所对应的累加器的初始值。要注意的是，这一过程会在每个分区中第一次出现各个键时发生，而不是整个 RDD 中第一次出现一个键时发生。

如果这是一个在处理当前分区之前已经遇到的键，会使用 `mergeValue()` 方法将该键的累加器对应的当前值与这个新的值合并；

由于每个分区是独立处理的，因此对于同一个键可以有多个累加器，如果有两个或者更多的分区都有对应同一个键的累加器，就需要使用用户所提供的 `mergeCombiners()` 方法将各个分区的结果进行合并。

In [45]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

x.combineByKey(to_list, append, extend).collect()

[('a', [1, 2]), ('b', [1])]

### reduceByKey()

`reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)`:

    Merge the values for each key using an associative and commutative reduce function.

    This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

    Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

In [46]:
x1_rdd = x_pair.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0] + y[0], x[1] + y[1]),3)

In [47]:
x1_rdd.collect()

[('pirate', (3, 1)), ('panda', (1, 2)), ('pink', (7, 2))]

In [48]:
# 计算每个 key对应的平均值
x1_rdd.mapValues(lambda x:x[0]/float(x[1])).collect()

[('pirate', 3.0), ('panda', 0.5), ('pink', 3.5)]

In [49]:
x1_rdd.getNumPartitions()

3

## 数据分组

对于有 key 的数据，常见的一个需求是根据键进行分组。下面介绍的 `groupByKey()` 会将一个 `{K:V}` 组成的 RDD 变成 `{K: Iterable[V]}` 类型的 RDD，value 是迭代器，需要`map`类型的操作之后通过 `collect()`才能返回值。

### groupByKey()：对单个RDD 进行分组

`groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)`:

    Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.



In [50]:
rdd_gbk = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd_gbk.groupBy(lambda x: x[0] == 'a').mapValues(list).collect()

[(False, [('b', 1)]), (True, [('a', 1), ('a', 1)])]

In [51]:
rdd_gbk.groupByKey().mapValues(list).collect()

[('a', [1, 1]), ('b', [1])]

### cogroup()：对两个 RDD 合并分组

`cogroup(other, numPartitions=None)`:

    For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.
    
    如果一个 RDD 对另一个 RDD 的key 没有对应记录，对应的迭代器为空。

In [52]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(k, list(map(list,v))) for k,v in x.cogroup(y).collect()]

[('b', [[4], []]), ('a', [[1], [2]])]

### groupWith(): 对多个 RDD 合并分组

`groupWith(other, *others)`:Alias for cogroup but with support for multiple RDDs.

In [53]:
w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
[(k, list(map(list,v))) for k,v in w.groupWith(x,y,z).collect()]

[('b', [[6], [4], [], [42]]), ('a', [[5], [1], [2], []])]

## 数据排序

数据排序有时候是必要的，如果键本身是有序的，可以对这种键值对 RDD 排序;或者我们可以提供自定义的方式进行排序。

### sortByKey()

`sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)`:Sorts this RDD, which is assumed to consist of (key, value) pairs.

In [54]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortByKey(True, 1).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [55]:
sc.parallelize(tmp).sortByKey(True, 2).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [56]:
sc.parallelize(tmp).sortByKey(True, 2).glom().collect()

[[('1', 3), ('2', 5), ('a', 1)], [('b', 2), ('d', 4)]]

In [57]:
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])

# lower() 方法转换字符串中所有大写字符为小写
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()

[('a', 3),
 ('fleece', 7),
 ('had', 2),
 ('lamb', 5),
 ('little', 4),
 ('Mary', 1),
 ('was', 8),
 ('white', 9),
 ('whose', 6)]

## 连接

将若干个有键的数据连接在一起是常用的操作，连接方式多种多样：右外连接、左外连接、交叉连接、内连接。

### join():内连接

`join(other, numPartitions=None)`:Return an RDD containing all pairs of elements with matching keys in self and other.

    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.

    Performs a hash join across the cluster.

In [58]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()

[('a', (1, 2)), ('a', (1, 3))]

### leftOuterJoin():左外连接

`leftOuterJoin(other, numPartitions=None)`:Perform a left outer join of self and other.

    For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

In [59]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2),("c", 2)])
x.leftOuterJoin(y).collect()

[('b', (4, None)), ('a', (1, 2))]

### rightOuterJoin()::右外连接

`rightOuterJoin(other, numPartitions=None)`:Perform a right outer join of self and other.

    For each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) if no elements in self have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

In [60]:
x.rightOuterJoin(y).collect()

[('c', (None, 2)), ('a', (1, 2))]

### fullOuterJoin():全外连接

`fullOuterJoin(other, numPartitions=None)`:Perform a right outer join of self and other.

    For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.

    Similarly, for each element (k, w) in other, the resulting RDD will either contain all pairs (k, (v, w)) for v in self, or the pair (k, (None, w)) if no elements in self have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

In [61]:
x.fullOuterJoin(y).collect()

[('c', (None, 2)), ('b', (4, None)), ('a', (1, 2))]

## 并行度调优

每个 RDD 都有固定数目的分区，分区数决定了 RDD 上执行操作时的并行度。

在执行聚合或分组操作时，可以要求 Spark 使用给定的分区数。 Spark 始终尝试根据集群的大小推断出一个有意义的默认值，但是有时候需要自己来对并行度进行调优来获取更好的性能表现。

很多 RDD 操作都有类似 `numPartitions` 的参数用来指定分组结果或聚合结果的 RDD 的分区数，比如上面的 `reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)`

### getNumPartitions()

`getNumPartitions()`:Returns the number of partitions in RDD

In [62]:
sc.parallelize([1,2,3,4],3).getNumPartitions()

3

### coalesce()

`coalesce(numPartitions, shuffle=False)`:Return a new RDD that is reduced into numPartitions partitions.

In [63]:
sc.parallelize([1,2,3,4],3).coalesce(4,False).getNumPartitions()

3

### repartition()

`repartition(numPartitions)`:Return a new RDD that has exactly numPartitions partitions.

    Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

`reparatition()` 会把数据通过网络进行混洗，并创建出新的分区集合。**切记，对数据进行重新分区是代价相对比较大的操作。** Spark 有一个优化版的 `reparatition()` 叫作 `coalesce()`，如上所示。

从源码中可以看出repartition方法其实就是调用了coalesce方法中shuffle为true的情况(默认shuffle是fasle).

In [64]:
sc.parallelize([1,2,3,4],3).repartition(2).getNumPartitions()

2

### repartitionAndSortWithinPartitions()

`repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=<function portable_hash>, ascending=True, keyfunc=<function RDD.<lambda>>)`:

    Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

In [65]:
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
rdd2.glom().collect()

[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]

# Pair RDD 的行动操作

和转化操作一样，支持普通 RDD 的行动操作也支持 Pair RDD，也有独属于Pair RDD 的行动操作。

### countByKey():

`countByKey()`:Count the number of elements for each key, and return the result to the master as a dictionary.

In [66]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.countByKey()

defaultdict(int, {'a': 2, 'b': 1})

In [67]:
rdd.countByKey()['a']

2

### collectAsMap()

`collectAsMap()`:Return the key-value pairs in this RDD to the master as a dictionary.

**Note:**
   
    this method should only be used if the resulting data is expected to be small, as all the data is loaded into the driver’s memory.

In [68]:
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m

{1: 2, 3: 4}

### lookup()

`lookup(key)`:Return the list of values in the RDD for key _key_. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.

In [69]:
l = range(1000)
rdd = sc.parallelize(zip(l, l), 10)
rdd.lookup(42)

[42]

# 数据分区

在分布式程序中，通信的代价很大，因此控制数据分布来获得最少的网络传输可以极大提升整体性能。Spark 可以通过控制 RDD 分区方式来减少通信开销。分区并不是对所有应用有好处的，如果有些 RDD 只需要被扫描一次，我们完全没必要对其预先进行分区处理。**只有当数据集多次使用时，分区才会有帮助。**

## 获取 RDD 的分区方式

Python API 没有提供查询分区方式的方法，但是 Spark 内部仍然会利用已有的分区消息。

## 从分区中获利的操作

Spark 许多操作都引入了将数据根据键跨节点进行混洗的过程，所有这些操作都会从数据分区中获益。就 Spark 1.0 而言，能够从数据分区获益的操作有：
`cogroup()`、`groupWith()`、`join()`、`leftOuterJoin()`、`rightOuterJoin()`、`groupByKey()`、`reduceByKey()`、`combineByKey()`、`lookup()`。

按照操作涉及到的RDD数量来对比**网络开销**：
* 单个RDD的操作：对于像 `reduceByKey()` 这样只作用于单个 RDD 的操作，运行在未分区的 RDD 上会导致每个键的所有对应值都在每台机器上进行**本地计算**，只需要把本地最终归约出的结果值从各个工作节点传回主节点，所以原本的网络开销就不大；


* 二元操作：对于像 `cogroup()`、`join()`这样的二元操作，预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个RDD)不发生数据混洗。

    如果两个 RDD 使用**同样**的分区方式，并且缓存在同样的机器上，或者其中一个 RDD 还没有被计算出来，那么跨节点的数据混洗就不会发生了。

**具体原理待补充**

## 影响分区方式的操作

下面列出了所有会为生成的结果 RDD 设好分区方式的操作：`cogroup()`、`groupWith()`、`join()`、`leftOuterJoin()`、`rightOuterJoin()`、`groupByKey()`、`reduceByKey()`、`combineByKey()`、`partitionby()`、`sort()`、`mapValues()`(如果父 RDD 有分区方式的话)、`flatMapValues()`(如果父 RDD 有分区方式的话)、以及 `filter()`(如果父 RDD 有分区方式的话);

其他操作生成的结果不会存在特定的分区方式。


对于二元操作，输出数据的分区方式取决于父 RDD 的分区方式，默认情况下，结果会采用哈希分区，分区的数量和操作的并行度一样。不过，如果其中一个父RDD 已经设置过分区方式，结果就会采用那种分区方式；如果两个父 RDD 都设置过分区方式，结果 RDD 会采用第一个父 RDD 的分区方式。

## 自定义分区方式

在 Python 中，不需要扩展 Partitioner 类，而是把一个特定的哈希函数作为一个额外的参数传递给 `RDD.partitionBy()` 函数。

注意，这里你所传过去的哈希函数会被与其他 RDD 的分区函数区分开来。如果你想要对多个 RDD 使用相同的分区方式，就应该使用同一个函数对象，比如一个全局函数，而不是为每个 RDD 创建一个新的函数对象。

### partitionBy()

`partitionBy(numPartitions, partitionFunc=<function portable_hash>)`:Return a copy of the RDD partitioned using the specified partitioner.

In [73]:
## 对 numPartitions 取余，按照余数作为 key 来分区
pairs = sc.parallelize([1,2,3,4,2,4,1]).map(lambda x: (x, x))
pairs.partitionBy(3).glom().collect()

[[(3, 3)], [(1, 1), (4, 4), (4, 4), (1, 1)], [(2, 2), (2, 2)]]

In [74]:
try:
    sc.stop()
except:
     print('there is no sparkcntext on running') 