# RDD实验报告

pyspark官方文档:http://spark.apache.org/docs/latest/api/python/

## 1.RDD简介

  Resilient Distributed Datasets，弹性分布式数据集分布在不同的集群节点的内存中。RDD就像一个数组,则数组的每一个元素就是RDD的一个分区,一个RDD可以分布并被运算在多态计算机节点的内存以及硬盘中。


  RDD数据块可以放在磁盘上也可以放在内存中(取决于你的设置),如果出现缓冲失效或丢失,RDD分区可以重新计算刷新,RDD是不能被修改的(只读)但是可以通过API被变换生成新的RDD。

                
  它的运行原理和mapreduce是一样的,只是他们的运行方式不同,mr的运算是内存磁盘交互读写,不能在内存中共享数据,而RDD可以被共享和持久化.因为大数据运算经常是交互式和迭代式的,所以数据的重用性很重要,而mr的磁盘交互读写带来的I/O开销导致数度减慢。


关于spark架构及工作流程推荐阅读:

https://blog.csdn.net/zhumr/article/details/52518506

http://dblab.xmu.edu.cn/blog/1709-2/

pyspark里最核心的模块是SparkContext（简称sc）,最重要的数据载体是RDD。RDD就像一个NumPy array或者一个Pandas Series，可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里，而是被分割成很多个partitions，每个partition的数据存在集群的executor的内存中。

pyspark其实相当于spark应用的driver程序,当获得SparkContext object 即sc ,driver就可以访问spark了。**因此sc可以看成是driver对计算机集群的连接。**


In [1]:
from pyspark import SparkConf
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf())

注:
1.如果直接from pyspark import SparkContext as sc 的话会报错，见 https://stackoverflow.com/questions/47665491/pyspark-throws-typeerror-textfile-missing-1-required-positional-argument-na

2.安装的pyspark需要与spark版本匹配(spark需要与hadoop版本匹配)，见https://stackoverflow.com/questions/54988403/pyspark-method-isbarrier-does-not-exist

In [2]:
lines = sc.textFile("README.md")
lines

README.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

那么问题来了，这里的"README.md"是当前目录下的文件么?

我们通过lines.first()或lines.collect()可以发现并不存在README.md这个文件，即lines创建的实例其实类似建立一个指针并没有真正载入。

在定义RDD时，spark并没有将数据立即加入内存，而是了解完成整个操作链之后，只获取所需的数据即可。默认情况下，spark每次运行进行行动操作时，都会对RDD进行重新计算。

为了后续作业，我们将spark目录下的"README.md"复制到当前目录，重命名为"README0.md"。如果你不知道当前目录os.getcwd()可以帮助你。

In [3]:
lines = sc.textFile("README0.md")
lines

README0.md MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [4]:
lines.first()

'# Apache Spark'

根据官方文档，在pyspark里RDD没有contains函数。思考了一下scala里的

lines.filter(line => line.contains("Python"))

可以如下改写

In [5]:
pythonLines = lines.filter(lambda x:x.find("Python")!=-1)
pythonLines

PythonRDD[5] at RDD at PythonRDD.scala:52

filter()是一个**转换操作**，转换已有的RDD实例lines，生成新的RDD实例pythonLines，在此过程中，spark并没有进行实际的计算，称为惰性计算，只有第一次在一个行动操作时，才会计算。

In [6]:
lines

README0.md MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
pythonLines

PythonRDD[5] at RDD at PythonRDD.scala:52

可以看到lines和pythonLines因为生成的方式不同，在内存中被标记为不同的RDD。

In [8]:
pythonLines.collect()

['high-level APIs in Scala, Java, Python, and R, and an optimized engine that',
 '## Interactive Python Shell',
 'Alternatively, if you prefer Python, you can use the Python shell:']

In [9]:
pythonLines

PythonRDD[5] at RDD at PythonRDD.scala:52

## 2.RDD创建

有两种类型的RDD:

       1. 并行集合:来自与分布式化的数据对象,比如我们上面的代码,python里面的list对象,再比如用户自己键入的数据

        并行化RDD就是通过调用sc的parallelize方法,在一个已经存在的数据集合上创建的(一个Seq对象),集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集,比如上面的代码,演示了如何python中的list创建一个并行集合,并进行分行。

       2. 文件系统数据集读取数据

        spark可以将任何hadoop所支持的存储资源转换称RDD,如本地文件(语言网络文件系统),索引的节点都必须能访问到,HDFS,mongodb,HBase,等。


### 2.1parallelize()

parallelize() 是创建RDD最简单的方法，通过程序中已有的数据集合来创建RDD。

使用sc.parallelize可以把Python list，NumPy array或者Pandas Series,Pandas DataFrame转成Spark RDD。

In [10]:
List = ["meituan","dianping"]
lines = sc.parallelize(List)
lines

ParallelCollectionRDD[6] at parallelize at PythonRDD.scala:194

### 2.2textFile()

textFile() 用于从一个文本文件中读入数据，并创建一个存储字符串的RDD。


In [11]:
inputRDD = sc.textFile("log.txt")
inputRDD

log.txt MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

文本的每一行都会被当做一个item。甚至可以sc.wholeTextFiles读入整个文件夹的所有文件。但是要特别注意，这种读法，RDD中的每个item实际上是一个形如(文件名，文件所有内容)的元组。读入整个文件夹的所有文件。

In [12]:
import os
cwd=os.getcwd()
rdd = sc.wholeTextFiles(cwd)
rdd

org.apache.spark.api.java.JavaPairRDD@649bdfee

In [13]:
rdd.first()

('file:/C:/Users/tianchen/Desktop/大数据/周平/RDD/log.txt',

## 3.RDD操作

有两类对RDD的操作:

        1.转换操作(懒执行): 有 map flatMap groupByKey reduceByKey 等，他们只是将一些指令集而不会马上执行,需要有操作的时候才会真正计算出结果。简而言之通过一个RDD，生成一个新的RDD。

        2.行动操作(立即执行): 有 count take collect 等，他们会返回结果,或者把RDD数据输出。简而言之对RDD计算出一个结果，将结果返回驱动程序或者写入外部存储设备（例如hdfs)。

例:map操作

In [14]:
List = [1,2,3,4,5,6,7,8,9,10]
listRDD = sc.parallelize(List)
listRDD

ParallelCollectionRDD[12] at parallelize at PythonRDD.scala:194

In [15]:
retRDD = listRDD.map(lambda x:x*7)
retRDD

PythonRDD[13] at RDD at PythonRDD.scala:52

In [16]:
retRDD.collect()

[7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

In [17]:
retRDD

PythonRDD[13] at RDD at PythonRDD.scala:52

### 3.1.1filter()

接单元格 In  [11]

In [18]:
errorLines = inputRDD.filter(lambda x:x.find("error")!=-1)
errorLines

PythonRDD[14] at RDD at PythonRDD.scala:52

In [19]:
errorLines.count()

3

In [20]:
errorLines.collect()

['This is the first error.',
 'This is the second error.',

In [21]:
errorLines

PythonRDD[14] at RDD at PythonRDD.scala:52

In [22]:
retRDD = listRDD.filter(lambda x:x% 2 == 0)
retRDD

PythonRDD[16] at RDD at PythonRDD.scala:52

In [23]:
retRDD.collect()

[2, 4, 6, 8, 10]

In [24]:
retRDD

PythonRDD[16] at RDD at PythonRDD.scala:52

### 3.1.2union()

union() 是另一个转换函数，可以合并两个RDD，合并后有重复元素,即并集。
接单元格In  [11]

In [25]:
warningLines = inputRDD.filter(lambda x:x.find("warning")!=-1)
warningLines

PythonRDD[17] at RDD at PythonRDD.scala:52

In [26]:
warningLines.collect()



In [27]:
errorLines

PythonRDD[14] at RDD at PythonRDD.scala:52

In [28]:
badLines = errorLines.union(warningLines)
badLines

UnionRDD[18] at union at NativeMethodAccessorImpl.java:0

In [29]:
badLines.collect()

['This is the first error.',
 'This is the second error.',

可以看到结果中有重复元素

In [30]:
badLines

UnionRDD[18] at union at NativeMethodAccessorImpl.java:0

### 3.2行动操作

行动操作会将结果返回驱动程序，或者写入外部存储设备。注意：每调用一次新的行动操作，RDD都会重新计算，除非将中间结果持久化。

### 3.2.1. count()

见单元格In  [19]

### 3.2.2. first()

见单元格In  [4]

### 3.2.3. take()

In [31]:
errorLines

PythonRDD[14] at RDD at PythonRDD.scala:52

In [32]:
errorLines.collect()

['This is the first error.',
 'This is the second error.',

In [33]:
errorLines.take(2)

['This is the first error.', 'This is the second error.']

In [34]:
type(errorLines.take(2))

list

### 3.2.4. collect()

见单元格In  [8]

### 3.3. 惰性求值

惰性求值是指转化操作并没有立即执行，类似sc.textFile() 这样的读操作也是惰性的。实际上，我们应该把RDD看成通过转化操作构建出来的、记录如何进行计算的指令集。

## 4. 常见的RDD转化操作

### 4.1 map()

见单元格In  [15]

### 4.2. filter()

见单元格In  [11]

### 4.3. flatMap()

flatMap() 接收一个函数作为参数，该函数将每个元素转为一个列表，最终flatMap()并不是返回由上述列表作为元素组成的RDD，而是返回一个包含每个列表所有元素的RDD。

In [35]:
lines = sc.parallelize(["hello world", "hello spark"])
lines

ParallelCollectionRDD[21] at parallelize at PythonRDD.scala:194

In [36]:
tem = lines.flatMap(lambda x:x.split(" "))
tem

PythonRDD[22] at RDD at PythonRDD.scala:52

In [37]:
tem.collect()

['hello', 'world', 'hello', 'spark']

In [38]:
tem

PythonRDD[22] at RDD at PythonRDD.scala:52

In [39]:
tem = lines.map(lambda x:x.split(" "))
tem

PythonRDD[23] at RDD at PythonRDD.scala:52

In [40]:
tem.collect()

[['hello', 'world'], ['hello', 'spark']]

以上可见flatMap和map区别。

### 4.4.1. distinct()

distinct() 函数可以去除RDD中的重复元素。但该操作需要在全网范围内进行混洗，操作开销较大

In [41]:
data1 = sc.parallelize([1, 2, 3, 3])
data2 = sc.parallelize([2, 2, 3, 4, 4, 5])

In [42]:
data1.distinct().collect()

[1, 2, 3]

### 4.4.2. union()

见单元格In  [28]

###  4.4.3. 伪集合操作之intersection()

intersection() 函数用于求两个RDD中都有的元素，即求交集。注意：该函数会去掉重复的元素，单个RDD中的重复元素也会删除，因此需要数据混洗。

In [43]:
data1.intersection(data2).collect()

[2, 3]

###  4.4.4. 伪集合操作之substract()

substract() 用于求位于第一个RDD中而不位于第二个RDD中的元素，即求差集。注意：该函数会混洗、去重。

### 4.4.5. 伪集合操作之cartesian()

cartesian() 函数返回所有可能的(a, b)对，其中，a来自第一个RDD，b来自第二个RDD，即求笛卡尔乘积.

In [44]:
data1.cartesian(data2).collect()

[(1, 2),
 (1, 2),
 (1, 3),
 (1, 4),
 (1, 4),
 (1, 5),
 (2, 2),
 (2, 2),
 (2, 3),
 (2, 4),
 (2, 4),
 (2, 5),
 (3, 2),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 4),
 (3, 5),
 (3, 2),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 4),
 (3, 5)]

## 5. 常见的RDD行动操作

### 5.1. reduce() 函数 

reduce() 接收一个函数作为参数，该函数以两个相同类型的元素作为输入，并返回一个同类型的元素，用于求和等。

In [45]:
data = sc.parallelize([1, 2, 3, 4])
data

ParallelCollectionRDD[40] at parallelize at PythonRDD.scala:194

In [46]:
sum = data.reduce(lambda x, y : int(x) + int(y))
sum

10

In [47]:
type(sum)

int

### 5.2. fold() 函数 

fold() 接收一个与reduce() 接收的函数签名相同的函数，再加上一个“初始值”来作为每个分区第一次调用的结果。

In [48]:
from operator import add
sum = data.fold(1, add)
sum

23

In [49]:
data = sc.parallelize([1, 2, 3, 4],2)
data

ParallelCollectionRDD[43] at parallelize at PythonRDD.scala:194

In [50]:
data.glom().collect()

[[1, 2], [3, 4]]

In [51]:
sum = data.fold(1, add)
sum

13

In [52]:
sum = data.fold(2, add)
sum

16

In [53]:
data = sc.parallelize([1, 2, 3, 4],3)
data

ParallelCollectionRDD[47] at parallelize at PythonRDD.scala:194

In [54]:
data.glom().collect()

[[1], [2], [3, 4]]

In [55]:
sum = data.fold(1, add)
sum

14

In [56]:
sum = data.fold(2, add)
sum

18

In [57]:
data = sc.parallelize([1, 2, 3, 4],5)
data

ParallelCollectionRDD[51] at parallelize at PythonRDD.scala:194

In [58]:
data.glom().collect()

[[], [1], [2], [3], [4]]

In [59]:
sum = data.fold(1, add)
sum

16

In [60]:
sum = data.fold(2, add)
sum

22

In [61]:
sum = data.fold(3, add)
sum

28

根据以上可以看出几个规律，
       
       一是fold得到的值和分区数有关，并且每个分区都会加上初始值并且最终求和时候会再加一次初始值。如上行代码28=5*3+1+2+3+4+3。
         
       二是如果不设置分区数，系统默认会安排23-10-1=12个分区，碰巧当前设备就是12核cpu。
       
 
 关于pyspark中fold()函数的源码及讨论，见:
 
 http://spark.apache.org/docs/latest/api/python/_modules/pyspark/rdd.html#RDD.fold
 
 https://stackoverflow.com/questions/29150202/pyspark-fold-method-output
        


### 5.2. aggregate() 函数

aggregate与fold相似又很不同。
seqOp操作会聚合各分区中的元素，然后combOp操作把所有分区的聚合结果再次聚合，两个操作的初始值都是zeroValue. seqOp的操作是遍历分区中的所有元素(T)，zeroValue跟第一个T做操作，结果再作为与第二个T做操作，直到遍历完整个分区。combOp操作是把各分区聚合的结果，再聚合,zeroValue与第一个分区结果聚合，聚合结果相当于新的zeroValue，再与第二个分区结果聚合，一直进行下去。aggregate函数返回一个跟RDD不同类型的值。因此，需要一个操作seqOp来把分区中的元素T合并成一个U，另外一个操作combOp把所有U聚合。

In [62]:
seqOp=(lambda x,y:(x[0]+y,x[1]+1))
combOp=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
x=sc.parallelize([1,2,3,4,5,6],2)
y=x.aggregate((1,2),seqOp,combOp)
print(y)

(24, 12)


seqOp的操作是遍历分区中的所有元素(T):

在分区[1，2，3]中，zeroValue跟第一个T做操作:zeroValue:x[0]=1,x[1]=2;   T:y=1;    (1,2) -> (1+1，2+1) -> (2,3)
 
            结果再作为与第二个T做操作:zeroValue:x[0]=2,x[1]=3;   T:y=2,    (2,3)->  (2+2,3+1)  -> (4,4) 
            
            结果再作为与第三个T做操作:zeroValue:x[0]=4,x[1]=4;   T:y=3,    (4,4)->  (4+3,4+1)  -> (7,5) 

在分区[4，5，6]中，zeroValue跟第一个T做操作:zeroValue:x[0]=1,x[1]=2;   T:y=4;    (1,2) -> (1+4，2+1) -> (5,3)
 
            结果再作为与第二个T做操作:zeroValue:x[0]=5,x[1]=3;   T:y=5,    (5,3)->  (5+5,3+1)  -> (10,4) 
            
            结果再作为与第三个T做操作:zeroValue:x[0]=10,x[1]=4;   T:y=6,   (10,4)-> (10+6,4+1) -> (16,5) 
            
combOp操作是把各分区聚合,,zeroValue与第一个分区结果聚合，聚合结果相当于新的zeroValue:

            zeroValue:x[0]=1,x[1]=2;第一分区结果(7,5)，y[0]=7，y[1]=5。
            
            (1,2) -> (1+7,2+5) -> (8，7)
            
            再与第二个分区结果聚合:
            
            zeroValue:x[0]=8,x[1]=7;第二分区结果(16,5)，y[0]=16，y[1]=5。
            
            (8,7) -> (8+16,7+5) -> (24，12)
            

## RDD练习

### 1.flatMap

In [63]:
List = ["hello you", "hello he", "hello me"]
listRDD = sc.parallelize(List)
wordsRDD = listRDD.flatMap(lambda x:x.split(" "))
for i in wordsRDD.collect():print(i)

hello
you
hello
he
hello
me


### 2.sample

In [64]:
List=range(1,101)
listRDD = sc.parallelize(List)
sampleRDD = listRDD.sample(False, 0.2)
print("sampleRDD count: " + str(sampleRDD.count()))
print("Another sampleRDD count: " + str(sc.parallelize(List).sample(False, 0.2).count()))


sampleRDD count: 25
Another sampleRDD count: 18


### 3.union

In [65]:
list1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
list2 = [7, 8, 9, 10, 11, 12]
listRDD1 = sc.parallelize(list1)
listRDD2 = sc.parallelize(list2)
unionRDD = listRDD1.union(listRDD2)
for i in unionRDD.collect():print(i)

1
2
3
4
5
6
7
8
9
10
7
8
9
10
11
12


### 4.groupByKey

In [66]:
List = ["hello you", "hello he", "hello me"]
listRDD = sc.parallelize(List)
wordsRDD = listRDD.flatMap(lambda x:x.split(" "))
pairsRDD = wordsRDD.map(lambda x: (x, 1))
for i in pairsRDD.collect():print(i)
gbkRDD= pairsRDD.groupByKey()
print("=============================================")
for i in gbkRDD.collect():print(str(i[0])+"..."+str(i[1]))

('hello', 1)
('you', 1)
('hello', 1)
('he', 1)
('hello', 1)
('me', 1)
hello...<pyspark.resultiterable.ResultIterable object at 0x00000169CC6F8B38>
you...<pyspark.resultiterable.ResultIterable object at 0x00000169CC6F8588>
he...<pyspark.resultiterable.ResultIterable object at 0x00000169CC6F8470>
me...<pyspark.resultiterable.ResultIterable object at 0x00000169CC6F8DD8>


### 5.reduceByKey

group by key 和 reduce by key在底层的区别:
    
https://blog.csdn.net/sf_zhang26/article/details/72870930

In [67]:
retRDD = pairsRDD.reduceByKey((lambda x,y:x+y))
for i in retRDD.collect():print(str(i[0])+"..."+str(i[1]))

hello...3
you...1
he...1
me...1


### 6.sortByKey

In [68]:
List=[
    "1,李  磊,22,175",
    "2,刘银鹏,23,175",
    "3,齐彦鹏,22,180",
     "4,杨  柳,22,168",
     "5,敦  鹏,20,175"
]
listRDD= sc.parallelize(List)
listRDD

ParallelCollectionRDD[79] at parallelize at PythonRDD.scala:194

In [69]:
heightRDD =listRDD.map(lambda x:(x.split(",")[3],x))
heightRDD

PythonRDD[80] at RDD at PythonRDD.scala:52

In [70]:
heightRDD.collect()

[('175', '1,李  磊,22,175'),
 ('175', '2,刘银鹏,23,175'),
 ('180', '3,齐彦鹏,22,180'),
 ('168', '4,杨  柳,22,168'),
 ('175', '5,敦  鹏,20,175')]

In [71]:
retRDD=heightRDD.sortByKey(ascending = False, numPartitions = 1)   # 需要设置1个分区，否则只是各分区内有序
retRDD

PythonRDD[82] at RDD at PythonRDD.scala:52

In [72]:
for i in retRDD.collect():print(i)

('180', '3,齐彦鹏,22,180')
('175', '1,李  磊,22,175')
('175', '2,刘银鹏,23,175')
('175', '5,敦  鹏,20,175')
('168', '4,杨  柳,22,168')


### 6.combineByKey与aggregateByKey

In [73]:
List = ["hello bo bo", "zhou xin xin", "hello song bo"]
lineRDD = sc.parallelize(List)
wordsRDD = lineRDD.flatMap(lambda x:x.split(" "))
pairsRDD = wordsRDD.map(lambda x: (x, 1))
pairsRDD

PythonRDD[84] at RDD at PythonRDD.scala:52

In [74]:
pairsRDD.collect()

[('hello', 1),
 ('bo', 1),
 ('bo', 1),
 ('zhou', 1),
 ('xin', 1),
 ('xin', 1),
 ('hello', 1),
 ('song', 1),
 ('bo', 1)]

主要有三个参数需要自己实现，分别是

createCombiner：实现输入RDD[(K,V)]中V到结果RDD[(K,C)]中C的转换， V和C可能是相同类型，也可能是不同类型，它会创建一个元素列表。

mergeValue：将V合并到C中，它会将当前值添加到元素列表的末尾。

mergeCombiners：对mergeValue产生的C进一步合并，即是reduce操作，它会将两个C合并到一起。

In [75]:
createCombiner = (lambda x: x)
 
mergeVal = (lambda x, y : x + y)
mergeComb = (lambda x, y : x + y)
 
retRDD=pairsRDD.combineByKey(createCombiner, mergeVal, mergeComb)
retRDD.collect()

[('hello', 2), ('bo', 3), ('song', 1), ('zhou', 1), ('xin', 2)]

combineByKey的处理流程如下：


遍历RDD[(K,V)]中每一个元素


1、如果当前K是一个新元素，使用createCombiner()创建K为键的累加器初始值，生成列表('hello', 1),('bo', 1)。


2、如果当前K已经遇到过，使用mergeValue()将当前(K,V)合并进第1步生成的累加器列表,，

      生成[('hello',(1)),('hello',(1)),('bo', 1),('bo', 1),('bo', 1),('song', 1), ('zhou', 1), ('xin', 1),('xin', 1)]

      否则执行第1步
      

3、将相同键值的累加器进行合并，得到[('hello', 2), ('bo', 3), ('song', 1), ('zhou', 1), ('xin', 2)]


因此得到如下结果：

[('hello', 2), ('bo', 3), ('song', 1), ('zhou', 1), ('xin', 2)]


In [76]:
createCombiner = 0
 
mergeVal = (lambda x, y : x + y)
mergeComb = (lambda x, y : x + y)
 
retRDD=pairsRDD.aggregateByKey(createCombiner, mergeVal, mergeComb)
retRDD.collect()

[('hello', 2), ('bo', 3), ('song', 1), ('zhou', 1), ('xin', 2)]

aggregate通过提供零值的方式，避免了combineByKey中的createCombiner步骤(createCombiner本质工作就是遇到第一个key时进行初始化操作，这个初始化不是提供零值，而是对第一个(k,v)进行转换得到c的初始值）。

V和C不同类型的映射见:https://blog.csdn.net/hit0803107/article/details/52808986