# 创建RDD

创建RDD主要有两种方式，一个是textFile加载本地或者集群文件系统中的数据，

第二个是用parallelize方法将Driver中的数据结构并行化成RDD。

In [1]:
import findspark

#指定spark_home为刚才的解压路径,指定python路径
spark_home = "D:/Install/Spark3/spark-3.0.0-bin-hadoop3.2"
#python对应虚拟环境中解释器
python_path = "D:/Install/Anaconda/envs/xpyspark/python"
findspark.init(spark_home,python_path)

In [2]:
import pyspark 
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
sc = SparkContext(conf=conf)

print(pyspark.__version__)

3.0.0


## textFile

In [3]:
#从本地文件系统中加载数据
file = "./data/hello.txt"
rdd = sc.textFile(file,3)
rdd.collect()

['parterner will coming soon', '', 'I will go to china']

In [4]:
#从集群文件系统中加载数据
#file = "hdfs://hadoop101:9870/input/word.txt"
# 也可以省去hdfs://localhost:9000
#rdd = sc.textFile(file,3)

## parallelize

In [5]:
#parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数
rdd = sc.parallelize(range(1,11),2)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# 常用Action操作

Action操作将触发基于RDD依赖关系的计算。

## collect

In [None]:
rdd = sc.parallelize(range(10),5) 

In [6]:
#collect操作将数据汇集到Driver,数据过大时有超内存风险
all_data = rdd.collect()
all_data

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## take

In [7]:
#take操作将前若干个数据汇集到Driver，相比collect安全
rdd = sc.parallelize(range(10),5) 
part_data = rdd.take(4)
part_data

[0, 1, 2, 3]

## takeSample

In [8]:
#takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
rdd = sc.parallelize(range(10),5) 
sample_data = rdd.takeSample(False,10,0)
sample_data

[7, 8, 1, 5, 3, 4, 2, 0, 9, 6]

## first

In [9]:
#first取第一个数据
rdd = sc.parallelize(range(10),5) 
first_data = rdd.first()
print(first_data)

0


## count

In [10]:
#count查看RDD元素数量
rdd = sc.parallelize(range(10),5)
data_count = rdd.count()
print(data_count)

10


## reduce

In [11]:
#reduce利用二元函数对数据进行规约
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

45

## foreach

In [12]:
#foreach对每一个元素执行某种操作，不生成新的RDD
#累加器用法详见共享变量
rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

45


## countByKey

In [13]:
#countByKey对Pair RDD按key统计数量
pairRdd = sc.parallelize([(1,1),(1,4),(3,9),(2,16)]) 
pairRdd.countByKey()

defaultdict(int, {1: 2, 3: 1, 2: 1})

## saveAsTextFile

In [None]:
#saveAsTextFile保存rdd成text文件到本地   
# 需要hadoop环境
text_file = "./data/num.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

In [None]:
#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

# 常用Transformation操作

Transformation转换操作具有懒惰执行的特性，它只指定新的RDD和其父RDD的依赖关系，只有当Action操作触发到该依赖的时候，它才被计算。

## map

In [25]:
#map操作对每个元素进行一个映射转换
rdd = sc.parallelize(range(10),3)
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [26]:
rdd.map(lambda x:x**2).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

## filter

In [27]:
#filter应用过滤条件过滤掉一些数据
rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

[6, 7, 8, 9]

## flatMap

In [28]:
#flatMap操作执行将每个元素生成一个Array后压平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()

[['hello', 'world'], ['hello', 'China']]

In [29]:
rdd.flatMap(lambda x:x.split(" ")).collect()

['hello', 'world', 'hello', 'China']

## sample

In [30]:
#sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
rdd = sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()

[1, 4, 9]

## distinct

In [31]:
#distinct去重
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

[4, 1, 5, 2, 3]

## subtract

In [32]:
#subtract找到属于前一个rdd而不属于后一个rdd的元素
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.subtract(b).collect()

[0, 1, 2, 3, 4]

## union

In [33]:
#union合并数据
a = sc.parallelize(range(5))
b = sc.parallelize(range(3,8))
a.union(b).collect()

[0, 1, 2, 3, 4, 3, 4, 5, 6, 7]

## intersection

In [34]:
#intersection求交集
a = sc.parallelize(range(1,6))
b = sc.parallelize(range(3,9))
a.intersection(b).collect()

[3, 4, 5]

## cartesian

In [35]:
#cartesian笛卡尔积
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()

[('LiLei', 'HanMeiMei'),
 ('LiLei', 'Lily'),
 ('Tom', 'HanMeiMei'),
 ('Tom', 'Lily')]

## SortBy

In [36]:
#按照某种方式进行排序
#指定按照第3个元素大小进行排序
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()

[(4, 1, 1), (3, 2, 2), (1, 2, 3)]

## zip

In [37]:
#按照拉链方式连接两个RDD，效果类似python的zip函数
#需要两个RDD具有相同的分区，每个分区元素数量相同

rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])

rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())

[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]


## zipWithIndex

In [38]:
#将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())

[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]


# 常用PairRDD的转换操作

PairRDD指的是数据为长度为2的tuple类似(k,v)结构的数据类型的RDD,其每个数据的第一个元素被当做key，第二个元素被当做value.

# 缓存操作

如果一个rdd被多个任务用作中间量，那么对其进行cache缓存到内存中对加快计算会非常有帮助。

声明对一个rdd进行cache后，该rdd不会被立即缓存，而是等到它第一次被计算出来时才进行缓存。

可以使用persist明确指定存储级别，常用的存储级别是MEMORY_ONLY和EMORY_AND_DISK。

如果一个RDD后面不再用到，可以用unpersist释放缓存，unpersist是立即执行的。

缓存数据不会切断血缘依赖关系，这是因为缓存数据某些分区所在的节点有可能会有故障，例如内存溢出或者节点损坏。

这时候可以根据血缘关系重新计算这个分区的数据。

# 共享变量

当spark集群在许多节点上运行一个函数时，默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。

但是，有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量，广播变量和累加器。

广播变量是不可变变量，实现在不同节点不同任务之间共享数据。

广播变量在每个机器上缓存一个只读的变量，而不是为每个task生成一个副本，可以减少数据的传输。

累加器主要是不同节点和Driver之间共享变量，只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的，在节点上不可见。

# 分区操作

分区操作包括改变分区操作，以及针对分区执行的一些转换操作。

glom：将一个分区内的数据转换为一个列表作为一行。

coalesce：shuffle可选，默认为False情况下窄依赖，不能增加分区。repartition和partitionBy调用它实现。

repartition：按随机数进行shuffle，相同key不一定在同一个分区

partitionBy：按key进行shuffle，相同key放入同一个分区

HashPartitioner：默认分区器，根据key的hash值进行分区，相同的key进入同一分区，效率较高，key不可为Array.

RangePartitioner：只在排序相关函数中使用，除相同的key进入同一分区，相邻的key也会进入同一分区，key必须可排序。

TaskContext: 获取当前分区id方法 TaskContext.get.partitionId

mapPartitions：每次处理分区内的一批数据，适合需要分批处理数据的情况，比如将数据插入某个表，每批数据只需要开启一次数据库连接，大大减少了连接开支

mapPartitionsWithIndex：类似mapPartitions，提供了分区索引，输入参数为（i，Iterator）

foreachPartition：类似foreach，但每次提供一个Partition的一批数据