In [1]:
from tempfile import NamedTemporaryFile
import requests
url = "http://api.luftdaten.info/static/v1/data.json"

RDD 创建需要依赖于 SparkContext——它是用于连接 Spark 集群创建 RDD 以及广播集群中变量。

1. [SparkContext 创建 Spark 应用](#SparkContext创建Spark应用)
2. [SparkSession 创建 Spark 应用](#SparkSession创建Spark应用)
3. [查看Spark应用配置信息](#查看Spark应用配置信息)
4. [RDD.DataFrame](#RDD.DataFrame)


### SparkContext创建Spark应用
直接通过 SparkContext 创建，可以使用 SparkConf 对 Spark 应用进行设置， SparkConf 会默认加载 `spark.*` 的 Java 系统属性，也可以通过传入参数修改相关属性。进行单元测试时，可以使用 `loadDefaults=False` 参数来使用系统属性配置以保持测试一致。此外 SparkConf 还有其他方法用于设置，例如 `conf.setMaster("local").setAppName("App")`。

In [2]:
# 通过 SparkContext 创建
from pyspark import SparkContext, SparkConf
import numpy as np
import pandas as pd

# 通过 SparkConf 进行设相关属性，需要以 key-value 形式来进行设置
config = SparkConf() \
        .set("Name", "ContextTest") \
        .setMaster("local") \
        .setAppName("First App")


# 创建 SparkContext
sc = SparkContext(conf=config)

### 使用parallelize创建RDD
[parallelize](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.parallelize) 是用于创建 RDD 的便捷方法，其中 `numSlices` 表示分区数

In [3]:
test = sc.parallelize(range(0, 10, 2), numSlices=3)

In [4]:
test.map(lambda a: a**2).collect()

[0, 4, 16, 36, 64]

### 使用 textFile 创建 RDD
[textFile](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile)可以从 HDFS、本地文件以及其他支持 Hadoop 支持的文件系统 URI读取数据，并且返回字符串的RDD。另一种方法 [wholeTextFiles](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.wholeTextFiles) 可以读取文件内容，并以键（文件路径）和内容形式保存数据。

在使用Spark读取文件时，需要说明以下几点：
1. 如果使用了本地文件系统的路径，那么，必须要保证在所有的worker节点上，也都能够采用相同的路径访问到该文件，比如，可以把该文件拷贝到每个worker节点上，或者也可以使用网络挂载共享文件系统。
2. textFile()方法的输入参数，可以是文件名，也可以是目录，也可以是压缩文件等。比如，textFile("/my/directory"), textFile("/my/directory/.txt"), 以及 textFile("/my/directory/.gz").
3. textFile()方法也可以接受第2个输入参数（可选），用来指定分区的数目。默认情况下，Spark会为HDFS的每个block创建一个分区（HDFS中每个block默认是128MB），block 数量可以多不可少

In [5]:
import pathlib
path = pathlib.Path(".").cwd()

In [6]:
text = sc.textFile(str(path.joinpath("data/README.md")), minPartitions=4)
text.map(lambda x: len(x)).reduce(lambda a, b: a + b)

6279

In [7]:
text.map(lambda x: x.strip()).map(lambda x: len(x)).reduce(lambda a, b: a + b)

6145

In [8]:
text.map(lambda x: x.replace(" ", "")).map(lambda x: len(x)).reduce(lambda a, b: a + b)

5851

In [9]:
text = sc.textFile(str(path.joinpath("data/data.json")), minPartitions=4)
text.map(lambda x:len(x.split("\n"))).collect()

[1]

### RDD 持久化操作
RDD 计算是在内存中进行，在单过程中重复运算会增加运算消耗，通过 `persist()` 方法可以将缓存持久化减少同样过程的计算消耗。出现`persist()`语句的地方，并不会马上计算生成RDD并把它持久化，而是要等到遇到第一个行动操作触发真正计算以后，才会把计算结果进行持久化，持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。第二次使用 RDD 不需要触发从头到尾的计算，只需要重复使用上面缓存中的rdd
`persist()`的参数表示持久化级别，比如，persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中，如果内存不足，就要按照LRU原则替换缓存中的内容。`persist(MEMORY_AND_DISK)`表示将RDD作为反序列化的对象存储在JVM中，如果内存不足，超出的分区将会被存放在硬盘上。

* 缓存调用 使用`cache()`方法时，会调用`persist(MEMORY_ONLY)`
* 缓存移除 使用`unpersist()` 方法手动地把持久化的RDD从缓存中移除

In [10]:
test = sc.parallelize(range(0, 10, 2), numSlices=3)
print("未执行运算过程且使用 persist 之前的 cache:\n\t", test.cache())
test.persist()

print("第一次执行运算结果: \n\t", test.map(lambda x: x * 2).reduce(lambda a, b: a + b))
print("执行运算之后且使用 persist 的 cache:\n\t", test.cache())

# 释放缓存
test.unpersist()

未执行运算过程且使用 persist 之前的 cache:
	 PythonRDD[11] at RDD at PythonRDD.scala:53
第一次执行运算结果: 
	 40
执行运算之后且使用 persist 的 cache:
	 PythonRDD[11] at RDD at PythonRDD.scala:53


PythonRDD[11] at RDD at PythonRDD.scala:53

### 集群多任务共享变量
在集群的不同节点多任务上并行运行任务，每个任务上各函数变量都产生一个副本。当需要在多个任务之间共享变量，或者在任务（Task）和任务控制节点（Driver Program）之间共享变量。Spark 提供了特殊类型的变量：广播变量（broadcast variables）和累加器（accumulators）。广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累加计算（比如计数或者求和）。

#### 广播变量
广播变量（broadcast variables）允许程序员将一个只读的变量缓存在每台机器上，而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量，进而减少通信的开销。Spark的动作通过一系列的步骤执行，这些步骤由分布式的shuffle操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存，在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据，或者以反序列化形式缓存数据是十分重要的时候，显式地创建广播变量才有用。[Broadcast](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.Broadcast) 变量销毁需要通过 `destroy()` 方法，删除该变量的缓存方法，需要使用 `unpersist()`

In [11]:
# 通过 broadcast 方法创建
broadcastVar = sc.broadcast([1, 2, 3])

# 通过 value 属性获取数据值
broadcastVar.value

[1, 2, 3]

#### 累加器
累加器是仅仅被相关操作累加的变量，因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器，编程者可以添加新类型的支持。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是，它们不能读取它的值。只有驱动程序能够读取它的值，通过累加器的value方法。

累机器内置支持的类型为 Int 和 float，但是可以通过继承 AccumulatorParam 类来创建自定义累加器类型。

In [12]:
# 创建累加器
accum = sc.accumulator(0)
# 运算
sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))

# 通过 value 属性获取数据值
accum.value

10

### SparkSession创建Spark应用
SparkSession 是 应用中 DataFrame 和 SQL 的入口，可以用于创建 DataFrame、注册 DataFrame 为 Table（可以对 Table 使用 SQl 语句），也可以用于创建 RDD。SparkSession 创建也可以通过以下两种方式创建：
1. 通过 builder 创建 对 Spark 应用的设置需要通过 SparkSession 下的相关方法和类（builder 可以用于配置后续相关设置，它是 SparkSession 类嵌套类的实例化）或者相关参数完成。注意在使用 conf 的配置方式和 SparkContext 的配置相似
2. SparkSession 直接实例化 通过传入 SparkContext 作为参数进行实例化对象:
    ```{python}
    from pyspark import SparkContext, SparkConf
    config = SparkConf() \
            .setMaster("local") \
            .setAppName("First App")

    sc = SparkContext(conf=config)

    from pyspark.sql import SparkSession
    ss = SparkSession(sc)
    ```

In [13]:
# sparksession 和 sparkcontext 的导入模块不一致，这点需要注意
from pyspark.sql import SparkSession

In [14]:
ss = SparkSession.builder \
    .master("local") \
    .config("Name", "SessionTest") \
    .appName("SessionApp") \
    .getOrCreate()

## 查看Spark应用配置信息
获取 SparkContext 配置信息的方式，可以通过 getConf 方法以及其他 get 方法配合使用查看，例如：`sc.getConf().get("spark.master")` 等，需要注意 Spark 应用的配置信息——基本上配置的非独特属性情况下，配置值都是以 `spark.*` 的形式生成的

In [15]:
# 确认所有配置信息
sc.getConf().getAll()

[('spark.master', 'local'),
 ('Name', 'SessionTest'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'f6d772bd8388'),
 ('spark.app.id', 'local-1576465041217'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '35459'),
 ('spark.app.name', 'SessionApp'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [16]:
# 查询指定配置信息
sc.getConf().get("spark.master")

'local'

## RDD.DataFrame
<img src="http://dblab.xmu.edu.cn/blog/wp-content/uploads/2016/11/DataFrame-RDD.jpg" width=30%>
RDD是分布式的对象的集合，比如，RDD 是以Person为类型数据。但是，Person 类的内部结构对于 RDD 而言未知。RDD 的 DataFrame 是一种以 RDD为基础的分布式数据集，它提供了详细的结构信息——每一列会有相应的名称以及相关的类型，即模式（schema）。和 RDD 一样，DataFrame 的各种变换操作也采用惰性机制，其 DAG 图相当于一个逻辑查询计划，最终，会被翻译成物理查询计划，生成RDD DAG，按照之前介绍的RDD DAG的执行方式去完成最终的计算得到结果。

总之 DataFrame 具有以下相关特征：
1. 输入优化引擎的使用 该类型使用了 Catalyst Optimizer 的输入优化引擎，进行更有效率处理数据。其他语言接口同样可以使用相同引擎
2. 结构化数据 DataFrame 提供了数的模式视图
3. 定制化内存管理 RDD 被曝存在内容中，而DataFrame 是保存在 data off_heap 中（在 Heap 的 main 空间之外，但也属于 RAM）
3. 数据输入输出灵活性 和 RDD 一样支持多种类型数据以及多种数据源
4. 扩展性 可以和大数据工具整合，以处理大数据
5. 可以注册为 SQL table，通过 SQL 语句处理数据

In [17]:
# RDD 数据转换为 DataFrame
ss.createDataFrame(sc.parallelize([[1, 2, 3], [3, 4, 5]]), schema=['a', 'b', 'c']).show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  3|  4|  5|
+---+---+---+



In [18]:
# 读取字典，Spark 解析字典数据需要分别获取到列名称和数据
# 和 Pandas 读取字典数据差异较大
data = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}

ss.createDataFrame(np.array(list(data.values())).T.tolist(),list(data.keys())).show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



### read 方法读取文件
使用 [pyspark.sql.readwriter.DataFrameReader](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader) 对象读取数据（即 `SparkSession.read` 调用该对象）。对于某些明确的数据文件类型（例如 Json、csv 等） 可以直接通过相关方法（例如`SparkSession.read.json()` 、`SparkSession.read.csv()`） 进行读取外部数据，此外还可以通过 `format` 和 `load` 方法加载数据:

```
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
```

In [43]:
with open("./data/web_json.json", "w") as file:
    response = requests.get(url)
    file.write(response.text)
    
test = ss.read.json("./data/web_json.json")

In [45]:
test.head()

Row(id=5780359760, location=Row(altitude='172.8', country='RU', exact_location=0, id=22037, indoor=0, latitude='55.106', longitude='36.814'), sampling_rate=None, sensor=Row(id=34508, pin='7', sensor_type=Row(id=9, manufacturer='various', name='DHT22')), sensordatavalues=[Row(id=12275317954, value='4.10', value_type='temperature')], timestamp='2019-12-16 03:21:02')

In [46]:
test.printSchema()

root
 |-- id: long (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- altitude: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- exact_location: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- indoor: long (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |-- sampling_rate: string (nullable = true)
 |-- sensor: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- pin: string (nullable = true)
 |    |-- sensor_type: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- manufacturer: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- sensordatavalues: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- value_type: string (nullable = true)
 |-- timestamp: string (nullab

In [21]:
ss.read.csv("./data/weather.csv", header=True).printSchema()

root
 |-- year: string (nullable = true)
 |-- avg_temp: string (nullable = true)



In [22]:
# 添加 option 选项进行配置读取方式
ss.read.option("header", True).csv("./data/weather.csv").printSchema()

root
 |-- year: string (nullable = true)
 |-- avg_temp: string (nullable = true)



## 保存数据
使用 [pyspark.sql.readwriter.DataFrameWriter](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter) （即 `DataFrame.write` 调用该对象）将 DataFrame 数据保存到外部对象读取数据。某些特殊类型，可以直接调用相应方法保存(例如 [json](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.json) 、[parquet](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.parquet) 等）。在保存数据时，可以选择保存模式（需要使用 [mode](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.mode)）——添加(`"append"`)、覆盖

In [37]:
test = ss.read.json("./data/data.json")
test[['id','timestamp']].show(4)

+----------+-------------------+
|        id|          timestamp|
+----------+-------------------+
|5756852209|2019-12-13 11:10:02|
|5756852208|2019-12-13 11:10:02|
|5756852207|2019-12-13 11:10:02|
|5756852206|2019-12-13 11:10:02|
+----------+-------------------+
only showing top 4 rows



In [48]:
test.write.json("./data/save_data.json")

## 参考
1. [differences between sc.parallelize and sc.textFile](https://stackoverflow.com/questions/44860973/what-are-the-differences-between-sc-parallelize-and-sc-textfile/44861780#44861780)
2. [Spark Data Type](https://spark.apache.org/docs/latest/sql-reference.html#data-types)
3. [What is Pyspark? – Apache Spark with Python](https://intellipaat.com/blog/tutorial/spark-tutorial/pyspark-tutorial/)

#### 分区
RDD是弹性分布式数据集，通常RDD很大，会被分成很多个分区，分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心（core）数目。
对于不同的Spark部署模式而言（本地模式、Standalone模式、YARN模式、Mesos模式），一般而言：
* 本地模式：默认为本地机器的CPU数目，若设置了local[N],则默认为N。`SparkContext(master="local[2]")` 或者 `SparkContext().setMaster("local[2]")`
* Apache Mesos
* Standalone或YARN


如果需要查询分区数据，可以通过 `<sparkApp>.defaultParallelism` 查询
```
# 通过 SparkConf 进行设相关属性，需要以 key-value 形式来进行设置
config = SparkConf() \
        .set("Name", "ContextTest") \
        .setMaster("local") \
        .setAppName("First App")


# 创建 SparkContext
sc = SparkContext(conf=config)

# 查询 defaultParallelism
sc.defaultParallelism
```