# SparkSql学习

初始化

In [2]:
import findspark
from pyspark import SparkContext, SparkConf
#指定spark_home为刚才的解压路径,指定python路径
spark_home = "/home/parallels/Desktop/work/spark-3.3.2-bin-hadoop3"
python_path = "/home/parallels/anaconda3/bin/python3"
findspark.init(spark_home,python_path)

import pyspark 
from pyspark.sql import SparkSession

#SparkSQL的许多功能封装在SparkSession的方法接口中

spark = SparkSession.builder \
        .appName("test") \
        .config("master","local[4]") \
        .enableHiveSupport() \
        .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/31 11:16:29 WARN Utils: Your hostname, ubuntu-linux-22-04-desktop resolves to a loopback address: 127.0.1.1; using 10.211.55.3 instead (on interface enp0s5)
23/03/31 11:16:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/31 11:16:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 一、RDD，DataFrame和DataSet对比
DataFrame参照了Pandas的思想，在RDD基础上增加了schma，能够获取列名信息。

DataSet在DataFrame基础上进一步增加了数据类型信息，可以在编译时发现类型错误。

DataFrame可以看成DataSet[Row]，两者的API接口完全相同。

DataFrame和DataSet都支持SQL交互式查询，可以和 Hive无缝衔接。

DataSet只有Scala语言和Java语言接口中才支持，在Python和R语言接口只支持DataFrame。

DataFrame数据结构本质上是通过RDD来实现的，但是RDD是一种行存储的数据结构，而DataFrame是一种列存储的数据结构。

## 二、创建DataFrame

### 1. 通过toDF方法转换成DataFrame
可以将RDD用toDF方法转换成DataFrame

In [5]:
#将RDD转换成DataFrame
rdd = sc.parallelize([("LiLei",15,88),("HanMeiMei",16,90),("DaChui",17,60)])
df = rdd.toDF(["name","age","score"])
df.show()
df.printSchema()

                                                                                

+---------+---+-----+
|     name|age|score|
+---------+---+-----+
|    LiLei| 15|   88|
|HanMeiMei| 16|   90|
|   DaChui| 17|   60|
+---------+---+-----+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- score: long (nullable = true)



### 2. 通过createDataFrame方法将Pandas.DataFrame转换成pyspark中的DataFrame

In [3]:
import pandas as pd

pdf = pd.DataFrame([("LiLei",18),("HanMeiMei",17)],columns = ["name","age"])
df = spark.createDataFrame(pdf)
df.show()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+



                                                                                

In [4]:
# 也可以对列表直接转换
values = [("LiLei",18),("HanMeiMei",17)]
df = spark.createDataFrame(values,["name","age"])
df.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 18|
|HanMeiMei| 17|
+---------+---+



### 3. 通过createDataFrame方法指定schema动态创建DataFrame
可以通过createDataFrame的方法指定rdd和schema创建DataFrame。

这种方法比较繁琐，但是可以在预先不知道schema和数据类型的情况下在代码中动态创建DataFrame.

In [5]:
from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime

schema = StructType([StructField("name", StringType(), nullable = False),
                     StructField("score", IntegerType(), nullable = True),
                     StructField("birthday", DateType(), nullable = True)])

rdd = sc.parallelize([Row("LiLei",87,datetime(2010,1,5)),
                      Row("HanMeiMei",90,datetime(2009,3,1)),
                      Row("DaChui",None,datetime(2008,7,2))])

dfstudent = spark.createDataFrame(rdd, schema)

dfstudent.show()

+---------+-----+----------+
|     name|score|  birthday|
+---------+-----+----------+
|    LiLei|   87|2010-01-05|
|HanMeiMei|   90|2009-03-01|
|   DaChui| null|2008-07-02|
+---------+-----+----------+



### 4. 通过读取文件创建
可以读取json文件，csv文件，hive数据表或者mysql数据表得到DataFrame。

In [7]:
#读取json文件生成DataFrame
df = spark.read.json("../data/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
#读取csv文件
df = spark.read.option("header","true") \
 .option("inferSchema","true") \
 .option("delimiter", ",") \
 .csv("../data/iris.csv")
df.show(5)
df.printSchema()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)



In [10]:
#读取csv文件
df = spark.read.format("com.databricks.spark.csv") \
 .option("header","true") \
 .option("inferSchema","true") \
 .option("delimiter", ",") \
 .load("../data/iris.csv")
df.show(5)
df.printSchema()

+-----------+----------+-----------+----------+-----+
|sepallength|sepalwidth|petallength|petalwidth|label|
+-----------+----------+-----------+----------+-----+
|        5.1|       3.5|        1.4|       0.2|    0|
|        4.9|       3.0|        1.4|       0.2|    0|
|        4.7|       3.2|        1.3|       0.2|    0|
|        4.6|       3.1|        1.5|       0.2|    0|
|        5.0|       3.6|        1.4|       0.2|    0|
+-----------+----------+-----------+----------+-----+
only showing top 5 rows

root
 |-- sepallength: double (nullable = true)
 |-- sepalwidth: double (nullable = true)
 |-- petallength: double (nullable = true)
 |-- petalwidth: double (nullable = true)
 |-- label: integer (nullable = true)



In [11]:
#读取parquet文件
df = spark.read.parquet("../data/users.parquet")
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [13]:
#读取hive数据表生成DataFrame

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH '../data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

+---+-----+
|key|value|
+---+-----+
|  0|val_0|
|  0|val_0|
|  0|val_0|
|  2|val_2|
|  4|val_4|
+---+-----+
only showing top 5 rows



In [None]:
#读取mysql数据表生成DataFrame
"""
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
 .option("url", url) \
 .option("dbtable", "runoob_tbl") \
 .option("user", "root") \
 .option("password", "0845") \
 .load()\
df.show()
"""


## 三、DataFrame保存成文件
可以保存成csv文件，json文件，parquet文件或者保存成hive数据表

In [15]:
#保存成csv文件
df = spark.read.format("json").load("../data/people.json")
df.write.format("csv").option("header","true").save("data/people_write.csv")

In [16]:
#先转换成rdd再保存成txt文件
df.rdd.saveAsTextFile("data/people_rdd.txt")

In [17]:
#保存成json文件
df.write.json("data/people_write.json")

In [18]:
#保存成parquet文件, 压缩格式, 占用存储小, 且是spark内存中存储格式，加载最快
df.write.partitionBy("age").format("parquet").save("data/namesAndAges.parquet")
df.write.parquet("data/people_write.parquet")

In [19]:
#保存成hive数据表
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

## DataFrame的API交互

In [20]:
from pyspark.sql import Row
from pyspark.sql.functions import * 

df = spark.createDataFrame(
    [("LiLei",15,"male"),
     ("HanMeiMei",16,"female"),
     ("DaChui",17,"male")]).toDF("name","age","gender")

df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)



### 1. Action操作
DataFrame的Action操作包括show,count,collect,,describe,take,head,first等操作。

In [21]:
#show
df.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+



In [22]:
#show(numRows: Int, truncate: Boolean) 
#第二个参数设置是否当输出字段长度超过20时进行截取
df.show(2,False) 

+---------+---+------+
|name     |age|gender|
+---------+---+------+
|LiLei    |15 |male  |
|HanMeiMei|16 |female|
+---------+---+------+
only showing top 2 rows



In [23]:
#count
df.count()

3

In [24]:
#collect
df.collect()

[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female'),
 Row(name='DaChui', age=17, gender='male')]

In [26]:
#first
df.first()

Row(name='LiLei', age=15, gender='male')

In [27]:
#head
df.take(2)

[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

In [28]:
#head
df.head(2)

[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

### 2. 类RDD操作
DataFrame支持RDD中一些诸如distinct,cache,sample,foreach,intersect,except等操作。

可以把DataFrame当做数据类型为Row的RDD来进行操作，必要时可以将其转换成RDD来操作。

In [29]:
df = spark.createDataFrame([("Hello World",),("Hello China",),("Hello Spark",)]).toDF("value")
df.show()

+-----------+
|      value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+

