## 创建SparkSession

通过名为SparkSession的驱动器来控制Spark的应用程序，需要创建一个SparkSession实例来在集群中执行用户定义的操作，每一个Spark应用程序都需要一个SparkSession与之对应。

也就是说，SparkSession就是在创建一个驱动器进程。

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

创建一个Spark DataFrame，可以把它想成一个将数据存储在多台计算机上的电子表格

当创建DataFrame的时候，大部分时候是不需要手动操作分区，只需要指定数据的高级转换操作，然后让Spark决定此工作如何在集群上运行即可。

In [2]:
myRange = spark.range(1000).toDF('number')
myRange.show(5)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



注意！Spark的核心数据结构在计算过程中是保持不变的，这意味着他们在创建之后无法更改!!

我们对于它的操作更像sql里面生成一个虚拟表，只是对他进行“转换”，在我们调用动作操作之前，Spark并不会真的执行转换操作。

In [3]:
divisBy2 = myRange.where('number % 2 = 0')

## 转换操作

分类：
窄依赖关系(narrow dependency),每个输入分区仅决定一个输出分区的转换，一个分区最多只会对一个输出分区有影响。

宽依赖关系(wide dependency)，每个输入分区决定了多个输出分区，会在整个集群中执行互相交换分区数据的功能，称为洗牌(shuffle)，会将结果写入磁盘。

## 惰性评估

惰性评估(lazy evaluation)的意思是等到绝对需要时才执行计算。在Spark中，当用户表达一些对数据的操作时，不是立即修改数据，而是建立一个作用到原始数据的转换计划（只是记录下操作路径）。

## 动作操作
动作操作是我们能够建立逻辑转换计划，为了触发计算，我们需要运行一个动作操作(action)，一个动作指示Spark在一系列转换操作后计算得到一个结果。

有三类动作：

在控制台中查看数据

在某个语言中将数据汇集为原生对象

写入输出数据源

## 完整的例子

In [4]:
flightData2015 = spark.read.option('inferSchema', 'true').option('header', 'true').csv('./data/flight-data/csv/2015-summary.csv')
flightData2015.show(5)
flightData2015.printSchema()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



注意这些并没有指定行数，因为读取数据也是一种转换操作，属于惰性操作。spark的Dataframe会根据数据来猜测每列应该是什么类型。

In [5]:
flightData2015.sort('count')
flightData2015.show(5)
flightData2015.take(5)
# 发现并没有改变，因为sort也属于转换，并没有实际对原数据进行改动。

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

虽然我们不知道发生了什么，但是可以利用explain函数来进行观察，阅读解释计划。解释计划从上往下看，上面是最终结果，下面是数据源

In [6]:
flightData2015.sort('count').explain()

== Physical Plan ==
*(2) Sort [count#20 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#20 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#18,ORIGIN_COUNTRY_NAME#19,count#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/Jupyter Notebook Project/Pyspark/Spark-The-Definitive-Guide-master/dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


通过动作来触发计划的执行。默认情况下，shuffle会输出200哥shuffle分区，暂时将此值设置为5。

In [7]:
# show 和 take 都一样，都属于在控制台上查看数据。要连续调用才可以。
spark.conf.set('spark.sql.shuffle.patitions', '5')
flightData2015.sort('count').show(5)
flightData2015.sort('count').take(5)
# 但是如果我们再倒回去发现并没有对原数据进行一个改变
flightData2015.show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



## Sql操作
想要使用Spark SQL， 可以将任何的DataFrame注册为数据表或者临时表，并使用纯SQL对它进行查询！！！这两者并没有性能差异！！！wow~

可以看下面的输出发现exlpain执行的过程都是一样的，所以并没有差别。

In [8]:
# 创建临时表
flightData2015.createOrReplaceTempView('flight_data_2015')
# sql查询
sqlWay = spark.sql('''
select DEST_COUNTRY_NAME, count(1)
from flight_data_2015
group by DEST_COUNTRY_NAME
''')
# dataframe查询
dataframeWay = flightData2015.groupBy('DEST_COUNTRY_NAME').count()

sqlWay.explain()
dataframeWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#18, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#18] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/Jupyter Notebook Project/Pyspark/Spark-The-Definitive-Guide-master/dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#18, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#18] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/Jupyter Notebook Project/Pyspark/Spark-The-Definitive-Guide-master/dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY

调用函数来进行简化操作

In [14]:
from pyspark.sql.functions import max

flightData2015.select(max('count')).show()
flightData2015.columns

+----------+
|max(count)|
+----------+
|    370002|
+----------+



['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [26]:
# 找到count前五的信息
# maxsql = spark.sql('''
# select DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count
# from flight_data_2015
# order by count desc
# limit 5
# ''')
# maxsql.take(5)


maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

# 这里show会出错，所以改用take，不明白为什么show会出错，注意take必须传入选几行的参数，问题解决，是因为jdk版本的问题，之前版本太高，降了之后就行了！
# maxSql.show()
maxSql.take(5)

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

In [42]:
from pyspark.sql.functions import desc

# 这个地方用show也会出问题，对比起来还是sql更好写
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).take(5)

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

In [44]:
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort(desc("destination_total")).limit(5).explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#467L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#18,destination_total#467L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[sum(cast(count#20 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#18, 200)
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#18], functions=[partial_sum(cast(count#20 as bigint))])
         +- *(1) FileScan csv [DEST_COUNTRY_NAME#18,count#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/D:/Jupyter Notebook Project/Pyspark/Spark-The-Definitive-Guide-master/dat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
