### SparkSession
- SparkSession是Spark应用程序的统一切入点；它是在Spark 2.0中引入的。它充当Spark所有底层功能的连接器，包括RDD、DataFrames和Datasets，提供统一的接口来处理结构化数据。它是您在开发SparkSQL应用程序时创建的第一批对象之一。作为Spark开发人员，您使用SparkSession.builder()方法创建SparkSession.
- SparkSession将几个以前独立的上下文（如SQLContext、HiveContext和StreamingContext）整合到一个切入点中，简化了与Spark及其不同API的交互。它使用户能够执行各种操作，如从各种来源读取数据、执行SQL查询、创建数据帧和数据集，以及有效地对分布式数据集执行操作。
#### 方法
- master()：通过设置可以使spark以不同模式运行，如local、yarn、standalone cluster等。
- appName()：设置spark应用的名称，可以在Spark UI中看到。
- getOrCreate()：如果存在SparkSession，则返回现有的SparkSession，否则创建一个新的SparkSession。

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("pyspark-example").getOrCreate()
print(spark)
print("spark version:",spark.sparkContext.appName)
# 停止
spark.stop()

<pyspark.sql.session.SparkSession object at 0x7fd550851070>
spark version: pyspark-example


### SparkContext
- Pyspark.SparkContext是PySpark功能的切入点，用于与集群通信以及创建RDD、累加器和广播变量。通过示例学习如何创建PySpark.SparkContext。
- spark驱动程序创建并使用 SparkContext 连接到集群管理器以提交 PySpark 作业，请注意，每个JVM只能创建一个SparkContext，为了首先创建另一个，您需要使用stop（）方法停止现有的SparkContext。

In [6]:
# pyspark2.0开始， 创建SparkSession 会在内部创建一个 SparkContext 并公开要使用的 parkContext 变量。
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("pyspark-example").getOrCreate()
print(spark.sparkContext)
print("spark app name:",spark.sparkContext.appName)
# 停止
spark.sparkContext.stop()

<SparkContext master=local[1] appName=pyspark-example>
spark app name: pyspark-example


In [10]:
# SparkContext的创建
from pyspark import SparkContext,SparkConf
sc = SparkContext("local", "Spark_Example_App")
print(sc.appName)


conf = SparkConf().setMaster("local[1]").setAppName("pyspark-example")
sc = SparkContext.getOrCreate(conf)
print(sc.appName)

Spark_Example_App
Spark_Example_App


In [13]:
print(sc.applicationId)
# print(sc.uiWebUrl)

local-1753713989747


### RDD
- RDD（弹性分布式数据集）是PySpark的核心构建块，它是一个容错的、不可变的、分布式的对象集合，不可变意味着一旦创建了RDD，就不能更改它。RDD内的数据被分割成逻辑分区，允许跨集群内的多个节点进行分布式计算。
- RDD是类似于Python中列表的对象集合；不同之处在于RDD是在分散在多个物理服务器（也称为集群中的节点）上的多个进程上计算的，而Python集合仅在一个进程中存在和处理。
- RDD可以并行处理，并且可以在集群中的多个节点上存储和计算。RDD支持两种类型的操作：转换transformation（如map、filter）和行动action（如count、collect）。

#### RDD特性
- 不可变性：一旦创建，RDD就不能更改。任何操作都会生成一个新的RDD。
- 容错性：RDD在计算过程中会自动处理节点故障。它通过记录转换操作的血统信息来实现容错，这样在节点失败时可以重新计算丢失的数据。
- 懒惰求值：RDD的操作是懒惰的，只有在需要结果时才会执行。这意味着RDD的转换操作不会立即计算，而是记录下转换的血统信息，直到执行行动操作时才会触发实际计算。
- 分区（并行）：RDD的数据被分割成多个分区，这些分区可以在集群中的不同节点上并行处理。每个分区是一个逻辑上的数据块，可以独立地进行计算。

In [18]:
# 创建RDD ：sparkContext.parallelize()
data = [1,2,3,4,5,6,7,8,9,10]
rdd = sc.parallelize(data)
# 查看RDD的分区数
print("Number of partitions:", rdd.getNumPartitions())
print("RDD content:", rdd.collect())
#手动设置分区数
rdd1 = sc.parallelize(data, 3)
print("Number of partitions:", rdd1.getNumPartitions())
print("RDD content:", rdd1.collect())

Number of partitions: 1


                                                                                

RDD content: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Number of partitions: 3
RDD content: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


In [23]:
# 重新分区：repartition()会触发shuffle适合需要均匀数据的场景、coalesce()合并相邻分区，适合减少分区数的场景
rdd2 = rdd1.repartition(2)
print("Number of partitions after repartitioning:", rdd2.getNumPartitions())
print("Repartitioned RDD content:", rdd2.collect())

rdd3 = rdd2.coalesce(1)
print("Number of partitions after coalesce:", rdd3.getNumPartitions())
print("Repartitioned RDD content:", rdd3.collect())


Number of partitions after repartitioning: 2
Repartitioned RDD content: [7, 8, 9, 10, 1, 2, 3, 4, 5, 6]
Number of partitions after coalesce: 1
Repartitioned RDD content: [7, 8, 9, 10, 1, 2, 3, 4, 5, 6]


In [25]:
# Transformations-惰性操作，返回新的rdd,常用操作算子：flatMap(), map(), reduceByKey(), filter(), sortByKey() 
rdd = sc.textFile("./data/test.txt")  # 从文本文件创建RDD
#flatMap()：和map类似，通过先将函数应用于此 RDD 的所有元素，然后展平结果（去掉嵌套），返回一个新的 RDD。
rdd2 = rdd.flatMap(lambda x: x.split(" "))
#map()：将每个输入元素映射到一个输出元素，返回一个新的RDD。
rdd1 = rdd2.map(lambda x: (x,1))
# reduceByKey()：对具有相同键的值进行聚合，返回一个新的RDD。
rdd3 = rdd1.reduceByKey(lambda x, y: x + y)
# sortByKey()：根据键对元素进行排序，返回一个新的RDD。
rdd4 = rdd3.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
print("Sorted RDD content:", rdd4.collect())

Sorted RDD content: [(27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (9, 'Project'), (9, 'Gutenberg’s')]


In [32]:
# action-行动操作，触发计算并返回RDD的值
# count()：返回RDD中元素的数量
count = rdd4.count()
print("Count of elements in RDD:", count)
# first()：返回RDD中的第一个元素
first = rdd4.first()
print("First element in RDD:",first[0],first[1] )
# max()：返回RDD中最大元素
max = rdd4.max()
print("Max element in RDD:", max[0],max[1])
# reduce()：对RDD中的所有元素进行聚合操作
reduce_result = rdd4.reduce(lambda x, y: (x[0] + y[0], x[1] + " " + y[1]))
print("Reduced result:", reduce_result[0])
# take(n)：返回RDD中的前n个元素
take_result = rdd4.take(3)
print("First 3 elements in RDD:", take_result)
# collect()：返回RDD中的所有元素
collect_result = rdd4.collect()
print("All elements in RDD:", collect_result)
# saveAsTextFile(path)：将RDD保存到指定路径
rdd4.saveAsTextFile("./data/output.txt")

Count of elements in RDD: 23
First element in RDD: 27 This
Max element in RDD: 27 with
Reduced result: 522
First 3 elements in RDD: [(27, 'This'), (27, 'eBook'), (27, 'is')]
All elements in RDD: [(27, 'This'), (27, 'eBook'), (27, 'is'), (27, 'for'), (27, 'the'), (27, 'use'), (27, 'of'), (27, 'anyone'), (27, 'anywhere'), (27, 'at'), (27, 'no'), (27, 'cost'), (27, 'and'), (27, 'with'), (18, 'Alice’s'), (18, 'Adventures'), (18, 'in'), (18, 'Wonderland'), (18, 'by'), (18, 'Lewis'), (18, 'Carroll'), (9, 'Project'), (9, 'Gutenberg’s')]
