# Spark RDD Basics
<table align="left">
  <td>
    <a target="_blank" href="http://nbviewer.ipython.org/github/ShowMeAI-Hub/awesome-AI-cheatsheets/blob/main/Spark/Spark_RDD_cheatsheet_code.ipynb"><img src="https://raw.githubusercontent.com/jupyter/design/master/logos/Badges/nbviewer_badge.svg" />View the notebook on nbviewer</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/ShowMeAI-Hub/awesome-AI-cheatsheets/blob/main/Spark/Spark_RDD_cheatsheet_code.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets/tree/main/Spark/Spark_RDD_cheatsheet_code.ipynb"><img src="https://badgen.net/badge/open/github/color=cyan?icon=github" />View the source on GitHub</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets/Spark/Spark_RDD速查表.pdf"><img src="https://badgen.net/badge/download/pdf/color=white?icon=github"/>Download the cheat sheet</a>
  </td>
</table>

## About
**notebook by [韩信子](https://github.com/HanXinzi-AI)@[ShowMeAI](https://github.com/ShowMeAI-Hub)**

For more AI cheat sheets, see the [cheat sheet collection](https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets).

PySpark is Spark's Python API; it exposes the Spark programming model to Python.

## Setting Up the Spark Environment

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q www-us.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz  
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.8-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

## Initializing Spark

### SparkContext

In [4]:
from pyspark import SparkContext

In [5]:
sc = SparkContext(master = 'local[2]')

### Inspecting the SparkContext

In [6]:
sc.version   # Spark version the context is running

'2.4.8'

In [7]:
sc.pythonVer   # Python version

'3.7'

In [8]:
sc.master   # master URL the context connects to

'local[2]'

In [9]:
str(sc.sparkHome)   # Spark installation path on worker nodes

'None'

In [10]:
str(sc.sparkUser())   # name of the user running the SparkContext

'root'

In [11]:
sc.appName   # application name

'pyspark-shell'

In [12]:
sc.applicationId   # application ID

'local-1623220140497'

In [13]:
sc.defaultParallelism   # default level of parallelism

2

In [14]:
sc.defaultMinPartitions   # default minimum number of partitions for RDDs

2

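### Configuration

In [16]:
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))

# getOrCreate returns the already-running SparkContext if there is one;
# in that case the settings in conf are not applied to it.
sc = SparkContext.getOrCreate(conf = conf)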

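### Using the Shell

The PySpark shell already creates a SparkContext for you, in a variable named sc.

In [None]:
$ ./bin/spark-shell --master local[2]        # start the Spark shell (the Scala REPL) from the command line

In [None]:
$ ./bin/pyspark --master local[4] --py-files code.py     # start the PySpark shell with code.py added to the Python path

Use the --master argument to set which master the context connects to, and pass a comma-separated list to --py-files to add Python .zip, .egg, or .py files to the runtime path.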

## Loading Data

### Parallelized Collections

In [17]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])

In [18]:
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])

In [19]:
rdd3 = sc.parallelize(range(100))

In [20]:
rdd4 = sc.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])

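### External Data

Use textFile() to read text files from HDFS, the local file system, or any other Hadoop-supported file system; each line becomes one RDD element. Use wholeTextFiles() to read a directory of text files as (file path, file content) pairs.

In [None]:
textFile = sc.textFile("/my/directory/*.txt")
# On Google Colab you can run the line below instead
# textFile = sc.textFile("sample_data/california_housing_train.csv")

In [None]:
textFile2 = sc.wholeTextFiles("/my/directory/")
# On Google Colab you can run the line below instead
# textFile2 = sc.wholeTextFiles("sample_data/")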
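The difference between the two readers is easiest to see on a tiny directory. The following is a minimal pure-Python sketch, not Spark code: the file names and contents are hypothetical and created only for this illustration, and plain lists stand in for the RDDs.

```python
import os
import tempfile

# Hypothetical sample files, created only for this illustration.
tmp_dir = tempfile.mkdtemp()
for name, body in [("a.txt", "first\nsecond\n"), ("b.txt", "third\n")]:
    with open(os.path.join(tmp_dir, name), "w") as f:
        f.write(body)

paths = sorted(os.path.join(tmp_dir, n) for n in os.listdir(tmp_dir))

def read(path):
    with open(path) as f:
        return f.read()

# textFile-like view: one element per line, across all files.
lines = [line for p in paths for line in read(p).splitlines()]

# wholeTextFiles-like view: one (path, content) pair per file.
files = [(p, read(p)) for p in paths]

print(lines)        # ['first', 'second', 'third']
print(len(files))   # 2
```

The per-file view is the better fit when a record spans several lines, since each file arrives as one string.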

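## Retrieving RDD Information

### Basic Information

In [21]:
rdd.getNumPartitions()   # number of partitions

2

In [22]:
rdd.count()   # number of elements in the RDD

3

In [23]:
rdd.countByKey()   # number of elements per key

defaultdict(int, {'a': 2, 'b': 1})

In [24]:
rdd.countByValue()   # number of occurrences of each distinct element

defaultdict(int, {('a', 2): 1, ('a', 7): 1, ('b', 2): 1})

In [25]:
rdd.collectAsMap()   # return the key-value pairs as a dict; a duplicate key keeps only one value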

{'a': 2, 'b': 2}
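Note that the dict above has lost the pair ('a', 7): a dict holds one value per key. As a rough sketch of what the three calls compute, here is the same data processed with plain Python collections (no Spark involved; the variable names are for illustration only).

```python
from collections import Counter

pairs = [('a', 7), ('a', 2), ('b', 2)]   # same data as rdd

by_key = Counter(key for key, _ in pairs)   # like countByKey()
by_value = Counter(pairs)                   # like countByValue(): counts whole elements
as_map = dict(pairs)                        # like collectAsMap(): later values overwrite earlier ones

print(dict(by_key))    # {'a': 2, 'b': 1}
print(as_map)          # {'a': 2, 'b': 2}
```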

In [26]:
rdd3.sum()   # sum of the RDD elements

4950

In [27]:
sc.parallelize([]).isEmpty()   # check whether the RDD is empty

True

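### Summary Statistics

In [28]:
rdd3.max()   # maximum of the RDD elements

99

In [29]:
rdd3.min()   # minimum of the RDD elements

0

In [30]:
rdd3.mean()   # mean of the RDD elements

49.5

In [31]:
rdd3.stdev()   # standard deviation of the RDD elements (population)

28.86607004772212

In [32]:
rdd3.variance()   # variance of the RDD elements (population)

833.25

In [33]:
rdd3.histogram(3)   # histogram with 3 evenly spaced buckets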

([0, 33, 66, 99], [33, 33, 34])
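The first list in the result holds the bucket boundaries and the second the counts. The last count is 34 rather than 33 because the final bucket also includes its upper boundary. A small pure-Python sketch of that bucketing rule, using the boundaries from the output above (this is an illustration, not Spark's implementation):

```python
data = list(range(100))          # same data as rdd3
edges = [0, 33, 66, 99]          # bucket boundaries from the output above

counts = [0] * (len(edges) - 1)
for x in data:
    for i in range(len(counts)):
        last = (i == len(counts) - 1)
        # every bucket is [low, high), except the last, which is [low, high]
        if edges[i] <= x < edges[i + 1] or (last and x == edges[i + 1]):
            counts[i] += 1
            break

print(counts)   # [33, 33, 34]
```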

In [34]:
rdd3.stats()   # summary statistics: count, mean, standard deviation, max, and min

(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
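The standard deviation and variance reported here are the population versions (dividing by n); PySpark also offers sampleStdev() and sampleVariance() for the n - 1 versions. The numbers can be cross-checked without Spark using Python's statistics module, as in this short sketch:

```python
import math
import statistics

data = range(100)   # same data as rdd3

pop_var = statistics.pvariance(data)     # divides by n
sample_var = statistics.variance(data)   # divides by n - 1

print(pop_var)                 # 833.25
print(math.sqrt(pop_var))      # about 28.866, matching stdev() above
print(sample_var > pop_var)    # True
```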

## Applying Functions

**map and flatMap**

In [35]:
rdd.map(lambda x: x+(x[1],x[0])).collect()   # apply a function to each RDD element

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

In [36]:
rdd5=rdd.flatMap(lambda x: x+(x[1],x[0]))   # apply a function to each RDD element and flatten the result

In [37]:
rdd5.collect()

['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

In [38]:
rdd4.flatMapValues(lambda x: x).collect()   # apply flatMap to the value of each key-value pair in rdd4, leaving the keys unchanged

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
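To make the difference concrete: map keeps one output element per input element, while flatMap splices each returned sequence into a single flat result. A plain-Python sketch of the two cells above, with a list comprehension and itertools.chain standing in for the RDD operations:

```python
from itertools import chain

pairs = [('a', 7), ('a', 2), ('b', 2)]   # same data as rdd
f = lambda x: x + (x[1], x[0])

mapped = [f(x) for x in pairs]                          # like rdd.map(f)
flat = list(chain.from_iterable(f(x) for x in pairs))   # like rdd.flatMap(f)

print(mapped)  # [('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]
print(flat)    # ['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']
```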

## Selecting Data

### **Getting**

In [39]:
rdd.collect()   # return a list with all RDD elements

[('a', 7), ('a', 2), ('b', 2)]

In [40]:
rdd.take(2)   # take the first two RDD elements

[('a', 7), ('a', 2)]

In [41]:
rdd.first()   # take the first RDD element

('a', 7)

In [42]:
rdd5.distinct().collect()   # return the distinct values in rdd5 (result order is not guaranteed)

[2, 'b', 'a', 7]

### Sampling

In [43]:
rdd3.sample(False, 0.15, 81).collect()   # return a sampled subset of rdd3 (withReplacement=False, fraction=0.15, seed=81)

[3, 4, 27, 28, 35, 41, 43, 49, 51, 55, 64, 65, 66, 67, 85, 88, 89, 92]
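The result above has 18 elements, not 15: the fraction is a per-element probability, not an exact size. The sketch below mimics that idea in plain Python with a hypothetical helper, bernoulli_sample. It uses Python's random module, so it will not reproduce Spark's picks for the same seed.

```python
import random

def bernoulli_sample(data, fraction, seed):
    """Keep each element independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]

picked = bernoulli_sample(range(100), 0.15, 81)
print(len(picked))   # depends on the seed; around 15 on average
```

Fixing the seed makes the sample repeatable, which is why the cell above passes 81 explicitly.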

### Filtering

In [44]:
rdd.filter(lambda x: "a" in x).collect()   # keep only the elements that contain "a"

[('a', 7), ('a', 2)]

In [45]:
rdd5.distinct().collect()   # return the distinct values in the RDD

[2, 'b', 'a', 7]

In [46]:
rdd.keys().collect()   # return the keys of the key-value pairs

['a', 'a', 'b']

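## Iterating

**Iterating with foreach**

In [47]:
def g(x):
    print(x)

In [48]:
rdd.foreach(g)   # apply a function to every RDD element; g runs in the worker processes, so its print output may not show up in the notebook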

## Reshaping Data

### Reduce Operations

In [49]:
rdd.reduceByKey(lambda x,y : x+y).collect()   # merge the values of each key

[('b', 2), ('a', 9)]

In [50]:
rdd.reduce(lambda a, b: a + b)   # merge all RDD elements into a single value

('a', 7, 'a', 2, 'b', 2)

### Grouping

In [51]:
rdd3.groupBy(lambda x: x % 2).mapValues(list).collect()   # group the RDD elements by the result of a function

[(0,
  [0,
   2,
   4,
   6,
   8,
   10,
   12,
   14,
   16,
   18,
   20,
   22,
   24,
   26,
   28,
   30,
   32,
   34,
   36,
   38,
   40,
   42,
   44,
   46,
   48,
   50,
   52,
   54,
   56,
   58,
   60,
   62,
   64,
   66,
   68,
   70,
   72,
   74,
   76,
   78,
   80,
   82,
   84,
   86,
   88,
   90,
   92,
   94,
   96,
   98]),
 (1,
  [1,
   3,
   5,
   7,
   9,
   11,
   13,
   15,
   17,
   19,
   21,
   23,
   25,
   27,
   29,
   31,
   33,
   35,
   37,
   39,
   41,
   43,
   45,
   47,
   49,
   51,
   53,
   55,
   57,
   59,
   61,
   63,
   65,
   67,
   69,
   71,
   73,
   75,
   77,
   79,
   81,
   83,
   85,
   87,
   89,
   91,
   93,
   95,
   97,
   99])]

In [52]:
rdd.groupByKey().mapValues(list).collect()   # group the RDD values by key

[('b', [2]), ('a', [7, 2])]
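groupBy derives the key from each element, while groupByKey uses the key already present in each pair and collects only the values. A small plain-Python sketch of both, built on a hypothetical helper group_by and run on a shorter range to keep the output readable:

```python
from collections import defaultdict

def group_by(items, key_func):
    """Collect items into lists keyed by key_func(item)."""
    groups = defaultdict(list)
    for item in items:
        groups[key_func(item)].append(item)
    return dict(groups)

# groupBy-like: the key is computed from each element.
parity = group_by(range(10), lambda x: x % 2)
print(parity)   # {0: [0, 2, 4, 6, 8], 1: [1, 3, 5, 7, 9]}

# groupByKey-like: the key is the first item of each pair; keep only the values.
pairs = [('a', 7), ('a', 2), ('b', 2)]   # same data as rdd
by_key = {k: [v for _, v in grp]
          for k, grp in group_by(pairs, lambda kv: kv[0]).items()}
print(by_key)   # {'a': [7, 2], 'b': [2]}
```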

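### Aggregating

In [53]:
seqOp = (lambda x,y: (x[0]+y,x[1]+1))   # fold one element y into a running (sum, count) pair x

In [54]:
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))   # merge two (sum, count) pairs

In [66]:
add = (lambda x,y:x+y)

In [55]:
rdd3.aggregate((0,0), seqOp, combOp)   # aggregate the elements of each partition with seqOp, then merge the partition results with combOp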

(4950, 100)
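The result is a (sum, count) pair, so dividing the two gives the mean. The sketch below walks through the two stages in plain Python. The split of the data into two partitions is an assumption made for illustration; Spark decides the actual partitioning.

```python
from functools import reduce

seqOp = lambda acc, x: (acc[0] + x, acc[1] + 1)      # fold one element into (sum, count)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])     # merge two (sum, count) pairs

# Assumed split of range(100) into two partitions, for illustration only.
partitions = [range(0, 50), range(50, 100)]
zero = (0, 0)

per_partition = [reduce(seqOp, part, zero) for part in partitions]
total = reduce(combOp, per_partition, zero)

print(per_partition)        # [(1225, 50), (3725, 50)]
print(total)                # (4950, 100)
print(total[0] / total[1])  # 49.5
```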

In [60]:
rdd.aggregateByKey((0,0), seqOp, combOp).collect()   # aggregate the values of each key

[('b', (2, 1)), ('a', (9, 2))]

In [67]:
rdd3.fold(0, add)   # aggregate the elements of each partition, then merge the partition results, using one function and a zero value

4950

In [68]:
rdd.foldByKey(0, add).collect()   # merge the values of each key

[('b', 2), ('a', 9)]
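One detail worth knowing about fold: the zero value is used once per partition and once more when the partition results are merged, so it should be a neutral element for the function (0 for addition). The plain-Python sketch below, with an assumed two-partition split and a hypothetical helper named fold, shows what happens otherwise.

```python
from functools import reduce

add = lambda x, y: x + y
partitions = [range(0, 50), range(50, 100)]   # assumed two-partition split

def fold(parts, zero, op):
    # zero seeds every partition, then seeds the final merge as well
    partials = [reduce(op, part, zero) for part in parts]
    return reduce(op, partials, zero)

print(fold(partitions, 0, add))   # 4950
print(fold(partitions, 1, add))   # 4953: the 1 is added three times
```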

In [59]:
rdd3.keyBy(lambda x: x+x).collect()   # build (f(x), x) tuples by applying a function to each element

[(0, 0),
 (2, 1),
 (4, 2),
 (6, 3),
 (8, 4),
 (10, 5),
 (12, 6),
 (14, 7),
 (16, 8),
 (18, 9),
 (20, 10),
 (22, 11),
 (24, 12),
 (26, 13),
 (28, 14),
 (30, 15),
 (32, 16),
 (34, 17),
 (36, 18),
 (38, 19),
 (40, 20),
 (42, 21),
 (44, 22),
 (46, 23),
 (48, 24),
 (50, 25),
 (52, 26),
 (54, 27),
 (56, 28),
 (58, 29),
 (60, 30),
 (62, 31),
 (64, 32),
 (66, 33),
 (68, 34),
 (70, 35),
 (72, 36),
 (74, 37),
 (76, 38),
 (78, 39),
 (80, 40),
 (82, 41),
 (84, 42),
 (86, 43),
 (88, 44),
 (90, 45),
 (92, 46),
 (94, 47),
 (96, 48),
 (98, 49),
 (100, 50),
 (102, 51),
 (104, 52),
 (106, 53),
 (108, 54),
 (110, 55),
 (112, 56),
 (114, 57),
 (116, 58),
 (118, 59),
 (120, 60),
 (122, 61),
 (124, 62),
 (126, 63),
 (128, 64),
 (130, 65),
 (132, 66),
 (134, 67),
 (136, 68),
 (138, 69),
 (140, 70),
 (142, 71),
 (144, 72),
 (146, 73),
 (148, 74),
 (150, 75),
 (152, 76),
 (154, 77),
 (156, 78),
 (158, 79),
 (160, 80),
 (162, 81),
 (164, 82),
 (166, 83),
 (168, 84),
 (170, 85),
 (172, 86),
 (174, 87),
 (176, 88),
 ...]

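## Mathematical Operations

**RDD operations**

In [69]:
rdd.subtract(rdd2).collect()   # return the elements of rdd that do not appear in rdd2 (whole pairs are compared, not just keys)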

[('b', 2), ('a', 7)]

In [70]:
rdd2.subtractByKey(rdd).collect()   # return the (key, value) pairs of rdd2 whose key has no match in rdd

[('d', 1)]
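The two operations differ in what they compare: subtract matches whole elements, subtractByKey matches keys only. That is why ('b', 2) survives the subtract above even though rdd2 also has a 'b' key. A plain-Python sketch of both rules (Spark does not guarantee the order of the result, so only the contents are comparable):

```python
left = [('a', 7), ('a', 2), ('b', 2)]    # same data as rdd
right = [('a', 2), ('d', 1), ('b', 1)]   # same data as rdd2

# subtract-like: drop elements of left that appear, as whole pairs, in right.
right_set = set(right)
subtract = [kv for kv in left if kv not in right_set]

# subtractByKey-like: drop pairs of right whose key appears in left.
left_keys = {k for k, _ in left}
subtract_by_key = [kv for kv in right if kv[0] not in left_keys]

print(subtract)         # [('a', 7), ('b', 2)]
print(subtract_by_key)  # [('d', 1)]
```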

In [71]:
rdd.cartesian(rdd2).collect()   # return the Cartesian product of rdd and rdd2

[(('a', 7), ('a', 2)),
 (('a', 7), ('d', 1)),
 (('a', 7), ('b', 1)),
 (('a', 2), ('a', 2)),
 (('b', 2), ('a', 2)),
 (('a', 2), ('d', 1)),
 (('a', 2), ('b', 1)),
 (('b', 2), ('d', 1)),
 (('b', 2), ('b', 1))]

## Sorting

**Sorting RDDs**

In [72]:
rdd2.sortBy(lambda x: x[1]).collect()   # sort by the given function

[('d', 1), ('b', 1), ('a', 2)]

In [73]:
rdd2.sortByKey().collect()   # sort the key-value pairs by key

[('a', 2), ('b', 1), ('d', 1)]
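Both calls sort in ascending order by default, and both accept ascending=False in PySpark. For a quick mental model, the same orderings can be reproduced with Python's built-in sorted, as in this sketch:

```python
pairs = [('a', 2), ('d', 1), ('b', 1)]   # same data as rdd2

by_value = sorted(pairs, key=lambda x: x[1])   # like sortBy(lambda x: x[1])
by_key = sorted(pairs, key=lambda x: x[0])     # like sortByKey()
by_key_desc = sorted(pairs, key=lambda x: x[0], reverse=True)   # like sortByKey(ascending=False)

print(by_value)     # [('d', 1), ('b', 1), ('a', 2)]
print(by_key)       # [('a', 2), ('b', 1), ('d', 1)]
print(by_key_desc)  # [('d', 1), ('b', 1), ('a', 2)]
```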

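## Repartitioning

**repartition and coalesce**

In [74]:
rdd.repartition(4)   # create a new RDD with 4 partitions (rdd itself is unchanged)

MapPartitionsRDD[104] at coalesce at NativeMethodAccessorImpl.java:0

In [75]:
rdd.coalesce(1)   # return a new RDD with the number of partitions reduced to 1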

CoalescedRDD[105] at coalesce at NativeMethodAccessorImpl.java:0

## Saving

**Saving an RDD locally or to HDFS**

In [76]:
rdd.saveAsTextFile("rdd.txt")   # writes a directory named rdd.txt containing one part file per partition

In [None]:
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child", 'org.apache.hadoop.mapred.TextOutputFormat')

## Stopping the SparkContext

**Stop the SparkContext**

In [77]:
sc.stop()

## Running a Script

**Submitting a script for execution**

In [None]:
$ ./bin/spark-submit examples/src/main/python/pi.py