## Spark 入门
---
---

### Spark VS Hadoop
---
| 项目 | Hadoop | Spark |
|------|---------|--------|
| **类型** | 基础平台（包含计算、存储、调度） | 纯计算工具（分布式） |
| **场景** | 海量数据批处理（磁盘迭代计算） | 海量数据的批处理（内存迭代计算、交互式计算）、海量数据流计算 |
| **价格** | 对机器要求低，便宜 | 对内存有要求，相对较贵 |
| **编程范式** | Map + Reduce，API 较为底层，算法适应性差 | RDD 组成 DAG 有向无环图，API 较为顶层，方便使用 |
| **数据存储结构** | MapReduce 中间计算结果在 HDFS 磁盘上，延迟大 | RDD 中间运算结果在内存中，延迟小 |
| **运行方式** | Task 以进程方式维护，任务启动慢 | Task 以线程方式维护，任务启动快，可批量创建提高并行计算能力 |

尽管Spark相较于Hadoop而言有较大优势，但Spark并不会完全替代Hadoop
- 在计算层面，Spark相比较MapReduce有巨大的性能优势，但至今仍有许多计算工具基于MR框架，例如Hive
- Spark仅做计算，而Hadoop生态圈不仅有计算(MR)，也有存储(HDFS)和资源管理调度(YARN), HDFSS和YARN仍是许多大数据体系的核心架构

Spark处理数据与MapReduce处理数据相比，有如下两个不同点：
- Spark处理数据时，可以将中间处理结果数据存储到内存中(内存迭代计算)
- Spark提供了非常丰富的算子(API)，可以做到复杂任务在一个Spark程序中完成

### Spark的多种运行模式
---
- 本地模式(单机、开发和测试)：本地模式就是以一个独立的进程，通过其内部的多个线程来模拟整个Spark运行时的环境

- Standalone模式(集群)：Spark中的各个角色以独立进程的形式存在，并组成Spark集群环境

- Hadoop YARN模式(集群)：Spark中的各个角色运行在YARN的容器内部，并组成Spark集群环境

- Kubernetes模式(容器集群)：Spark中的各个角色运行在Kubernetes的容器内部，并组成Spark集群环境

- 云服务器迷失

### Spark的架构角色
---
<span style="color:#00BFFF; font-weight:bold;">对比YARN的角色：</span>

&emsp;&emsp;资源管理层面

- 集群资源管理者(Master)：ResourceManager
- 单机资源管理者(Worker)：NodeManager

&emsp;&emsp;任务计算层面

- 单任务管理者(Master)：ApplicationMaster
- 单任务执行者(Worker)：Task(容器内计算框架的工作角色)

<span style="color:#00BFFF; font-weight:bold;">Spark的角色：</span>

&emsp;&emsp;资源层面
- Master角色：集群资源管理
- Worker角色：单机资源管理

&emsp;&emsp;任务运行层面
- Driver：单个任务的管理
- Executor(Worker)：单个任务的计算

### Spark Local模式的基本原理
---
<span style="color:#00BFFF; font-weight:bold;">本质上：启动一个JVM Process进程(一个进程里面有多个进程)，执行任务Task
</span>
- Local模式可以限制模拟Spark集群环境的线程数量，即```Local[N]```或```Local[*]```
- 其中N代表可以使用N个线程，每个线程拥有一个cpu core；如果不指定N，则默认是1个线程(该线程有1个core)；通常cpu有几个core，就指定几个线程，最大化利用计算能力、
- 如果是```Local[*]```，则代表 Run Spark Locally with as many worker threads as logical cores on your machin；按照cpu最多的cores设置线程数

<span style="color:#00BFFF; font-weight:bold;">Local下的角色分布：</span>

&emsp;&emsp;资源管理：

- Master：Local进程本身
- Worker：Local进程本身

&emsp;&emsp;任务执行：

- Driver：Local进程本身
- Executor：不存在；没有独立的Executor角色。由Local进程(也就是Driver)内的线程提供计算能力

### StandAlone模式的运行原理
---
StandAlone模式是Spark自带的一个轻量级集群管理器(Cluster Manager)，它不依赖与Hadoop、也不需要YARN等，只需要几台机器(虚拟机)，就能形成一个完整的Spark集群

<span style="color:#00BFFF; font-weight:bold;">Standalone集群在进程上主要有3类进程：</span>
- 主节点Master进程：<br>
&emsp;&emsp; Master角色，管理整个集群资源，并托管运行各个任务的Driver
- 从节点Worker进程<br>
&emsp;&emsp;管理每个机器的资源，接受Master分配的任务，启动对应的Executor(JVM进程)去执行计算

### SparkOnYarn模式的运行原理
---
<span style="color:#00BFFF; font-weight:bold;">什么是SparkOnYarn？</span>

- YARN 负责分配和管理资源
- Spark 负责计算任务本身的调度和执行

<span style="color:#00BFFF; font-weight:bold;">两种运行模式</span>

- Client模式：Driver在本地，AM只负责申请资源与监控，用于调试、测试
- Cluster模式：Driver运行在YARN的Container中， AM既当Driver又当调度者，用于生产、集群运行

<span style="color:#00BFFF; font-weight:bold;">YARN 模式下的角色分布</span>

- Master(ResourceManager)：集群大管家，整个集群的资源管理和分配
- Worker(NodeManager)：单个机器的管家，负责在单个服务器上提供运行的容器，管理当前机器的资源
- Driver：**是整个Spark任务的‘指挥官’**，管理Executor任务的执行和任务分解分配，类似于YARN的ApplicationMaster
- Executor：具体干活的进程，Spark的工作任务(Task)都由Executor来负责执行

### SarkOnYarn(Cluster) 的运行流程
---
<span style="color:#00BFFF; font-weight:bold;">spark-submit提交任务</span>
- ```spark-submit --master yarn```
- Spark会打包应用程序(python文件)发送给YARN的ResourceManager(RM)，相当于对RM说```‘我有一个任务要做，请给我分配一些资源来运行它’```

<span style="color:#00BFFF; font-weight:bold;">启动ApplicationMaster(Driver)</span>

- RM给应用程序分配一个Container(随机一台有资源的机器，比如Hadoop102)
- 这个Contianer相当于‘项目经理’，也就是ApplicationMaster
- 这个AM负责的是整个Spark程序的生命周期管理(包括向RM再次申请更多资源，监控任务进度，汇报执行状态)
- 这个ApplicationMaster(Driver)本身不执行计算，但它依然是一个JVM进程，会占用内存和CPU core

<span style="color:#00BFFF; font-weight:bold;">ApplicationMaster向RM申请Executor的Container</span>

- AM启动后会根据具体的配置情况(如```--num-executor 4```, ```executor-memory 2g```)向RM申请对应数量的Container
- RM根据集群负载情况把这些Container分配到不同的NodeManager上

<span style="color:#00BFFF; font-weight:bold;">NodeManager启动Executor</span>

- NodeManager收到RM的指令后开始启动Java进程
- 每个Executor启动后，会立即向AM报到

<span style="color:#00BFFF; font-weight:bold;">Driver调度任务</span>

- Driver(位于AM内)根据RDD/DAG逻辑切分成若干Task
- Task = RDD的最想小算单元
- 每个Task分配给某个Executor的线程执行

<span style="color:#00BFFF; font-weight:bold;">结果返回与资源回收</span>

- Driver 汇总结果、写出输出
- ApplicationMaster通知RM：任务完成
- YARN回收Container

### Spark程序的提交
---
<span style="color:#00BFFF; font-weight:bold;">提交与资源申请</span>

在```Hadoop103```上执行
```bash
spark-submit --master yarn --deploy-mode cluster /opt/project/test.py
```
- Spark客户端(Hadoop103)把应用程序(test.py)交给ResourceManager(RM, 103)
- RM在有空闲资源的某台NodeManager(可能是102/103/104之一)上分配第一个Container来启动ApplicationMaster(AM)
- AM向RM申请更多Executor容器

<span style="color:#00BFFF; font-weight:bold;">Executor启动与数据本地化</span>

- RM根据个节点可用的vcores/memory，在102/103/104上分配若干Container
- 个节点的NodeManager在被分配的Container中启动Executor进程
```org.apache.spark.executor.CoarseGrainedExecutorBackend```


<span style="color:#00BFFF; font-weight:bold;">创建SparkConf和SparkContext</span>

> ```conf = SparkConf().setAppName("WordCount")``` \
> ```sc = SparkContext(conf=conf)```

- ```SparkConf```用来指定运行参数(如app名称、master模式)
    - 作用：设置参数
    - 示例：
        ```python
        conf = SparkConf() \
            .setAppName("WordCount") \
            .setMaster("yarn") \
            .set("spark.executor.memory", "2g") \
            .set("spark.executor.cores", "2")
        ```


- ```SparkaContext```整个Spark程序的‘入口’，它负责与YARN通信、申请资源、创建Driver、管理Executor、作业调度

&emsp;&emsp;当运行这两行代码时：

- Spark会向YARN的ResourceManager提交一个Application
- ResourceManager分配一个Container启动ApplicationMaster
- ApplicationMaster进一步申请其他容器来运行Executor(默认先启动2个Executor容器)

&emsp;&emsp;到这里，ApplicationMaster与各Executor之间的集群环境已经建立

<span style="color:#00BFFF; font-weight:bold;">从HDFS读取数据</span>

> ```file_rdd = sc.textFile("hdfs://Hadoop102:8020/input/testWordCount.txt")```

- ```textFile()```是一个行动转换(Transformation)，会创建一个HadoopRDD
- Spark会向NameNode102请求文件的元数据(block 位置信息)，然后每个文件块对应一个分区，每个分区由一个Task处理，优先在存有该block的DataNode上启动任务(数据本地化)



In [None]:
# coding:utf8
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("WordCount")
    sc = SparkContext(conf=conf)

    # 读取文件
    file_rdd = sc.textFile("hdfs://Hadoop102:8020/input/testWordCount.txt")

    # 进行单词计数
    word_rdd = file_rdd.flatMap(lambda line: line.split(" "))

    # 将单词转换为元组对象
    words_with_one_rdd = word_rdd.map(lambda word: (word, 1))

    # 按照单词进行分组聚合
    result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)

    print(result_rdd.collect())
