<p align="center">
   <img src="spark-sql.jpg">
</p>

# Spark Hands-on: Spark SQL

In this notebook, we focus on the SQL API of Spark. This API introduces Spark DataFrames (similar to pandas DataFrames), which are easier to manipulate than RDDs. 
It is also the API recommended by Spark, since queries written with it benefit from internal optimizations by the Spark engine. Spark SQL is designed for structured data, whereas RDDs remain useful for unstructured data.

*Nota*: A DataFrame is a structured object but is not statically typed. In Scala (the language Spark is written in) you will find the Dataset type, which is typed. In Scala, a DataFrame is simply a Dataset of "Row" type (e.g. val dataset : Dataset[Row] = spark.read ...).


# 1. Import useful libs and load a Spark Conf

In [1]:
import pyspark
from pyspark.sql import SparkSession


# Pandas
import pandas as pd

# Plot
%matplotlib inline
import matplotlib.pyplot as plt


# Foundry
import foundrywrapper
from foundrywrapper import FoundryWrapper

In [2]:
#Required for foundry, not useful for spark purposes
fw = FoundryWrapper()
fw

In [3]:
spark = (SparkSession.builder 
        .master('spark://spark-master:7077') #master node URL
        .appName('~ Spark hands-on: SQL~') #my App name
        .config('spark.driver.memory', '1g') # memory allocated to the driver
        .config('spark.driver.cores', '2') # CPUs allocated to the driver
        .config('spark.executor.instances', '2') # how many executors
        .config('spark.executor.memory', '4g') # memory per executor (where the data is stored)
        .config('spark.executor.cores', '2') # CPUs per executor
        .config('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', 1) # legacy Arrow IPC format flag (relevant for pandas UDFs)
        .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', 1)
        .config(conf=fw.spark.get_spark_app_config())
        .getOrCreate())
# Configuring spark to use Foundry
spark = fw.configure_spark(spark) # foundry specific

In [4]:
spark

# 2. Load a dataset

We will use the same dataset as in the RDD notebook: the Flight radar 24 dataset.
A DataFrame can be created from Parquet, Hive, CSV, an RDD, or even a pandas DataFrame.

In [5]:
# Showing dataset stats
# Foundry wrapper
file_rid = fw.compass.get_rid('/Shared/Airbus - FlightRadar24 (2014-2018)/data/flight_radar_24_week')
# fw.catalog.get_dataset_view_stats(file_rid)
df = spark.read.parquet(fw.spark.foundryfs_uri(file_rid))



In [6]:
%%time
df.count()

CPU times: user 1.32 ms, sys: 2.79 ms, total: 4.11 ms
Wall time: 3.36 s


658975

As you can see, this data is structured and follows a given schema, which we can inspect:

In [7]:
df.printSchema() #helps visualizing what's inside a dataframe

root
 |-- flight_id: long (nullable = true)
 |-- aircraft_id: integer (nullable = true)
 |-- aircraft_registration: string (nullable = true)
 |-- equipment: string (nullable = true)
 |-- departure_airport_code: string (nullable = true)
 |-- scheduled_arrival_airport_code: string (nullable = true)
 |-- arrival_airport_code: string (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- msn: integer (nullable = true)
 |-- elapsed_time_seconds: long (nullable = true)
 |-- departure_date_time: timestamp (nullable = true)
 |-- arrival_date_time: timestamp (nullable = true)
 |-- airport_separation: double (nullable = true)
 |-- track_distance: double (nullable = true)
 |-- adsb_start_flight_phase: string (nullable = true)
 |-- adsb_end_flight_phase: string (nullable = true)
 |-- out_time: timestamp (nullable = true)
 |-- off_time: timestamp (nullable = true)
 |-- on_time: timestamp (nullable = true)
 |-- in_time: timestamp (nullable = true)

*Nota*: You can also create a DataFrame from an RDD by specifying a schema.

  See the **createDataFrame** function, and **StructType** / **StructField** to build the schema: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#creating-dataframes
  Here is an example that creates a DataFrame from an RDD with an explicit schema:

In [21]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
sc = spark.sparkContext
columns = ["language","users_count"]
data = [("R", 20000), ("Python", 100000), ("Scala",3000)]
rdd = spark.sparkContext.parallelize(data)

schema_field = StructType([StructField(columns[0], StringType(), True), StructField(columns[1], IntegerType(), True)])

rdd_to_df = spark.createDataFrame(rdd, schema_field)
rdd_to_df.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|       R|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [22]:
rdd_to_df.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: integer (nullable = true)



# 2. Basic cleaning of rows

This API comes with many built-in functions that make it easy to clean an input DataFrame.
For instance, you can drop duplicated rows or rows containing missing values (nulls).

In [13]:
df = df.drop_duplicates()

In [15]:
df.count()

3764518

# 3. Select, filter, groupBy ...

We can express nearly every operation that is available on an RDD, and the structure of a DataFrame lets us do it more concisely. Recall that under the hood the query is still executed on RDDs, so transformations, actions and lazy evaluation work the same way.

Let's go through these data engineering steps:
- Select the following variables: msn, elapsed_time_seconds, departure_airport_code and arrival_airport_code.
- Create a new binary variable that flags flights whose duration exceeds a given threshold
- Keep only the long flights
- Group the flights by their MSN
- Compute the mean flight duration and the number of flights for each MSN
- Sort the MSNs by their mean flight duration

To do this we will use the Spark SQL functions. There are plenty of them available. For this exercise we will need **select**, **withColumn** (adds a new column), **when** and **otherwise** (used together inside **withColumn** to set a value conditionally on another column), **filter**, **groupBy**, **agg**, **mean**, **count** and **sort**.

See the [pyspark sql documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html) for a comprehensive list of the functions available for manipulating DataFrames.

In [8]:
from pyspark.sql import functions as F

In [9]:
%%time
df_selected = df.select(df["msn"], df["elapsed_time_seconds"], df["departure_airport_code"], df["arrival_airport_code"]) #transformation
df_with_binary_var = df_selected.withColumn("is_long_flight", F.when(df_selected["elapsed_time_seconds"] > 5000,1).otherwise(0)) #transformation
df_filtered = df_with_binary_var.filter(df_with_binary_var["is_long_flight"] == 1) #transformation
df_grouped = df_filtered.groupBy("msn").agg(F.mean("elapsed_time_seconds").alias("mean_flight_duration"), F.count("msn").alias("number_of_flights")) # transformation; count("msn") counts the rows with a non-null msn in each group
df_sorted = df_grouped.sort(df_grouped["mean_flight_duration"], ascending=False) #transformation
df_sorted.show() # the action that will trigger the computations!

+----+--------------------+-----------------+
| msn|mean_flight_duration|number_of_flights|
+----+--------------------+-----------------+
| 133|   53875.71428571428|                7|
| 136|  52846.142857142855|                7|
| 142|             52765.5|                8|
| 147|             52740.6|                5|
|  39|             48518.0|                6|
|  35|            47651.75|                8|
|  75|  46011.666666666664|                9|
| 474|             45938.3|               10|
|1468|   45644.77777777778|                9|
| 395|             45313.0|                2|
|  74|           45143.875|                8|
| 718|             44583.5|                8|
| 130|             44367.3|               10|
|  22|           44268.625|                8|
| 221|   44189.88888888889|                9|
|  38|            44188.75|                8|
| 151|           43990.125|                8|
|  96|   43805.63636363636|               11|
| 540|             43342.0|       

*Nota*: To see what happens under the hood, you can print the execution plan of the query. The optimizer may reorder your operations to minimize computation time. The plan also lets you check for unnecessary shuffles (the Exchange steps) or mistakes before calling an action that could lead to an unwanted result.

In [10]:
df_sorted.explain() # prints the physical plan; explain(True) also prints the logical plans

== Physical Plan ==
*(3) Sort [mean_flight_duration#256 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(mean_flight_duration#256 DESC NULLS LAST, 200), true
   +- *(2) HashAggregate(keys=[msn#145], functions=[avg(elapsed_time_seconds#146L), count(msn#145)])
      +- Exchange hashpartitioning(msn#145, 200), true
         +- *(1) HashAggregate(keys=[msn#145], functions=[partial_avg(elapsed_time_seconds#146L), partial_count(msn#145)])
            +- *(1) Filter (CASE WHEN (elapsed_time_seconds#146L > 5000) THEN 1 ELSE 0 END = 1)
               +- *(1) FileScan parquet [msn#145,elapsed_time_seconds#146L] Batched: true, DataFilters: [(CASE WHEN (elapsed_time_seconds#146L > 5000) THEN 1 ELSE 0 END = 1)], Format: Parquet, Location: InMemoryFileIndex[foundry://sectokenjhub/datasets/ri.foundry.main.dataset.668c21bc-0a7a-4ad7-9cb2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<msn:int,elapsed_time_seconds:bigint>




We could have written this as a single pipeline for better readability:

*Nota*: We can access a column using df["colname"] or F.col("colname").

In [11]:
%%time
# df["is_long_flight"] raises an error at the filtering step: `df` still refers to the original DataFrame, which has no such column. F.col("is_long_flight") resolves the name against the DataFrame being built in the chain.
df_sorted = df.select(df["msn"], df["elapsed_time_seconds"], df["departure_airport_code"], df["arrival_airport_code"]) \
  .withColumn("is_long_flight", F.when(df["elapsed_time_seconds"] > 5000,1).otherwise(0)) \
  .filter((F.col("is_long_flight") == 1)) \
  .groupBy("msn").agg(F.mean("elapsed_time_seconds").alias("mean_flight_duration"), F.count("msn").alias("number_of_flights")) \
  .sort(F.col("mean_flight_duration"), ascending=False)

df_sorted.show()

+----+--------------------+-----------------+
| msn|mean_flight_duration|number_of_flights|
+----+--------------------+-----------------+
| 133|   53875.71428571428|                7|
| 136|  52846.142857142855|                7|
| 142|             52765.5|                8|
| 147|             52740.6|                5|
|  39|             48518.0|                6|
|  35|            47651.75|                8|
|  75|  46011.666666666664|                9|
| 474|             45938.3|               10|
|1468|   45644.77777777778|                9|
| 395|             45313.0|                2|
|  74|           45143.875|                8|
| 718|             44583.5|                8|
| 130|             44367.3|               10|
|  22|           44268.625|                8|
| 221|   44189.88888888889|                9|
|  38|            44188.75|                8|
| 151|           43990.125|                8|
|  96|   43805.63636363636|               11|
| 540|             43342.0|       

### 4. In a SQL way

Most data wrangling can also be written as plain SQL, which lets you reuse your SQL knowledge together with the power of Spark. 
To do this, register the DataFrame as a temporary SQL view with the **createOrReplaceTempView** function. The output of a SQL query is also a Spark DataFrame.

In [12]:
df.createOrReplaceTempView("flight_radar") # see also createOrReplaceGlobalTempView

In [13]:
df_sorted_sql= spark.sql(""" SELECT msn, COUNT(msn) AS number_of_flights, AVG(elapsed_time_seconds) AS mean_flight_duration
FROM flight_radar
WHERE  elapsed_time_seconds > 5000
GROUP BY msn
ORDER BY mean_flight_duration DESC
LIMIT 20""")

In [14]:
%%time
df_sorted_sql.show()

+----+-----------------+--------------------+
| msn|number_of_flights|mean_flight_duration|
+----+-----------------+--------------------+
| 133|                7|   53875.71428571428|
| 136|                7|  52846.142857142855|
| 142|                8|             52765.5|
| 147|                5|             52740.6|
|  39|                6|             48518.0|
|  35|                8|            47651.75|
|  75|                9|  46011.666666666664|
| 474|               10|             45938.3|
|1468|                9|   45644.77777777778|
| 395|                2|             45313.0|
|  74|                8|           45143.875|
| 718|                8|             44583.5|
| 130|               10|             44367.3|
|  22|                8|           44268.625|
| 221|                9|   44189.88888888889|
|  38|                8|            44188.75|
| 151|                8|           43990.125|
|  96|               11|   43805.63636363636|
| 540|                4|          

### 5. Windowing

Let's say that you want to compute the absolute difference between each flight's elapsed_time_seconds and the mean computed by the groupBy. 
Because the grouped DataFrame has one row per msn, you cannot simply append its mean column to the initial DataFrame. With the usual Spark functions, you would have to **join** the two DataFrames.

A **Window** lets you do that more directly, without an explicit join. Windows are also very useful for comparing each row with an aggregate or for computing rolling statistics (rolling mean, ...).


In [15]:
from pyspark.sql.window import Window

# Define the window and partition the data by msn. Without partitionBy, all the data would be moved to a single partition!
window = Window \
.partitionBy(df["msn"]) 

windowed_mean_df = df.select(df["msn"], df["elapsed_time_seconds"]) \
.withColumn("mean_elapsed_time", F.avg(df["elapsed_time_seconds"]).over(window))

windowed_mean_df.show(5)

+---+--------------------+------------------+
|msn|elapsed_time_seconds| mean_elapsed_time|
+---+--------------------+------------------+
|148|                 404|18720.560975609755|
|148|               37456|18720.560975609755|
|148|               43212|18720.560975609755|
|148|               16864|18720.560975609755|
|148|               36606|18720.560975609755|
+---+--------------------+------------------+
only showing top 5 rows



This repeats the mean of each msn on every row of that msn. Now it is easy to compute the difference!

In [16]:
%%time
diff_mean_df = windowed_mean_df.withColumn("abs_diff_with_mean_elapsed_time", F.abs(windowed_mean_df["elapsed_time_seconds"] - windowed_mean_df["mean_elapsed_time"])) 
diff_mean_df.show()

+---+--------------------+------------------+-------------------------------+
|msn|elapsed_time_seconds| mean_elapsed_time|abs_diff_with_mean_elapsed_time|
+---+--------------------+------------------+-------------------------------+
|148|                 404|18720.560975609755|             18316.560975609755|
|148|               37456|18720.560975609755|             18735.439024390245|
|148|               43212|18720.560975609755|             24491.439024390245|
|148|               16864|18720.560975609755|             1856.5609756097547|
|148|               36606|18720.560975609755|             17885.439024390245|
|148|               21671|18720.560975609755|             2950.4390243902453|
|148|                 615|18720.560975609755|             18105.560975609755|
|148|                   0|18720.560975609755|             18720.560975609755|
|148|               47011|18720.560975609755|             28290.439024390245|
|148|               36602|18720.560975609755|             17881.

Windows are very useful for computing rolling statistics or differences. Let's compute the difference in elapsed time between consecutive flights of each msn. To do that, we partition the data by msn, order the rows by departure date, and then compute the difference between each flight and the previous one.
We will use the window's **orderBy** function and the **lag** SQL function, which shifts a column by a fixed number of rows.

In [19]:
lagged_df = df.select(df["msn"], df["elapsed_time_seconds"], df["departure_date_time"]) \
.withColumn("lagged_elapsed_time", F.lag(df["elapsed_time_seconds"]).over(Window.partitionBy("msn").orderBy("departure_date_time")))

lagged_df.show(20)

+---+--------------------+-------------------+-------------------+
|msn|elapsed_time_seconds|departure_date_time|lagged_elapsed_time|
+---+--------------------+-------------------+-------------------+
|148|                 404|2015-01-01 04:39:52|               null|
|148|                5687|2015-01-01 06:01:15|                404|
|148|               37456|2015-01-01 06:26:31|               5687|
|148|                5835|2015-01-01 10:16:05|              37456|
|148|               30392|2015-01-01 14:54:06|               5835|
|148|               43212|2015-01-01 17:31:48|              30392|
|148|               24875|2015-01-02 02:38:05|              43212|
|148|               16864|2015-01-02 05:32:50|              24875|
|148|               36606|2015-01-02 10:15:44|              16864|
|148|               13341|2015-01-02 11:24:12|              36606|
|148|               13112|2015-01-02 23:36:02|              13341|
|148|               21671|2015-01-02 23:54:27|              13

*Nota*: The first row of each msn gets a null value, since there is no previous row to shift from!

In [20]:
diff_df = lagged_df.withColumn("diff_elapsed_time", lagged_df["elapsed_time_seconds"] - lagged_df["lagged_elapsed_time"])
diff_df.show(10)

+---+--------------------+-------------------+-------------------+-----------------+
|msn|elapsed_time_seconds|departure_date_time|lagged_elapsed_time|diff_elapsed_time|
+---+--------------------+-------------------+-------------------+-----------------+
|148|                 404|2015-01-01 04:39:52|               null|             null|
|148|                5687|2015-01-01 06:01:15|                404|             5283|
|148|               37456|2015-01-01 06:26:31|               5687|            31769|
|148|                5835|2015-01-01 10:16:05|              37456|           -31621|
|148|               30392|2015-01-01 14:54:06|               5835|            24557|
|148|               43212|2015-01-01 17:31:48|              30392|            12820|
|148|               24875|2015-01-02 02:38:05|              43212|           -18337|
|148|               16864|2015-01-02 05:32:50|              24875|            -8011|
|148|               36606|2015-01-02 10:15:44|              16864

### 6. User-Defined functions (UDF) and pandas UDF

Sometimes you want to create a new column by applying a custom function that the native Spark functions do not cover. This resembles the **map** operation we used on RDDs, but map is not a SQL function, so you would have to go back to the RDD API to use it.
User-defined functions let us avoid that. They are plain Python functions wrapped in a **udf** (or **pandas_udf**) and applied to a whole column.

First, define a Python function that applies to a single element. Then wrap it in a UDF so that it can be applied to columns. After that, it is easy to add the result with a **withColumn** statement.

*Nota*: **pandas_udf** treats the column as a pandas Series. It is recommended when you want to work with the **pandas** library, and it relies on Apache Arrow for optimized data transfer.

*Nota2*: Always check the documentation to see whether the function you need already exists. UDFs are convenient but inefficient, because the data has to be serialized between the JVM and the Python process. pandas_udf does better thanks to Apache Arrow, but still involves a lot of communication between the JVM and the Python process.


In [21]:
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import LongType

As a toy example, let's say we want to tag short and long flights according to the *elapsed_time_seconds* variable. We will define a user-defined function to do this.

*Question*: How would you do this with built-in Spark functions?



In [22]:
# Define the function as you would do in classic python
def is_long_flight(f):
    return "short" if f < 5000 else "long"

In [23]:
# Intermediate step: wrap the function in a UDF (the default return type is string)

is_long_flight_UDF = udf(is_long_flight)

In [26]:
# Add a new column with the outputs of is_long_flight

df.withColumn("is_long_flight", is_long_flight_UDF(df["elapsed_time_seconds"])).select("msn", "elapsed_time_seconds","is_long_flight") \
  .where(df["msn"].isNotNull()).show(50)

+----+--------------------+--------------+
| msn|elapsed_time_seconds|is_long_flight|
+----+--------------------+--------------+
|7561|               15433|          long|
| 103|               20102|          long|
|  46|               26893|          long|
|  40|               23682|          long|
| 139|               24688|          long|
|  90|               25582|          long|
| 164|               23870|          long|
|  23|               23804|          long|
|  89|                 438|         short|
| 138|               54886|          long|
|  31|               31949|          long|
|  55|               32175|          long|
|  65|               25322|          long|
|  34|               20434|          long|
| 119|                  80|         short|
|  96|               45562|          long|
|  12|               28335|          long|
|  79|               49171|          long|
|  11|                  10|         short|
|  82|               16760|          long|
|  68|     

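Example with **pandas_udf**: let's use a pandas_udf to compute the cumulative sum of elapsed_time_seconds. This lets us use the *pandas* object types (*Series*) and functions. Note that a scalar pandas_udf receives the column in batches, so the cumulative sum is computed within each batch and is not a global running total.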

In [27]:
def cum_sum(x):
    return x.cumsum() #Series type

In [28]:
# register the function as a pandas_udf

cum_sum_pandas_UDF = pandas_udf(cum_sum, returnType=LongType()) # the return type must be provided!

In [29]:
df.select(cum_sum_pandas_UDF(df["elapsed_time_seconds"])).show(10)

+-----------------------------+
|cum_sum(elapsed_time_seconds)|
+-----------------------------+
|                          151|
|                         2303|
|                         4078|
|                         6144|
|                         7971|
|                        15456|
|                        24409|
|                        25404|
|                        25624|
|                        26634|
+-----------------------------+
only showing top 10 rows



### 7. Some words about **caching**

When a DataFrame is used across many queries or in an iterative algorithm, it has to be read and recomputed each time, which can be costly in computation time, network traffic (shuffling), ...
Spark supports pulling datasets into a cluster-wide memory cache. It is good practice to cache what is used repeatedly or is expensive to compute. Caching is lazy, so you pay the cost at the very first action, but subsequent actions can then be much faster.

e.g.

````python
data = loading_data(...).cache()
one_use_case = data.groupBy(...).agg(...).show()
another_use_case = data.groupBy(...).agg(...).show()
````

To cache a dataset in memory, just use the **cache** or **persist** function. **persist** lets you choose where the data is stored: in memory, on disk, or both.
Don't forget to free the memory of the cluster when the dataset is no longer needed (using **unpersist**).

In [None]:
df.cache()

In [None]:
df.unpersist()

In [10]:
spark.stop()