# RDD Programming Guide

In [10]:
import os
os.environ['JAVA_HOME'] = "C:/Program Files/Java/jdk-1.8"
# Configure the Hadoop path
os.environ['HADOOP_HOME'] = "D:/hadoop-3.3.4"
# Configure the Python interpreter
os.environ['PYSPARK_PYTHON'] = "D:/Anaconda/envs/DDL/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = "D:/Anaconda/envs/DDL/python.exe"
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = (SparkSession
         .builder
         .appName("RDD")
         .master("local[2]")
         .config("spark.executor.memory", "512m")
         .getOrCreate())

# Set log level to ERROR
spark.sparkContext.setLogLevel("ERROR")
# Get the SparkContext
sc = spark.sparkContext
print(spark)
print(sc)

<pyspark.sql.session.SparkSession object at 0x000001E87B757310>
<SparkContext master=local[2] appName=RDD>


## 1. Creating RDDs

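**There are two ways to create RDDs:**
- Parallelizing an existing collection in your driver program.
- Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.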

<p>

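**Notes on reading files with Spark:**
- If you use a path on the local filesystem, the file must also be accessible at the same path on the **worker nodes**. Either copy the file to all workers or use a network-mounted shared filesystem.
- All of Spark's file-based input methods, including `textFile`, support running on **directories, compressed files, and wildcards**. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.
- By default, `textFile` creates one partition for each block of the file (blocks are 128 MB by default in HDFS), but you can ask for more partitions by passing a larger value (the `minPartitions` argument in PySpark). Note that you **cannot** have fewer partitions than blocks.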

<p>

**`SparkContext.wholeTextFiles`**
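- `SparkContext.wholeTextFiles` lets you read a **directory containing multiple small text files** and returns each of them as a **(filename, content)** pair. This contrasts with `textFile`, which returns one record per **line** in each file.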


In [11]:
# Parallelize an existing collection in the driver program
data = [1, 2, 3, 4, 5]
dataRdd = sc.parallelize(data)

# Reference a dataset in an external storage system
fileRdd = sc.textFile("../data/wordcount/data.txt")

## 2. RDD Operations
<p>

RDDs support two types of operations: ***transformations***, which **create a new dataset** from an existing one, and ***actions***, which **return a value** to the **driver program** after running a computation on the dataset.

<p>

### 2.1. Basics

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also **persist an RDD in memory using the `persist` (or `cache`) method**, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for **persisting RDDs on disk, or replicated across multiple nodes**.

In [16]:
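# Define a base RDD from an external file. The dataset is not loaded into memory or otherwise acted on: fileRdd/lines is merely a pointer to the file
lines = fileRdd
# lineLengths is defined as the result of a map transformation. Again, because of laziness, it is not computed immediately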
lineLengths = lines.map(lambda s: len(s))
print(lineLengths.take(2))
print("_____________________")
# lineLengths.persist()

# run reduce, which is an action.
# At this point Spark breaks the computation into tasks to run on separate machines
# Each machine runs both its part of the map and a local reduction, returning only its answer to the driver program
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength)

[77, 77]
_____________________
1025


### 2.2. Passing Functions to Spark
<p>

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:

- **Lambda expressions**, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or **statements that do not return a value**.)
- **Local `def`s** inside the function calling into Spark, for longer code.
- **Top-level functions in a module.**

In [17]:

def fun(s):
    return len(s)
lineLengths1 = lines.map(fun)
totalLength1 = lineLengths1.reduce(lambda a, b: a + b)
print(totalLength1)

406


### 2.3. Working with Key-Value Pairs
<p>

While most Spark operations work on **RDDs containing any type of objects**, a few special operations are **only** available on **RDDs of key-value pairs**. The most common ones are distributed **“shuffle”** operations, such as `grouping` or `aggregating` the elements by a key.

<p>

In Python, these operations work on **RDDs containing built-in Python tuples such as (1, 2)**. Simply create such tuples and then call your desired operation.

- For example, the following code uses the `reduceByKey` operation on **key-value pairs** to count how many times each line of text occurs in a file:



In [15]:
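# lines is an RDD (resilient distributed dataset) in which each element is one line of text from the file
# map turns each line into a key-value pair (s, 1), where s is the line text and 1 is a count
# reduceByKey aggregates the values that share a key; here it adds up the counts of identical lines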
print(lines.take(3))
print("_______________________________________")
pairs = lines.map(lambda s: (s, 1))
print(pairs.take(3))
print("_______________________________________")
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.take(3))
print("_______________________________________")
sortedCounts = counts.sortByKey()
print(sortedCounts.take(3))
print("_______________________________________")
# collect() transfers the data distributed across the worker nodes back to the driver program and returns it as a list
results = counts.collect()
for result in results:
    print(result)


['spark hadoop hive spark spark hadoop hive spark hadoop hive spark hadoop hive', 'spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark', 'hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop']
_______________________________________
[('spark hadoop hive spark spark hadoop hive spark hadoop hive spark hadoop hive', 1), ('spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark', 1), ('hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop', 1)]
_______________________________________
[('hive spark hadoop hive spark hadoop hive spark hadoop hive spark', 3), ('hadoop hive spark hadoop hive spark hadoop hive', 3), ('spark hadoop hive spark hadoop hive spark hadoop', 2)]
_______________________________________
[('hadoop hive spark hadoop hive spark hadoop hive', 3), ('hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark hadoop hive spark had

## 3. Closure and Shared Variables

### 3.1 Closure

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster.

<p>

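Consider the naive RDD element sum below, which may behave differently depending on whether execution happens within the same JVM. A common example is running Spark in local mode (`--master local[n]`) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN).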

```python
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
```
<p>

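To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task's **closure**. The closure is the set of variables and methods that must be visible for the executor to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor.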
<p>

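The variables within the closure sent to each executor are now **copies**, so when **counter** is referenced within the `foreach` function, it is no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node, but it is not visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero, since all operations on **counter** referenced the value within the serialized closure.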

<p>

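In local mode, in some circumstances, the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it.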

### 3.2 Shared Variables -- Broadcast Variables
**Broadcast variables** allow the programmer to keep a **read-only variable** cached on each machine rather than shipping a copy of it with tasks. They suit small read-only datasets and avoid re-sending the same data with every task.
<p>
```python
broadcastVar = sc.broadcast([1, 2, 3, 4, 5])
data = broadcastVar.value
```

### 3.3 Shared Variables -- Accumulators

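Accumulators and broadcast variables are the two kinds of shared variables that Spark provides. They share data across the nodes of a cluster, working around the fact that executors cannot otherwise share data with one another. Accumulators are defined on the driver; tasks running on executors can only add to them, and only the driver can read their value.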
```python
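accum = sc.accumulator(0)
# Each task adds its elements to the accumulator
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
# The accumulated value is read on the driver
print(accum.value)  # 10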
```

In [6]:
sc.stop()
spark.stop()