# Spark DataFrames and Spark SQL 

#### <font color="red">The cell below only sets up the Spark environment on Google Colab (no need to run it if you are running Spark locally or on a cluster) </font>

In [1]:
# for installing java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download the Spark 3.0.0 distribution pre-built for Hadoop 3.2
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unpack the Spark archive
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# setting environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark so that Python can locate the Spark installation
!pip install -q findspark
import findspark
findspark.init()

# mount your google drive to be able to access files from your google drive ! 
from google.colab import drive
drive.mount('/content/drive')


First, let us import pyspark and create a <b>SparkSession</b> from pyspark.sql to work with DataFrames.

SparkSession for DataFrames plays the same role as SparkContext for RDDs.

In [None]:
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("Spark-Dataframe-SQL").getOrCreate()

##  <font color="blue"> 1 - Why Spark Dataframes ? </font>


We saw in the previous chapter on shuffling with RDDs that the same problem can often be solved in several ways with RDDs, but with different performance (e.g. reduceByKey vs groupByKey, or filtering data before joins). It is up to the programmer to pay attention and choose the right solution.

If we take for instance the last example with filtering and joins, we presented two solutions with different performance:

<b> <i> 1) kv_freq_stations.join(kv_subscriptions)
                .filter(lambda tuple:tuple[1][0]=='Capitole' or tuple[1][0]=='Compans')
                .count()
</i></b>              
          

<img src='https://drive.google.com/uc?id=1MiZOuM64aAh_mbnSu0psMsY3PkNTPXlw' />

<b> <i>                
2) kv_freq_stations.filter(lambda id_station:id_station[1]=='Capitole' or id_station[1]=='Compans')
                .join(kv_subscriptions)
                .count()
</i> </b>  

 <img src='https://drive.google.com/uc?id=1VFUJS8OPHHO2UFgK3bK7XRVT1y-HG2R8' />

               
The second solution is better because applying the filter transformation first minimizes the data shuffling caused by the join transformation.

<b>But it would be nice if, when we write our code as in the first solution, Spark automatically knew that it could rewrite it as in solution 2 before execution! </b>
    
<font color="green"> This is where Spark DataFrames become interesting, but on one condition:  </font> <font color="orange"> you must provide a bit of extra structural information so that Spark can do many optimizations for you!  </font>

<br/>

Spark DataFrames are designed to work with <b>structured data</b>. Structured data have a fixed schema (a set of columns with a fixed datatype for each column). They are commonly found in relational databases (e.g. Oracle Database, MySQL, PostgreSQL) or in formats such as Excel or csv files. With structured data, Spark has extra information about the content of the data: a Spark DataFrame can be seen as a relational database table (with the difference that DataFrames are distributed over a cluster of machines). Thus, before executing transformations or queries on DataFrames (expressed with a well-known set of SQL operators), Spark can apply many optimization techniques that the relational database community has developed over decades, and can send data over the network in a smarter way by selecting and serializing only the useful parts. The figure below gives an overview of the difference between an RDD and a DataFrame, using the customers' frequent stations from our last example.

<img src='https://drive.google.com/uc?id=1JQ7nTw_CCy-ewkA_8zfkg4j2DOiaUiCr' />


Another very important difference between RDDs and DataFrames is that with RDDs, we can only analyze our data through functional transformations (map, filter, reduce, etc.). However, because Spark DataFrames are similar to relational database tables, Spark provides a structured language <b>(Spark SQL)</b>, very similar to SQL, that can be used directly to query and analyze distributed data in Spark DataFrames. This is very important because many data analysts are familiar with SQL and can easily reuse that knowledge to analyze distributed data with Spark DataFrames and Spark SQL!
    
A very simple rule of using RDD vs Dataframe can be:

- RDDs are mostly used for processing unstructured data, i.e. data without any predefined structure, such as free text (e.g. comments from social media, tweets, text from log files, etc.)
- DataFrames are mostly used for processing structured data: data with well-known columns and datatypes, usually extracted from csv files, Excel files, or relational databases. They can also be used for processing semi-structured data: data with a partial structure that can be extracted from file formats such as XML or JSON.

    
##  <font color="blue"> 2 - Spark SQL  stack </font>

SQL is a reference language for analytics. Spark SQL is the Spark interface for distributed processing using SQL syntax on top of DataFrames and RDDs. It has three main goals:

- Support <b> relational processing </b> both within Spark programs (on RDDs) and on external data sources, with a friendly API: sometimes it is more desirable to express a computation in SQL syntax than with functional APIs, and vice versa.
- High performance, achieved by using optimization techniques from research in databases.
- Easily support new data sources such as semi-structured data and external databases.

Spark SQL is a component of the Spark stack as we can see in the below figure.

<img src='https://drive.google.com/uc?id=1Fvx5HbKTdfqUDy1fNjhDfcZL43tk77Ea' />


Spark SQL uses DataFrames as the support for applying SQL queries. DataFrames contain structured data with a schema and are built on top of Spark RDDs. That means that, in the end, DataFrames are stored in the cluster as RDDs! But thanks to the additional information on the DataFrame content (the schema), Spark uses a dedicated optimizer (Catalyst) to optimize their operators before (transparently) reaching the RDD layer. As we will see next in this chapter, we can analyze Spark DataFrames in two ways:

- with SQL syntax (Spark SQL)
- with DataFrame API functions

Spark SQL and DataFrame API can be used directly from the Spark Console or User Programs (python in this course).

JDBC is just a connector that can be used if one wants to load data from external databases into Spark.

Like Spark RDDs, Spark DataFrames are <b>immutable</b>, <b>lazily evaluated</b>, and <b>distributed</b>.

Before we start using Spark SQL, we will first see how to create Spark DataFrames.

##  <font color="blue"> 3 - Creating Spark DataFrames </font>

Spark DataFrames can be created from multiple data sources such as Spark RDDs, structured or semi-structured data files (e.g. csv, json, xml), SQL databases or NoSQL databases (see the figure below). Each database usually comes with a specific connector (e.g. via JDBC). In this section we will look at the first two options (from RDDs and from data files).

<img src='https://drive.google.com/uc?id=13v2sYSf8E_gY3MsRYCirSC1v9umH7IVc' />


### <font color="green"> 3.1 Creating a DataFrame from an existing RDD </font>

We know that a DataFrame needs a structure described by a schema. We can create a DataFrame from an RDD in two ways: with a reflectively inferred schema or with an explicitly specified schema.

#### <font color="red"> 3.1.1 with a reflectively inferred schema </font>

In this case we just need to call the <b><i>toDF</i></b> method on the RDD, as in the following examples:


In [None]:
# we create our previous purchase RDD example

list_purchases = [(100, 'Toulouse', 'Limoges', 22.25),(100, 'Toulouse','Paris', 31.60), (100, 'Paris', 'Orleans', 12.40), (200, 'Toulouse', 'Muret', 8.20), (300, 'Marseille','Paris', 42.10), (300, 'Arles', 'Nimes', 16.20)] 

# we can access the SparkContext from the SparkSession with spark.sparkContext
kv_rdd_purchases = spark.sparkContext.parallelize(list_purchases)

kv_rdd_purchases.take(2)

In [None]:
# to create the dataframe from the RDD we can pass a list of column names to the toDF method

purchases_df = kv_rdd_purchases.toDF(['customer_id','origin','destination','price'])

# for visualizing a sample of rows of our dataframe we can use the show method (by default prints 20 rows)
purchases_df.show()

In [None]:
# What happens if we don't specify the columns names to the toDF method ?

purchases_df2 = kv_rdd_purchases.toDF()

purchases_df2.show()

We can see that if we don't specify column names, Spark automatically assigns column names of the form _1, _2, _3, ... (not very readable). It is always a good idea to specify explicit column names.

In [None]:
# Let's print the inferred schema of our purchases_df DataFrame with printSchema() method 

purchases_df.printSchema()

We can see that when inferring the schema, Spark infers a data type for each column (depending on the values in that column),
and also automatically sets the columns to be nullable (they can contain no value).

Sometimes we will want to specify explicitly the schema...

#### <font color="red"> 3.1.2 with an explicit schema </font>

In this case, we need to <b>specify the schema and use the <i>createDataFrame</i> method of our SparkSession</b>.

In its simplest form, a schema is a StructType with a list of StructFields (a StructField represents a column), and each StructField (column) needs three pieces of information: the name of the column, its data type, and a boolean flag indicating whether the column is nullable.

For instance, for our previous purchase RDD, we may want a schema like this:

<i>purchaseSchema = StructType ( [StructField("customer_id", IntegerType(), False), <br/>
                               StructField("origin", StringType(), False),<br/>
                               StructField("destination", StringType(), False),<br/>
                               StructField("price", DoubleType(), False)] ) <br/> </i>

In this explicit schema (compared to the implicit one), we want customer_id to be of type Integer (it was implicitly long), and all our columns to be non-nullable (they were implicitly nullable).

Below, we create this schema, and use it explicitly to create our purchase dataframe from the RDD by using the createDataFrame
method of our SparkSession:

In [None]:
# we first need to import the types used in the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# schema creation

purchaseSchema = StructType([StructField("customer_id", IntegerType(), False),
                             StructField("origin", StringType(), False),
                             StructField("destination", StringType(), False),
                             StructField("price", DoubleType(), False)])

# DataFrame creation
purchase_df = spark.createDataFrame(kv_rdd_purchases, purchaseSchema)

# let us show some sample rows
purchase_df.show()

Let's verify that we now have our explicit schema:

In [None]:
purchase_df.printSchema()

### <font color="green"> 3.2 Creating a DataFrame from a data file </font>

In most cases DataFrames are created from structured or semi-structured data files (e.g. csv, json, xml) stored for instance on HDFS, cloud storage services (e.g. Amazon S3, Microsoft Azure Storage), or local file systems (e.g. Windows, Unix).

The methods for reading each data file format are quite similar (see documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html). 

For instance, to read csv files, we can call the <b>spark.read.csv</b> method (spark is the SparkSession) as in the example below:

In [None]:
# our previous purchases sample data are stored here in a csv file
# if the first row of the file contains the column headers, we should specify the parameter header=True
# many other parameters can be specified (e.g. separator, schema, null values, ...), see the documentation

purchase_df = spark.read.csv('drive/MyDrive/PYSPARK-COURSE/purchases_sample.csv', header=True)

purchase_df.show()

In [None]:
purchase_df.printSchema()

We read the csv file without specifying a schema. Notice that the customerId column, for instance, is of type String: without a schema (and without the inferSchema option), the csv reader treats every column as a string!

If we want to read a file with a specific schema, we can set the schema parameter:

In [None]:
purchase_df = spark.read.csv('drive/MyDrive/PYSPARK-COURSE/purchases_sample.csv', schema=purchaseSchema, header=True)

purchase_df.show()

In [None]:
purchase_df.printSchema()

Now, the customerId column is renamed customer_id and is of type Integer, as specified in the previous <i>purchaseSchema</i>.

##  <font color="blue"> 4 - Spark SQL queries on DataFrames </font>

Now, how do we query or analyze DataFrames with Spark SQL? The answer is quite simple:
    
- first we need to register the DataFrame as a temporary view (somewhat similar to a table or view of a database)
- second we can apply many common SQL queries on this view

###  <font color="green"> 4.1 - Creating the temporary view from the DataFrame </font>

We just need to call the method <i>createOrReplaceTempView</i> on our DataFrame and specify the name of our view
(that will be used later for queries)

In [None]:
# createTempView can be used instead if we do not want to silently replace an existing view:
# it raises an error when a view with the same name already exists

purchase_df.createOrReplaceTempView('purchase')

### <font color="green"> 4.2 - Spark SQL queries </font>

Spark SQL supports many standard SQL clauses, functions, and operators, such as:
    
- SELECT
- FROM
- WHERE
- GROUP BY
- ORDER BY
- HAVING
- COUNT
- DISTINCT
- JOIN
- (LEFT|RIGHT|FULL) JOINS
- Subqueries ...
- ...

This is also largely what is available in Hive (HiveQL): <i>Spark SQL is very similar to Hive</i>
    
The full supported syntax of Spark SQL is provided here: https://docs.datastax.com/en/dse/5.1/dse-dev/datastax_enterprise/spark/sparkSqlSupportedSyntax.html

To execute a query, we can just use the <b>spark.sql</b> method (spark is the SparkSession) and pass our query as a parameter.

<i>It is also important to remember that Spark SQL is not a relational database engine! (see again the Spark SQL stack figure). When we execute a query on a DataFrame in Spark SQL, this query is transparently optimized (with Catalyst) and then transparently translated under the hood into RDD API transformations and actions (that operate on distributed data in the cluster)!</i>

Below are some query examples in Spark SQL:

In [None]:
# e.g. here we want to get (select) all the rows and columns of our dataframe
# remember: we registered purchase_df as the purchase view in the previous section

purchase_all_df = spark.sql(" SELECT * FROM purchase ")

purchase_all_df.show()

In [None]:
# e.g. here we want to get (select) all the rows but only the customer_id and price columns of our dataframe

purchase_id_price_df = spark.sql(" SELECT customer_id,price FROM purchase ")

purchase_id_price_df.show()

In [None]:
# e.g. here we want to get (select) only the customer_id and price columns and only rows with prices greater than 20 euros

purchase_id_price_gt20_df = spark.sql(" SELECT customer_id,price FROM purchase WHERE price > 20 ")

purchase_id_price_gt20_df.show()

In [None]:
# e.g. as previous, but with prices in ascending order

purchase_id_price_gt20_asc_df = spark.sql(" SELECT customer_id,price FROM purchase WHERE price > 20 ORDER BY price asc ")

purchase_id_price_gt20_asc_df.show()

In [None]:
# e.g. as previous, but with prices in descending order

purchase_id_price_gt20_desc_df = spark.sql(" SELECT customer_id,price FROM purchase WHERE price > 20 ORDER BY price desc ")

purchase_id_price_gt20_desc_df.show()

In [None]:
# e.g. here we want to get (select) all columns and only rows where origin is Paris or Marseille

purchase_paris_marseille_df = spark.sql(" SELECT * FROM purchase WHERE origin='Paris' or origin='Marseille' ")

purchase_paris_marseille_df.show()

In [None]:
# e.g. here we want to get (select) distinct origin stations

purchase_distinct_origin_df = spark.sql(" SELECT DISTINCT origin FROM purchase ")

purchase_distinct_origin_df.show()

In [None]:
# e.g. here we want to count the total number of rows in our dataframe

purchase_count_df = spark.sql(" SELECT COUNT(*) FROM purchase ")

purchase_count_df.show()

In [None]:
# e.g. here we want to count the total number of rows from Toulouse origin in our dataframe

purchase_origin_toulouse_count_df = spark.sql(" SELECT COUNT(*) FROM purchase WHERE origin='Toulouse' ")

purchase_origin_toulouse_count_df.show()

In [None]:
# e.g. here we want to get (select) the average price per customer 
# remember that we used groupByKey or reduceByKey for this same question when using RDDs in the previous chapter
# as we have structured data in this case, the following solution with DataFrames can be optimized even further by Spark!

purchase_avg_df = spark.sql(" SELECT customer_id,avg(price) FROM purchase GROUP BY customer_id ")

purchase_avg_df.show()

In [None]:
# e.g. as previously but we want to rename the avg (price) column to mean_price (we use the "as"...)

purchase_avg_df = spark.sql(" SELECT customer_id, avg(price) as mean_price  FROM purchase GROUP BY customer_id ")

purchase_avg_df.show()

In [None]:
# e.g. here we want to get (select) total prices per customer for purchases from Toulouse origin

purchase_origin_toulouse_total_df = spark.sql(" SELECT customer_id, sum(price) as total_price \
                                                FROM purchase \
                                                WHERE origin='Toulouse'\
                                                GROUP BY customer_id ")

purchase_origin_toulouse_total_df.show()

In [None]:
# e.g. as previously, we still want the total price per customer for purchases from Toulouse origin,
# but we keep only the totals greater than 50 euros


purchase_origin_toulouse_total_gt_50_df = spark.sql(" SELECT customer_id, sum(price) as total_price \
                                                      FROM purchase \
                                                      WHERE origin='Toulouse'\
                                                      GROUP BY customer_id \
                                                      HAVING total_price > 50")

purchase_origin_toulouse_total_gt_50_df.show()

In [None]:
# e.g. we want the number of journeys from each origin station for each customer, order by origin station (ascending) and number of journeys
# (descending)

purchase_origin_customer_nb_df = spark.sql(" SELECT origin, customer_id, count(*) as nb_journeys \
                                             FROM purchase \
                                             GROUP BY origin, customer_id \
                                             ORDER BY origin asc, nb_journeys desc")

purchase_origin_customer_nb_df.show()

For join examples, we use here sample data from our previous frequent stations and subscription example:

In [None]:
# we read the frequent stations csv file 

frequent_stations_df = spark.read.csv('drive/MyDrive/PYSPARK-COURSE/frequent_stations.csv', header=True)

frequent_stations_df.show()

In [None]:
# we read the subscriptions csv file

subscriptions_df = spark.read.csv('drive/MyDrive/PYSPARK-COURSE/subscriptions.csv', header=True)

subscriptions_df.show()

Before applying queries, we register the two dataframes as temporary views:

In [None]:
frequent_stations_df.createOrReplaceTempView('stations')

subscriptions_df.createOrReplaceTempView('subscriptions')

In [None]:
# e.g. we want all the information on customers' frequent stations together with their corresponding subscription information

subscriptions_frequent_stations_join_df = spark.sql(" SELECT * \
                                                     FROM stations \
                                                     JOIN subscriptions \
                                                     ON stations.customer_id = subscriptions.customer_id ")

subscriptions_frequent_stations_join_df.show()

As you can see, we perform a SQL inner join between frequent stations and customer subscriptions. Note that, unlike key/value RDDs, DataFrames have no notion of key/value: we have to specify which columns (or conditions) to join on. Here the join condition retains only rows with the same customer_id in both views. Thus, the customers with ids 104 and 999 are not present in the result.

In [None]:
# e.g. we want all the information on customers' subscriptions, with or without their corresponding frequent stations information

subscriptions_left_join_frequent_stations_df = spark.sql(" SELECT * \
                                                           FROM subscriptions \
                                                           LEFT JOIN stations \
                                                           ON stations.customer_id = subscriptions.customer_id ")

subscriptions_left_join_frequent_stations_df.show()

As you can see, by using the left join in this case, the customer with id 104 also appears in the result, with no information (null)
for his frequent stations.

In [None]:
# e.g. we want all the information on customers' frequent stations, with or without their corresponding subscription information

subscriptions_right_join_frequent_stations_df = spark.sql(" SELECT * \
                                                            FROM subscriptions \
                                                            RIGHT JOIN stations \
                                                            ON stations.customer_id = subscriptions.customer_id ")

subscriptions_right_join_frequent_stations_df.show()

As you can see, by using the right join in this case, the customer with id 999 also appears in the result, with no information (null) for his subscription.

In [None]:
# count the total number of journeys of registered customers that frequently travel to Capitole or Compans stations
# remember this was our example at the beginning of the chapter
# With RDDs, writing the filter before or after the join has a great impact on performance
# As we have structured data, the following solution with DataFrame and Spark SQL will be automatically optimized by Spark (by the Catalyst component)!

nb_journeys_frequent_capitole_compans_df = spark.sql(" SELECT count(*) as nb_journeys \
                                                       FROM subscriptions \
                                                       JOIN stations \
                                                       ON stations.customer_id = subscriptions.customer_id \
                                                       WHERE stations.frequent_station='Capitole' OR stations.frequent_station='Compans' ")

nb_journeys_frequent_capitole_compans_df.show()

<i>We used a small sample dataset to show some examples of how to use Spark SQL. But these processing steps would be the same for analyzing terabytes or petabytes of structured data on a cluster with hundreds or thousands of nodes, for instance! </i>