# SparkSQL学习
SparkSQL通过调用SQL语句或DataFrame相关API描述操作

In [1]:
import findspark

spark_home = 'D:\spark3.0.1\spark-3.0.1-bin-hadoop3.2'
python_path = 'D:\Python3\python.exe'

findspark.init(spark_home, python_path)
# 配置spark和python的路径

In [2]:
import pyspark
from pyspark.sql import SparkSession

#SparkSQL的大部分功能封装在SparkSession的方法接口中

spark = SparkSession.builder \
        .appName("test") \
        .config("master","local[4]") \
        .enableHiveSupport() \
        .getOrCreate()

sc = spark.sparkContext

## DataFrame
DataFrame借鉴了pandas的思想，能够按照列进行获取信息；而在RDD部分学习过程时，所有的行为都是List化的，对每个对象进行处理/过滤等任务，是按行处理数据的逻辑。
是不是可以说，DataFrame中有多列属性，类似于多PairRDD，对PairRDD更结构化得处理。
DataFrame相较于RDD加入了schema(模式)的概念，**schema是描述数据结构和字段类型的元数据信息，它用来定义每个列的名称、数据类型和其他属性**。

### DataFrame构建
1. 从RDD构建(toDF)；
2. 从pandas的DataFrame对象构建(createDataFrame)
3. 从list对象直接创建(createDataFrame)
4. 显式设定schema和rdd细节构建(createDataFrame)
5. 从文件创建(json文件、csv文件、hive表数据、mysql表数据)

前四种方法都是小规模数据测试使用，主流方法应为从文件创建

In [3]:
rdd = sc.parallelize([("LiLei",15,88),("HanMeiMei",16,90),("DaChui",17,60)])
# 构建一个普通的RDD，RDD本质上还是按行来处理的

In [5]:
df = rdd.toDF(['name', 'age', 'score'])
# 定义schema中的名称选项，即可从RDD构建DataFrame

In [6]:
df.show()
df.printSchema()
# schema中包括了名称、数据类型和额外属性

+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 15|   88|
|HanMeiMei| 16|   90|
|   DaChui| 17|   60|
+---------+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)



In [3]:
import pandas as pd

pdf = pd.DataFrame([("LiLei",18),("HanMeiMei",17)],columns = ["name","age"])

In [8]:
# 从pandas对象构建
df = spark.createDataFrame(pdf)
df.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+



In [9]:
values = [("LiLei",18),("HanMeiMei",17)]
df = spark.createDataFrame(values,["name","age"])
df.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+



In [4]:
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from pyspark.sql import Row
from datetime import datetime

In [35]:
schema = StructType(
    [
        StructField("name", StringType(), nullable=False),
        StructField("score", IntegerType(), nullable=True),
        StructField("birthday", DateType(), nullable=True)
     ]
)
# 设定schema为Struct类型，每个字段都设定名称、类型和是否可为空

In [36]:
rdd = sc.parallelize(
    [   Row("LiLei",87,datetime(2010,1,5)),
        Row("HanMeiMei",90,datetime(2009,3,1)),
        Row("DaChui",None,datetime(2008,7,2))
     ]
)

In [37]:
dfstudent = spark.createDataFrame(rdd, schema)
dfstudent.show()

+---------+-----+----------+
|     name|score|  birthday|
+---------+-----+----------+
|    LiLei|   87|2010-01-05|
|HanMeiMei|   90|2009-03-01|
|   DaChui| null|2008-07-02|
+---------+-----+----------+



### 通过文件创建

In [38]:
df_json = spark.read.json('data/people.json')
df.show()
# 读取json文件

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+



In [5]:
df_csv = spark.read\
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("inferSchema", 'true') \
    .csv('data/iris.csv')

df_csv.show(5)
df_csv.printSchema()

# 从csv文件读取 要设置表头、分隔符等

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)



In [41]:
df_parquet = spark.read.parquet('data/users.parquet')
df_parquet.show()
# 直接从parquet文件读取

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### DataFrame存储
1. 保存为csv
2. 转为rdd后再保存为txt
3. 保存为json
4. 保存成parquet文件
5. 保存入数据库

In [48]:
df_csv.show(5)
df_csv.printSchema()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)



In [7]:
# 直接以csv形式存储
df_csv.write.format('csv').option("header", 'true').save('./data/iris_write.csv')

In [8]:
# 转为rdd再保存为txt文件
df_csv.rdd.saveAsTextFile('/data/iris_write.txt')

In [8]:
# 以json形式存储
df_csv.write.json('./data/iris_write.json')

In [None]:
# 保存成parquet文件 压缩格式，占用存储小，且为spark内存中的存储形式，加载最快
df_csv.write.partitionBy('label').format("parquet").save('data/iris_write.parquet')
# 或者
df_csv.write.parquet('data/iris_write.parquet')

## DataFrame的API操作

In [45]:
from pyspark.sql import Row, Column
import pyspark.sql.functions as F

In [10]:
df = spark.createDataFrame(
    [("LiLei",15,"male"),
     ("HanMeiMei",16,"female"),
     ("DaChui",17,"male")
     ]
).toDF("name", 'age', 'gender')

df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



### Action操作
Action类操作主要是对信息的展现，包括show、count、collect、describe、take、head、first等

In [11]:
# show(numRows, truncate:Boolean)方法
df.show()
# 全显式
df.show(2, truncate=True)
# 仅显示两行

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
+---------+---+------+
only showing top 2 rows



In [12]:
# count() 返回DataFrame的行数
df.count()

3

In [13]:
# collect() 应该类似于RDD中的collect 将数据整合展示
df.collect()

[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female'),
 Row(name='DaChui', age=17, gender='male')]

In [14]:
# first 最上面一行  take(n) 显示前面n个 head(n) 显示顶部n个
df.first()
df.take(2)
df.head(2)

[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

### 类RDD操作
DataFrame支持RDD中的一些诸如distinct、cache、sample、foreach、intersect、except等操作。
**可以把DataFrame当做数据类型为Row的RDD**
这类方法似乎有些鸡肋

In [15]:
df.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+



In [21]:
# map方法 先转换为RDD
rdd = df.rdd.map(lambda x:Row(x[0], x[1], x[2].upper()))
rdd.collect()

[<Row('LiLei', 15, 'MALE')>,
 <Row('HanMeiMei', 16, 'FEMALE')>,
 <Row('DaChui', 17, 'MALE')>]

In [25]:
rdd.toDF(["name", 'age', 'upper_gender']).show()

+---------+---+------------+
|     name|age|upper_gender|
+---------+---+------------+
|    LiLei| 15|        MALE|
|HanMeiMei| 16|      FEMALE|
|   DaChui| 17|        MALE|
+---------+---+------------+

+---------+---+------------+
|     name|age|upper_gender|
+---------+---+------------+
|    LiLei| 15|        MALE|
|HanMeiMei| 16|      FEMALE|
|   DaChui| 17|        MALE|
+---------+---+------------+



In [26]:
# 过滤filter功能 也要转为RDD
df_filtered = df.rdd.filter(lambda x:x[1]>=16).toDF(['name', 'age', 'gender'])
df_filtered.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+



In [29]:
# distinct功能 不用转为RDD 直接可以去重
# 行中每一列均相同才会去重

df = spark.createDataFrame(
    [("LiLei",15,"male"),
     ("HanMeiMei",16,"female"),
     ("DaChui",17,"male"),
     ("LiLei",15,"male"),
     ("DaChui",18,"male")
     ]
).toDF("name", 'age', 'gender')

df.distinct().show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
|   DaChui| 18|  male|
+---------+---+------+



In [30]:
df = spark.createDataFrame(
    [("LiLei",15,"male"),
     ("HanMeiMei",16,"female"),
     ("DaChui",17,"male"),
     ]
).toDF("name", 'age', 'gender')

In [32]:
# 采样功能
df_sample = df.sample(0.1,0)

df_sample.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|HanMeiMei| 16|female|
+---------+---+------+



In [38]:
# 两个DataFrame的集合运算

df2 = spark.createDataFrame(
    [("Xiaoming",80,"male"),
     ("Xiaohong",81,"female"),
     ("Xiaozhang",82,"male"),
     ]
).toDF("name", "score", "gender")

In [37]:
# 当两个DataFrame的schema有多列时 似乎不能进行集合运算

df_intersect = df.intersect(df2)
df_intersect.show()

+----+---+------+
|name|age|gender|
+----+---+------+
+----+---+------+



In [39]:
df3 = spark.createDataFrame(
    [("Xiaoming",80,"male"),
     ("Xiaohong",81,"female"),
     ("Xiaozhang",82,"male"),
     ]
).toDF("name", "age", "gender")

In [40]:
df_intersect = df.intersect(df3)
df_intersect.show()

+----+---+------+
|name|age|gender|
+----+---+------+
+----+---+------+



### 类Pandas操作
1. 广播操作
2. 增加/删除/重命名列(withColumn, drop, withColumnRenamed)
3. 排序(sort orderBy)
4. 异常值处理(na.drop na.fill na.replace)
5. 过滤处理 (replace替换、dropDuplicate去重
6. 简单函数 agg聚合操作 describe汇总信息

In [42]:
df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)
]).toDF("name","age","gender")

df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



In [43]:
# 广播操作 和 增加列 withColumn操作

df_new = df.withColumn('birthday', 2020-df['age'])
# 和pandas一样对列名的操作是广播出去的 即维度为(1,)的对象与维度为(Row_num, 1)的对象的运算

df_new.show()

+---------+---+------+--------+
|     name|age|gender|birthday|
+---------+---+------+--------+
|    LiLei| 15|  male|    2005|
|HanMeiMei| 16|female|    2004|
|   DaChui| 17|  male|    2003|
|    RuHua| 16|  null|    2004|
+---------+---+------+--------+



In [49]:
# 删除列 drop
df_dropped = df.drop("gender")
df_dropped.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 15|
|HanMeiMei| 16|
|   DaChui| 17|
|    RuHua| 16|
+---------+---+



In [50]:
# 重命名列
df_renamed = df.withColumnRenamed('gender', 'sex')
df_renamed.show()

+---------+---+------+
|     name|age|   sex|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+



In [52]:
# 排序 sort(排序规则)
sort_rule = df['age'].desc()
print(type(sort_rule))
# 生成的column对象 作为排序方式

df_sorted = df.sort(sort_rule)
df_sorted.show()
# sort方法调用排序方式

<class 'pyspark.sql.column.Column'>
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+



In [56]:
# 也可以直接按照某列名或多个列进行排序
df.sort('age').show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|    RuHua| 16|  null|
|   DaChui| 17|  male|
+---------+---+------+



In [57]:
# orderby方法可以按照多个字段进行排序
df_ordered = df.orderBy(df['age'].desc(), df['name'].asc())
df_ordered.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
+---------+---+------+



In [58]:
# drop去掉空值
df_not_na = df.na.drop()
df_not_na.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+



In [59]:
# na.fill 填充空值
df_fill = df.na.fill("female")
df_fill.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|female|
+---------+---+------+



In [65]:
# replace方法可以直接替换
df_replaced2 = df.replace({"":"female", "RuHua":"SiYu"})
df_replaced2.show()
# 空值没替换成功

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|     SiYu| 16|  null|
+---------+---+------+



In [66]:
df2 = df.unionAll(df)
df2.show()
# 使用unionAll进行连接 由于是unionAll所以不会自动去重

df_unique = df.dropDuplicates()
df_unique.show()
# dropDuplicates进行去重

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    RuHua| 16|  null|
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+



In [67]:
# 简单聚合操作agg: 使用简单的名称描述方法
df_agg = df.agg({"name": "count", "age":"max"})
df_agg.show()

+-----------+--------+
|count(name)|max(age)|
+-----------+--------+
|          4|      17|
+-----------+--------+



In [68]:
# 经典的describe方法获得表细节

df_desc = df.describe()
df_desc.show()

+-------+------+-----------------+------+
|summary|  name|              age|gender|
+-------+------+-----------------+------+
|  count|     4|                4|     3|
|   mean|  null|             16.0|  null|
| stddev|  null|0.816496580927726|  null|
|    min|DaChui|               15|female|
|    max| RuHua|               17|  male|
+-------+------+-----------------+------+



### 类SQL表操作 ☆
类SQL操作是最常用且与SQL表最为密切的处理方式
将DataFrame看做数据库表，或者本身就是从数据库读取出的数据构建的DataFrame。
这样用pyspark可以执行SQL类任务。  