# <center>Apache Spark (火花)</center>

In [1]:
import glob
from functools import reduce

from pyspark.sql import SparkSession  # 引入包  SparkSession 一些写好的函数
import pyspark.sql.functions as F     # 一系列的函数

## Create a Spark session

The `SparkSession` is the entry point for all Spark DataFrame and SQL operations.

In [2]:
# getOrCreate() returns an already-running session if there is one,
# otherwise it starts a new session named "HelloSpark".
spark = (
    SparkSession.builder
    .appName("HelloSpark")
    .getOrCreate()
)

## Create DataFrames from Python lists of tuples

In [3]:
def _indexed_frame(column, values):
    """Build a two-column DataFrame ("id", column).

    Row ids are assigned 0, 1, 2, ... in list order, so the same position in
    every list refers to the same student.
    """
    return spark.createDataFrame(list(enumerate(values)), ["id", column])


name = _indexed_frame("name", ["Alex", "Bob", "Cherry", "Dan", "Ethan", "Flynn"])
# NOTE: `math` reuses the name of Python's standard `math` module; do not import
# that module in this notebook or one will hide the other.
math = _indexed_frame("math", [95, 98, 73, 54, 68, 98])
english = _indexed_frame("english", [90, 80, 85, 68, 65, 97])
chinese = _indexed_frame("chinese", [79, 89, 86, 57, 86, 99])
physics = _indexed_frame("physics", [86, 95, 88, 96, 68, 96])
chemistry = _indexed_frame("chemistry", [67, 71, 85, 68, 95, 95])
history = _indexed_frame("history", [73, 80, 91, 57, 78, 99])

## Basic Spark operations

In [4]:
# Student lookup table: id -> name.
name.show(n=20, truncate=True)

+---+------+
| id|  name|
+---+------+
|  0|  Alex|
|  1|   Bob|
|  2|Cherry|
|  3|   Dan|
|  4| Ethan|
|  5| Flynn|
+---+------+



In [5]:
# English scores keyed by student id.
english.show(n=20, truncate=True)

+---+-------+
| id|english|
+---+-------+
|  0|     90|
|  1|     80|
|  2|     85|
|  3|     68|
|  4|     65|
|  5|     97|
+---+-------+



In [6]:
# Math scores keyed by student id.
math.show(n=20, truncate=True)

+---+----+
| id|math|
+---+----+
|  0|  95|
|  1|  98|
|  2|  73|
|  3|  54|
|  4|  68|
|  5|  98|
+---+----+



In [7]:
# Join on the shared 'id' column (inner join by default). Spark does not
# guarantee the row order of a join result, as the output below shows.
result = name.join(math, on='id')
result.show(n=20, truncate=True)

+---+------+----+
| id|  name|math|
+---+------+----+
|  0|  Alex|  95|
|  5| Flynn|  98|
|  1|   Bob|  98|
|  3|   Dan|  54|
|  2|Cherry|  73|
|  4| Ethan|  68|
+---+------+----+



In [8]:
# Join every subject onto the student names, one 'id' join at a time.
# Building from the original frames, instead of re-joining onto the existing
# `result`, keeps this cell idempotent: re-running it does not join the same
# subject columns a second time. The join order (name, math, english, ...)
# matches the original cell, so the resulting plan is the same.
result = reduce(
    lambda joined, subject_df: joined.join(subject_df, 'id'),
    [math, english, chinese, physics, chemistry, history],
    name,
)

In [9]:
# One row per student with all six subject scores.
result.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+
| id|  name|math|english|chinese|physics|chemistry|history|
+---+------+----+-------+-------+-------+---------+-------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|
|  5| Flynn|  98|     97|     99|     96|       95|     99|
|  1|   Bob|  98|     80|     89|     95|       71|     80|
|  3|   Dan|  54|     68|     57|     96|       68|     57|
|  2|Cherry|  73|     85|     86|     88|       85|     91|
|  4| Ethan|  68|     65|     86|     68|       95|     78|
+---+------+----+-------+-------+-------+---------+-------+



In [10]:
# Mark the joined frame for caching so the many actions below can reuse it.
# count() is an action, so it populates the cache immediately.
result = result.cache()
row_count = result.count()
row_count

6

In [11]:
subject_columns = ['math', 'english', 'chinese', 'physics', 'chemistry', 'history']

# The summary does not depend on the loop variable. Build it once, restricted
# to the subject columns, instead of calling describe() on every column
# (including id and name) on every iteration. Then print one table per subject.
subject_summary = result.describe(*subject_columns)
for subject in subject_columns:
    subject_summary.select('summary', subject).show()

+-------+------------------+
|summary|              math|
+-------+------------------+
|  count|                 6|
|   mean|              81.0|
| stddev|18.633303518163384|
|    min|                54|
|    max|                98|
+-------+------------------+

+-------+------------------+
|summary|           english|
+-------+------------------+
|  count|                 6|
|   mean| 80.83333333333333|
| stddev|12.480651692386365|
|    min|                65|
|    max|                97|
+-------+------------------+

+-------+------------------+
|summary|           chinese|
+-------+------------------+
|  count|                 6|
|   mean| 82.66666666666667|
| stddev|14.151560573543353|
|    min|                57|
|    max|                99|
+-------+------------------+

+-------+------------------+
|summary|           physics|
+-------+------------------+
|  count|                 6|
|   mean| 88.16666666666667|
| stddev|10.778064142816497|
|    min|                68|
|    max|  

In [12]:
# Biology grades exist only for some students (ids 2 and 4 have none).
biology = spark.createDataFrame(
    data=[(0, 89), (1, 87), (3, 88), (5, 95)],
    schema=["id", "biology"],
)

In [13]:
# Inner join: students with no biology grade (ids 2 and 4) are dropped.
result_with_biology = result.join(biology, on='id', how='inner')
result_with_biology.show(n=20, truncate=True)

+---+-----+----+-------+-------+-------+---------+-------+-------+
| id| name|math|english|chinese|physics|chemistry|history|biology|
+---+-----+----+-------+-------+-------+---------+-------+-------+
|  0| Alex|  95|     90|     79|     86|       67|     73|     89|
|  1|  Bob|  98|     80|     89|     95|       71|     80|     87|
|  3|  Dan|  54|     68|     57|     96|       68|     57|     88|
|  5|Flynn|  98|     97|     99|     96|       95|     99|     95|
+---+-----+----+-------+-------+-------+---------+-------+-------+



In [14]:
# Left join: every student is kept; biology is null where no grade exists.
result_with_biology2 = result.join(biology, on='id', how='left')
result_with_biology2.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|
+---+------+----+-------+-------+-------+---------+-------+-------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|
|  2|Cherry|  73|     85|     86|     88|       85|     91|   null|
|  4| Ethan|  68|     65|     86|     68|       95|     78|   null|
+---+------+----+-------+-------+-------+---------+-------+-------+



In [15]:
# French grades exist only for ids 0, 2 and 5.
french = spark.createDataFrame(
    data=[(0, 75), (2, 86), (5, 99)],
    schema=["id", "french"],
)

In [16]:
# Add French with another left join, so students without a French grade get null.
result3 = result_with_biology2.join(french, on='id', how='left')
result3.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|
+---+------+----+-------+-------+-------+---------+-------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|  null|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|  null|
|  2|Cherry|  73|     85|     86|     88|       85|     91|   null|    86|
|  4| Ethan|  68|     65|     86|     68|       95|     78|   null|  null|
+---+------+----+-------+-------+-------+---------+-------+-------+------+



In [17]:
# Replace null grades with 0 (na.fill is the same operation as fillna).
# NOTE(review): this treats "did not take the subject" as a score of 0, which
# pulls down the averages computed from these columns; confirm that is intended.
result4 = result3.na.fill(0)
result4.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|
+---+------+----+-------+-------+-------+---------+-------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|
+---+------+----+-------+-------+-------+---------+-------+-------+------+



In [18]:
# Average score over all eight subject columns.
graded_subjects = ['math', 'english', 'chinese', 'physics',
                   'chemistry', 'history', 'biology', 'french']
total_score = sum(F.col(subject) for subject in graded_subjects)
result5 = result4.withColumn('average', total_score / len(graded_subjects))

In [19]:
# Scores plus the new 'average' column.
result5.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|average|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|  81.75|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|   75.0|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|  74.25|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|   57.5|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+



In [20]:
# Letter grade from the average: A >= 90, B >= 80, C >= 60, otherwise D.
# Chained when() clauses are tested in order and the first match wins.
rating_expr = (
    F.when(F.col('average') >= 90, F.lit('A'))
    .when(F.col('average') >= 80, F.lit('B'))
    .when(F.col('average') >= 60, F.lit('C'))
    .otherwise(F.lit('D'))
)
result6 = result5.withColumn('rating', rating_expr)
result6.show()

+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|  81.75|     B|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|     A|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|   75.0|     C|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|     C|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|  74.25|     C|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|   57.5|     D|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+



In [21]:
# Number of students per letter grade.
(
    result6
    .groupBy('rating')
    .count()
    .show()
)

+------+-----+
|rating|count|
+------+-----+
|     B|    1|
|     D|    1|
|     C|    3|
|     A|    1|
+------+-----+



In [22]:
# One row per rating; the 'count' column holds the number of students with it.
group_by_result = (
    result6
    .groupBy('rating')
    .count()
)

In [23]:
# Pass rate = share of students whose rating is not 'D'.
# Count students (rows of result6), not rows of group_by_result: that frame has
# one row per rating, so filtering it for 'D' yields 0 or 1 no matter how many
# students actually failed.
total_students = result6.count()
failed_students = result6.filter(F.col('rating') == 'D').count()
pass_rate = 1 - failed_students / total_students if total_students else float('nan')
pass_rate

0.8333333333333334

In [24]:
# Show the pass rate as a percentage with two decimals.
pass_rate_percent = pass_rate * 100
print("pass rate = {0:.2f}".format(pass_rate_percent))

pass rate = 83.33


### Spark filters

In [25]:
# Students rated A (where() is an alias of filter()).
result6.where('rating = "A"').show()

+---+-----+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id| name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+-----+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  5|Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|     A|
+---+-----+----+-------+-------+-------+---------+-------+-------+------+-------+------+



In [26]:
# Students who scored below 60 in math.
result6.where('math < 60').show()

+---+----+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id|name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+----+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  3| Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|     C|
+---+----+----+-------+-------+-------+---------+-------+-------+------+-------+------+



## Spark Joins

![spark_joins](./pics/spark_joins.png)

In [27]:
# Left-hand frame for the join examples below.
biology.show(n=20, truncate=True)

+---+-------+
| id|biology|
+---+-------+
|  0|     89|
|  1|     87|
|  3|     88|
|  5|     95|
+---+-------+



In [28]:
# Right-hand frame for the join examples below.
french.show(n=20, truncate=True)

+---+------+
| id|french|
+---+------+
|  0|    75|
|  2|    86|
|  5|    99|
+---+------+



### inner

In [29]:
# Inner join: only ids present in both frames survive.
biology.join(french, on='id', how='inner').show()

+---+-------+------+
| id|biology|french|
+---+-------+------+
|  0|     89|    75|
|  5|     95|    99|
+---+-------+------+



### outer, full, fullouter, full_outer

In [30]:
# 'outer', 'full', 'fullouter' and 'full_outer' all request the same full outer
# join: rows from both sides are kept and missing values become null.
biology.join(french, on='id', how='outer').show()

+---+-------+------+
| id|biology|french|
+---+-------+------+
|  0|     89|    75|
|  5|     95|    99|
|  1|     87|  null|
|  3|     88|  null|
|  2|   null|    86|
+---+-------+------+



### left, leftouter, left_outer

In [31]:
# 'left', 'leftouter' and 'left_outer' are equivalent: every biology row is
# kept, and french is null where no matching id exists.
biology.join(french, on='id', how='left').show()

+---+-------+------+
| id|biology|french|
+---+-------+------+
|  0|     89|    75|
|  5|     95|    99|
|  1|     87|  null|
|  3|     88|  null|
+---+-------+------+



### right, rightouter, right_outer

In [32]:
# 'right', 'rightouter' and 'right_outer' are equivalent: every french row is
# kept, and biology is null where no matching id exists.
biology.join(french, on='id', how='right').show()

+---+-------+------+
| id|biology|french|
+---+-------+------+
|  0|     89|    75|
|  5|     95|    99|
|  2|   null|    86|
+---+-------+------+



### leftanti, left_anti

In [33]:
# Anti join ('leftanti' / 'left_anti'): biology rows whose id has NO match in
# french. Only the biology columns are returned.
biology.join(french, on='id', how='leftanti').show()

+---+-------+
| id|biology|
+---+-------+
|  1|     87|
|  3|     88|
+---+-------+



### leftsemi, left_semi

In [34]:
# Semi join ('leftsemi' / 'left_semi'): biology rows whose id DOES have a match
# in french. Unlike an inner join, only the biology columns are returned.
biology.join(french, on='id', how='leftsemi').show()

+---+-------+
| id|biology|
+---+-------+
|  0|     89|
|  5|     95|
+---+-------+



In [35]:
# Final table that is written to CSV below.
result6.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|  81.75|     B|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|     A|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|   75.0|     C|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|     C|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|  74.25|     C|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|   57.5|     D|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+



In [36]:
# Spark writes a directory of part files here, not a single CSV file.
result6.write.mode('overwrite').option('header', True).csv('./data/result6/')

In [37]:
# coalesce(1) collapses the data to one partition so a single part file is
# written. That is fine for tiny data, but it funnels every row through one task.
(
    result6.coalesce(1)
    .write.mode('overwrite')
    .option('header', True)
    .csv('./data/result6_coalesced/')
)

In [38]:
# Print the coalesced CSV output in pure Python rather than shelling out to
# `cat`. This works without IPython and on platforms that have no `cat`.
# sorted() mirrors the alphabetical order of shell glob expansion.
for csv_path in sorted(glob.glob('data/result6_coalesced/*.csv')):
    with open(csv_path, encoding='utf-8') as csv_file:
        print(csv_file.read(), end='')

id,name,math,english,chinese,physics,chemistry,history,biology,french,average,rating
0,Alex,95,90,79,86,67,73,89,75,81.75,B
5,Flynn,98,97,99,96,95,99,95,99,97.25,A
1,Bob,98,80,89,95,71,80,87,0,75.0,C
3,Dan,54,68,57,96,68,57,88,0,61.0,C
2,Cherry,73,85,86,88,85,91,0,86,74.25,C
4,Ethan,68,65,86,68,95,78,0,0,57.5,D


In [39]:
# Read the CSV back. inferSchema makes Spark scan the data to detect column
# types; without it every column would be read as a string.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./data/result6_coalesced/')
)

In [40]:
# The round-tripped data should match result6.
df.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|  81.75|     B|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|     A|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|   75.0|     C|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|     C|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|  74.25|     C|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|   57.5|     D|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+



In [41]:
# Register df as a temporary view so it can be queried by name with spark.sql.
view_name = "result"
df.createOrReplaceTempView(view_name)

In [42]:
# Query the temporary view with plain SQL; the result is a regular DataFrame.
select_all_query = "SELECT * FROM result"
sqlDF = spark.sql(select_all_query)
sqlDF.show(n=20, truncate=True)

+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
| id|  name|math|english|chinese|physics|chemistry|history|biology|french|average|rating|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+
|  0|  Alex|  95|     90|     79|     86|       67|     73|     89|    75|  81.75|     B|
|  5| Flynn|  98|     97|     99|     96|       95|     99|     95|    99|  97.25|     A|
|  1|   Bob|  98|     80|     89|     95|       71|     80|     87|     0|   75.0|     C|
|  3|   Dan|  54|     68|     57|     96|       68|     57|     88|     0|   61.0|     C|
|  2|Cherry|  73|     85|     86|     88|       85|     91|      0|    86|  74.25|     C|
|  4| Ethan|  68|     65|     86|     68|       95|     78|      0|     0|   57.5|     D|
+---+------+----+-------+-------+-------+---------+-------+-------+------+-------+------+



In [44]:
# Column projection in SQL (the equivalent of DataFrame.select).
projection_query = "SELECT id, name, math FROM result"
spark.sql(projection_query).show(n=20, truncate=True)

+---+------+----+
| id|  name|math|
+---+------+----+
|  0|  Alex|  95|
|  5| Flynn|  98|
|  1|   Bob|  98|
|  3|   Dan|  54|
|  2|Cherry|  73|
|  4| Ethan|  68|
+---+------+----+



In [45]:
# Row filtering in SQL (the equivalent of DataFrame.filter).
alex_query = "SELECT id, name, math, average FROM result where name = 'Alex'"
spark.sql(alex_query).show(n=20, truncate=True)

+---+----+----+-------+
| id|name|math|average|
+---+----+----+-------+
|  0|Alex|  95|  81.75|
+---+----+----+-------+

