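Resilient Distributed Dataset (RDD): an immutable, distributed collection of objects.
<br> 1 Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
<br> 2 An RDD can contain any type of Python, Java, or Scala object, including user-defined classes.
<br>  Every Spark program handles data in the same way:
<br> 1 Create an RDD
<br> 2 Transform existing RDDs to define new RDDs
<br> 3 Call persist() on any intermediate RDD that needs to be reused
<br> 4 Call an RDD action to launch a parallel computation and obtain a result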

Creating RDDs
    <br> 1 Load an external dataset
    <br> 2 Parallelize a collection of objects in the driver program

In [1]:
# Locate the Spark installation and make pyspark importable
import findspark
findspark.init()

In [2]:
# Initialize Spark in Python
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local").setAppName("my app")  # cluster URL ("local" runs Spark on this machine), application name
sc = SparkContext(conf = conf)

In [3]:
# Create an RDD (resilient distributed dataset) from the text file README.MD, one element per line
lines = sc.textFile("README.MD")

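Once created, an RDD supports two types of operations:
<br> Transformations build a new RDD from an existing one, for example map() and filter()
<br> Actions compute a result from an RDD and trigger the actual computation, for example count() and first()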

In [5]:
# Transformation
dayLines = lines.filter(lambda line: "day" in line)  # filter() takes a function as its argument
dayLines

PythonRDD[2] at RDD at PythonRDD.scala:49

In [6]:
# Action
dayLines.first()

u'The baby eagle liked the nest. It was the only world he had ever known. It was warm and comfortable, had a great view, and even better, he had all the food and love and attention that a great mother eagle could provide. Many times each day the mother would swoop2 down from the sky and land in the nest and feed the baby eagle delicious morsels3 of food. She was like a god to him, he had no idea where she came from or how she worked her magic.'

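Spark evaluates RDDs lazily: nothing is computed until the first time an RDD is used in an action, so Spark never has to load or store data that is not needed.
<br>  By default, Spark recomputes an RDD every time an action is run on it. To reuse the same RDD in several actions, call RDD.persist() to cache it (in memory or on disk).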
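
The interplay of lazy evaluation, recomputation, and persist() can be sketched without Spark. The class below is a toy model written for these notes, not Spark's implementation: `LazyDataset` and its `runs` counter are hypothetical names, and everything lives in a plain Python list on one machine.

```python
class LazyDataset(object):
    """Toy stand-in for an RDD: stores a recipe and runs it only when an action is called."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function that produces the elements
        self._persist = False     # set by persist()
        self._cached = None       # filled by the first action after persist()
        self.runs = 0             # how many times the recipe was actually executed

    def filter(self, predicate):
        # Transformation: only describes the new dataset, nothing is computed here.
        return LazyDataset(lambda: [x for x in self._materialize() if predicate(x)])

    def persist(self):
        self._persist = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.runs += 1
        data = self._compute()
        if self._persist:
            self._cached = data
        return data

    # Actions: these force the computation.
    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]


lines = LazyDataset(lambda: ["one day", "night", "day two"])

day_lines = lines.filter(lambda s: "day" in s)
runs_before_action = day_lines.runs      # the filter has not run yet
day_lines.count()
day_lines.first()
runs_without_persist = day_lines.runs    # recomputed once per action

cached_lines = lines.filter(lambda s: "day" in s).persist()
cached_lines.count()
cached_lines.first()
runs_with_persist = cached_lines.runs    # computed by the first action only
```

In this model the unpersisted dataset is computed twice (once per action), while the persisted one is computed once and then served from the cache.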

In [8]:
# Persist the RDD in memory
dayLines.persist()
print dayLines.count()
print dayLines.first()

4
The baby eagle liked the nest. It was the only world he had ever known. It was warm and comfortable, had a great view, and even better, he had all the food and love and attention that a great mother eagle could provide. Many times each day the mother would swoop2 down from the sky and land in the nest and feed the baby eagle delicious morsels3 of food. She was like a god to him, he had no idea where she came from or how she worked her magic.


In [9]:
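# SparkContext.parallelize() creates an RDD from an in-memory collection; use it only for prototyping and testing, because the whole dataset must fit in memory on one machine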
lines = sc.parallelize(["pandas","i like pandas"])
lines

ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:184

In [12]:
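for dayline in dayLines.take(5):  # RDD.take(5) returns at most 5 elements
    print dayline
# RDD.collect() returns all elements of the RDD to the driver, so the data must fit in the driver's memory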
dayLines.collect()

The baby eagle liked the nest. It was the only world he had ever known. It was warm and comfortable, had a great view, and even better, he had all the food and love and attention that a great mother eagle could provide. Many times each day the mother would swoop2 down from the sky and land in the nest and feed the baby eagle delicious morsels3 of food. She was like a god to him, he had no idea where she came from or how she worked her magic.
Until one day, the mother stopped coming to the nest.
Then one day the mother eagle appeared at the top of the mountain cliff, with a big bowl of delicious food and she looked down at her baby. The baby looked up at the mother and cried "Why did you abandon me? I'm going to die any minute. How could you do this to me?"
A few days later, "I'm going to end it all," he said. "I give up. It is time for me to die."


[u'The baby eagle liked the nest. It was the only world he had ever known. It was warm and comfortable, had a great view, and even better, he had all the food and love and attention that a great mother eagle could provide. Many times each day the mother would swoop2 down from the sky and land in the nest and feed the baby eagle delicious morsels3 of food. She was like a god to him, he had no idea where she came from or how she worked her magic.',
 u'Until one day, the mother stopped coming to the nest.',
 u'Then one day the mother eagle appeared at the top of the mountain cliff, with a big bowl of delicious food and she looked down at her baby. The baby looked up at the mother and cried "Why did you abandon me? I\'m going to die any minute. How could you do this to me?"',
 u'A few days later, "I\'m going to end it all," he said. "I give up. It is time for me to die."']

3.4 Passing Functions to Spark

In [16]:
print "lines:",lines.collect()
filterLines1 = lines.filter(lambda x: "pandas" in x)
print "filterLines1:",filterLines1.collect()
def containPandas(x):
    return "pandas" in x
filterLines2 = lines.filter(containPandas)
print "filterLines2:",filterLines2.collect()


lines: ['pandas', 'i like pandas']
filterLines1: ['pandas', 'i like pandas']
filterLines2: ['pandas', 'i like pandas']


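3.5 Common Transformations and Actions

3.5.1 Operations available on all RDDs, regardless of element type

3.5.1 (1) Element-wise transformations
<br> map applies a function to each element of the RDD; the return values form the new RDD
<br> filter applies a function to each element of the RDD; the elements for which the function returns true form the new RDD
<br> flatMap applies a function to each element of the RDD; the contents of all the iterators returned form the new RDD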
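
To make the difference between the three transformations concrete, here is a plain-Python sketch that works on lists instead of RDDs. The helpers `local_map`, `local_filter`, and `local_flat_map` are made up for this illustration and are not part of the Spark API; they only mirror what the corresponding RDD methods return.

```python
from itertools import chain


def local_map(func, elements):
    # One output element per input element.
    return [func(x) for x in elements]


def local_filter(func, elements):
    # Keep only the elements for which func returns a true value.
    return [x for x in elements if func(x)]


def local_flat_map(func, elements):
    # func returns an iterable per element; all of them are concatenated.
    return list(chain.from_iterable(func(x) for x in elements))


lines = ["hello world", "yeah"]
mapped = local_map(lambda line: line.split(), lines)       # nested: one list per line
flat = local_flat_map(lambda line: line.split(), lines)    # flattened: one entry per word
kept = local_filter(lambda line: "hello" in line, lines)   # only the matching lines
```

With the same split function, the map version keeps one list per input line, while the flatMap version yields the individual words.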

In [20]:
rddTest = sc.parallelize([1,2,3,4])
print rddTest.collect()
rddTestMap = rddTest.map(lambda x: x**2)
print rddTestMap.collect()
rddTestFilter = rddTest.filter(lambda x: x>2)
print rddTestFilter.collect()

[1, 2, 3, 4]
[1, 4, 9, 16]
[3, 4]


In [23]:
# flatMap() can produce several output elements for each input element; it is often used to split strings into words
rddTest2 = sc.parallelize(["hello world","yeah"])
rddTest2.collect()
rddFlatFlatMap = rddTest2.flatMap(lambda line:line.split())
rddFlatFlatMap.collect()

['hello', 'world', 'yeah']

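3.5.1 (2) Pseudo set operations

<br> distinct: remove duplicates
<br> union: all elements of both RDDs (duplicates are kept)
<br> intersection: elements present in both RDDs
<br> subtract: elements of the first RDD that are not in the second
<br> cartesian: Cartesian product of the two RDDs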

In [29]:
rddTest3 = sc.parallelize([1,2,3,3])
rddTest4 = sc.parallelize([3,4,5])
rddTest5 = rddTest3.distinct()
print "rddTest5:",rddTest5.collect()
print rddTest3.union(rddTest4).collect()
print rddTest3.intersection(rddTest4).collect()
print rddTest3.subtract(rddTest4).collect()
print rddTest3.cartesian(rddTest4).collect()

rddTest5: [1, 2, 3]
[1, 2, 3, 3, 3, 4, 5]
[3]
[2, 1]
[(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5), (3, 3), (3, 4), (3, 5)]


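3.5.1 (3) Actions
<br> collect() returns all elements of the RDD
<br> count() returns the number of elements in the RDD
<br> countByValue() returns how many times each element occurs in the RDD
<br> take(num) returns num elements from the RDD
<br> top(num) returns the num largest elements of the RDD, in descending order
<br> takeSample(withReplacement, num) returns num randomly sampled elements from the RDD
<br> reduce(func) combines all elements of the RDD in parallel with func, for example to compute a sum
<br> fold(zero, func) is like reduce(), but takes a zero value that is used as the starting value in each partition and once more when the partition results are merged; it should be the identity element of the operation (0 for +)
<br> foreach(func) applies func to every element of the RDD and returns nothing; it is used for side effects such as writing to a database or sending data over the network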
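
As a rough guide to what several of these actions return, the following sketch computes the equivalent results on an ordinary Python list with the standard library. It is only a local analogy under the assumption that the data fits in one list; the variable names are illustrative, and take() on a real RDD depends on how the elements are laid out across partitions.

```python
from collections import Counter
import heapq

data = [1, 2, 3, 3]

element_count = len(data)               # analogue of count()
count_by_value = dict(Counter(data))    # analogue of countByValue()
take_two = data[:2]                     # analogue of take(2)
top_two = heapq.nlargest(2, data)       # analogue of top(2): the largest values, not the first ones
```

Note that the take(2) analogue gives [1, 2] while the top(2) analogue gives [3, 3], which is the distinction between "some elements" and "the largest elements".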

In [None]:
rddTest6 = sc.parallelize([1,2,3,3])
print rddTest6.collect()
print rddTest6.count()
print rddTest6.countByValue()
print rddTest6.take(2)
print rddTest6.top(2)
print rddTest6.takeSample(False,1)

In [55]:
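# foreach() runs func1 on every element for its side effects only;
# the value returned by func1 is discarded, so this cell produces no output
func1 = lambda x: x+1
rddTest6.foreach(func1)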

In [40]:
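# reduce() takes a function that operates on two elements of the RDD's element type and returns a new element of the same type
total = rddTest6.reduce(lambda x,y: x + y)  # named total so the built-in sum() is not shadowed
print total
# fold() works like reduce() but takes a zero value, used as the starting value in each partition
# and once more when the partition results are merged; with a single partition and a zero value of 1: 9 + 1 + 1 = 11
sum2 = rddTest6.fold(1,lambda x,y: x + y)
print sum2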

9
11
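
Why fold(1, ...) gives 11 rather than 10 can be reproduced with a small pure-Python model. This is a sketch of the behavior, not Spark's code: `local_fold` is a hypothetical helper, partitions are modeled as a list of lists, and the result of 11 above assumes the RDD had a single partition, which is what a "local" master typically produces.

```python
from functools import reduce


def local_fold(partitions, zero_value, op):
    # Fold every partition on its own, each starting from zero_value ...
    partials = [reduce(op, part, zero_value) for part in partitions]
    # ... then fold the per-partition results, starting from zero_value again.
    return reduce(op, partials, zero_value)


add = lambda x, y: x + y

one_partition = local_fold([[1, 2, 3, 3]], 1, add)      # zero value added twice
two_partitions = local_fold([[1, 2], [3, 3]], 1, add)   # zero value added three times
neutral_zero = local_fold([[1, 2], [3, 3]], 0, add)     # identity element: plain sum
```

Because the zero value is applied once per partition plus once in the final merge, a non-neutral zero value makes the result depend on the number of partitions; using the identity element (0 for addition) avoids this.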


In [48]:
# Compute the average of a list by combining map() and reduce()
from __future__ import division  # true division in Python 2; must come before the other statements
rddTest7 = rddTest6.map(lambda x:(x,1))
print rddTest7.collect()
tuple_result = rddTest7.reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]))  # (sum, count)
print tuple_result
print "average:",tuple_result[0]/tuple_result[1]
# Reference: https://stackoverflow.com/questions/51236850/how-to-find-an-average-for-a-spark-rdd
# val pair = sc.parallelize(1 to 100)
# .map(x => (x, 1))
# .reduce((x, y) => (x._1 + y._1, x._2 + y._2))
# val mean = pair._1 / pair._2

[(1, 1), (2, 1), (3, 1), (3, 1)]
(9, 4)
average: 2.25


In [52]:
# Compute the average of a list with aggregate()
# For an explanation of aggregate(), see: https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark
# Signature: aggregate(zeroValue, seqOp, combOp)
# Purpose: aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.
# seqOp folds the elements of one partition into that partition's local result
# Example: Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length)
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
# combOp merges the local results of the partitions
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
tuple_result2 = rddTest6.aggregate((0,0),seqOp,combOp)
print "average:",tuple_result2[0]/tuple_result2[1]

 average: 2.25
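
The division of labor between seqOp and combOp is easier to see when the partitions are written out by hand. The sketch below models them as plain lists; `local_aggregate` is a hypothetical helper for these notes, not a Spark function, and the split into two partitions is chosen arbitrarily.

```python
from functools import reduce


def local_aggregate(partitions, zero_value, seq_op, comb_op):
    # seq_op consumes the raw elements of each partition, starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition results, which may have a different type than the elements.
    return reduce(comb_op, partials, zero_value)


seq_op = lambda acc, element: (acc[0] + element, acc[1] + 1)   # (running sum, running count)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])              # add two (sum, count) pairs

sum_and_count = local_aggregate([[1, 2], [3, 3]], (0, 0), seq_op, comb_op)
average = float(sum_and_count[0]) / sum_and_count[1]
```

Here the two partitions produce the local results (3, 2) and (6, 2), which combOp merges into (9, 4), giving the same average of 2.25 as the cell above.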


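3.5.2 Converting Between RDD Types
<br> In Python, all of these functions are implemented on the base RDD class, but an operation fails at runtime if the data in the RDD is not of the type it expects.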

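3.6 Persistence (Caching)
<br> 1 Spark RDDs are lazily evaluated, and every action recomputes the RDD and all of its dependencies. To avoid computing the same RDD more than once, persist it.
<br> 2 Spark offers several persistence levels, such as memory only, memory and disk, and disk only.
<br> 3 unpersist() manually removes persisted data from the cache.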