# <u><p style="text-align: center;">DataFrames</p></u>

### Contents of this notebook
* What a Spark DataFrame is
* How to load data into DataFrames

### Background

As we have seen, RDDs are the building blocks of Spark. RDDs have several advantages, but in some cases they can be problematic. First, Spark does not optimize transformations that we apply directly to RDDs. Second, working with RDDs in some programming languages (including Python) can lead to poor performance. Finally, chains of RDD transformations can be difficult to comprehend, since they describe how the result will be computed rather than what the result will be.

Spark **DataFrames** were conceived to overcome these problems. Like RDDs, DataFrames are distributed collections of data. The difference is that DataFrames provide a high-level abstraction over RDDs that allows us to manipulate data with a query language. Behind this abstraction, a DataFrame is a logical plan that describes the data together with a schema. Spark optimizes the logical plan and converts it to a physical plan for execution. This brings us closer to expressing **what** we want to do rather than **how** we have to do it, because we let Spark figure out the most efficient way to carry out the operations. DataFrames are generally faster than RDDs, and as long as we use built-in DataFrame operations they perform similarly no matter which programming language we use with Spark.

### Code examples

In the following examples we are going to use different transformations and actions on DataFrames.

***Example 1:*** creates a DataFrame and performs ordering operations.   
***Example 2:*** loads data into a DataFrame from a file and performs aggregations.   
***Example 3:*** queries a DataFrame using SQL syntax.

Before proceeding to the examples, we are going to initialize Spark:

In [None]:
import os
from pyspark.sql import SparkSession

#'swan_spark_conf' is a configuration provided by a plugin for Jupyter. We further extend this configuration with proxy settings.
swan_spark_conf = swan_spark_conf.setAll([('spark.ui.proxyBase', os.environ['JUPYTERHUB_SERVICE_PREFIX'] + 'proxy/4040')])

#instantiate a SparkSession object with our configuration
spark = SparkSession\
            .builder\
            .config(conf=swan_spark_conf)\
            .appName('Spark DataFrames')\
            .getOrCreate()

#set Spark log level
spark.sparkContext.setLogLevel('ERROR')

#### Example 1: Creating a DataFrame

In our first example we are going to create a DataFrame 'manually', which will contain the data of our cows. The data consist of the name, breed and weight of each cow. From these data we would like to get an overview of the weight and population of each breed.

So, first we create our DataFrame:

In [None]:
cowsDF = spark.createDataFrame([("Joel", "Angus", 450), 
                               ("Marcia", "Belted Galloway", 320),
                               ("Gregor", "Hereford", 390),
                               ("Anne", "Angus", 400),
                               ("Ravi", "Belted Galloway", 250),
                               ("Marcia", "Belted Galloway", 320)],
                              ("Name", "Breed", "Weight"))

and examine it using the function `show`:

In [None]:
cowsDF.show()

We notice that 'Marcia' has been entered twice in our records so we have to clean our data before we proceed. We can delete duplicate records with the `dropDuplicates` function:

In [None]:
cowsDF = cowsDF.dropDuplicates(["Name", "Breed", "Weight"])
cowsDF.show()

Next, we would like to inspect the weight of our cows from lightest to heaviest. To do this we order our DataFrame using the `orderBy` function, which sorts in ascending order by default:

In [None]:
orderedDF = cowsDF.orderBy("Weight")
orderedDF.show()

Now that we have an overview of the weight, we would like to order the weights within each breed. This can be done by passing both columns to `orderBy`, so that the rows are sorted by breed first and by weight within each breed:

In [None]:
groupedDF = cowsDF.orderBy(['Breed','Weight'])
groupedDF.show()

Finally, we would like to count how many cows of each breed we have:

In [None]:
countDF = cowsDF.groupBy("Breed").count()
countDF.show()

#### Example 2: *.csv* to DataFrame

DataFrames provide a convenient way to work with tabular data. In this example, we are going to read a *.csv* file with Spark and load it into a DataFrame. The file contains the minimum and maximum daily temperatures for the years 2010-2015 in De Bilt, Netherlands.

Then, we are going to find the minimum and maximum temperatures that occurred during these years and also count on how many days the temperature was below 0 $^\circ\text{C}$.

So, the first step is to load the data into a DataFrame:

In [None]:
dataDF = spark.read.csv("/home/jovyan/datasets/knmi-debilt.csv", header=True, inferSchema=True)

and then examine what the data look like:

In [None]:
dataDF.show()

Dates are formatted as YYYYMMDD and temperatures are in degrees Celsius.

Next, to find the minimum and maximum temperatures we are going to use **aggregations** over the DataFrame. We can perform aggregations by using the `agg` function. The parameters of `agg` are expressions that indicate the aggregation that we want to perform. To find the maximum temperature a possible solution is:

In [None]:
from pyspark.sql import functions as F

result = dataDF.agg(F.max("Tmax")) #notice that Tmax is the name of the column
result.show()

and similarly for the minimum:

In [None]:
result = dataDF.agg(F.min("Tmin")) #notice that Tmin is the name of the column
result.show()

Now, to find on how many days the temperature was below 0 $^\circ\text{C}$, we first keep only the days whose minimum temperature was below zero by using the `filter` function:

In [None]:
below_zeroDF = dataDF.filter(F.col("Tmin") < 0)

followed by the `count` function:

In [None]:
below_zeroDF.count()

#### Example 3: Performing SQL queries

When working with DataFrames we can also write SQL queries against them. Using the previous dataset, we are going to keep only the rows of the year 2012. To do this we first create a temporary view of the data:

In [None]:
dataDF.createOrReplaceTempView("data_view")

and then query it using SQL syntax:

In [None]:
only_2012DF = spark.sql("SELECT Date, Tmin, Tmax FROM data_view WHERE SUBSTRING(CAST(Date AS STRING), 1, 4) = '2012'")
only_2012DF.show()

<span style="display:none" id="question1">W3sicXVlc3Rpb24iOiAiU3BhcmsgRGF0YUZyYW1lIG9wZXJhdGlvbnMgYXJlIG9wdGltaXplZCBieSBTcGFyay4iLCAidHlwZSI6ICJtdWx0aXBsZV9jaG9pY2UiLCAiYW5zd2VycyI6IFt7ImNvZGUiOiAiVHJ1ZSIsICJjb3JyZWN0IjogdHJ1ZX0sIHsiY29kZSI6ICJGYWxzZSIsICJjb3JyZWN0IjogZmFsc2V9XX1d</span>

<span style="display:none" id="question2">W3sicXVlc3Rpb24iOiAiU3BhcmsgRGF0YUZyYW1lcyBhcmUgYnVpbHQgb24gdG9wIG9mIFJERHMuIiwgInR5cGUiOiAibXVsdGlwbGVfY2hvaWNlIiwgImFuc3dlcnMiOiBbeyJjb2RlIjogIlRydWUiLCAiY29ycmVjdCI6IHRydWV9LCB7ImNvZGUiOiAiRmFsc2UiLCAiY29ycmVjdCI6IGZhbHNlfV19XQ==</span>

<span style="display:none" id="question3">W3sicXVlc3Rpb24iOiAiQ2hvb3NlIHRoZSBjb3JyZWN0IGFuc3dlcnM6IiwgInR5cGUiOiAibXVsdGlwbGVfY2hvaWNlIiwgImFuc3dlcnMiOiBbeyJjb2RlIjogIlNwYXJrIERhdGFGcmFtZXMgYXJlICAgIG5vbi1kaXN0cmlidXRlZCBjb2xsZS0gICBjdGlvbnMgb2YgZGF0YS4iLCAiY29ycmVjdCI6IGZhbHNlLCAiZmVlZGJhY2siOiAiVGhleSBhcmUgZGlzdHJpYnV0ZWQuIn0sIHsiY29kZSI6ICJXZSBjYW4gdXNlIFNRTCBxdWVyaWVzICBkaXJlY3RseSB3aXRoIERhdGFGcmEtICAgbWVzLiIsICJjb3JyZWN0IjogdHJ1ZX0sIHsiY29kZSI6ICJUaGUgcGVyZm9ybWFuY2Ugd2UgZ2V0ICB3aGVuIHVzaW5nIERhdGFGcmFtZXMgaXMgcHJvZ3JhbW1pbmcgbGFuZ3VhZ2UgICAgZGVwZW5kZW50LiIsICJjb3JyZWN0IjogZmFsc2UsICJmZWVkYmFjayI6ICJEYXRhRnJhbWVzIGhhdmUgdGhlIHNhbWUgcGVyZm9ybWFuY2UgcmVnYXJkbGVzcyBvZiB0aGUgbGFuZ3VhZ2UgdXNlZC4ifSwgeyJjb2RlIjogIldoZW4gd29ya2luZyB3aXRoICAgICAgIERhdGFGcmFtZXMgd2UgaGF2ZSB0byAgICAgICAgIGNhcmVmdWxseSB0aGluayB0aGUgICAgIG9yZGVyIG9mIHRoZSBvcGVyYXRpb25zICAgdGhhdCAgd2Ugd2FudCB0byBhcHBseS4iLCAiY29ycmVjdCI6IHRydWV9XX1d</span>

### Practice questions

#### Q1:

In [None]:
from jupyterquiz import display_quiz

display_quiz("#question1")

#### Q2:

In [None]:
display_quiz("#question2")

#### Q3:

In [None]:
display_quiz("#question3")

### More advanced examples:

In this section we are going to see an example of how to improve execution speed when working with DataFrames. This example is not essential for understanding the rest of the course. However, we recommend trying it, as it will help you deepen your knowledge of this topic.

#### Example A1: Caching for speed

Similarly to RDDs, DataFrame operations are divided into transformations and actions. Results are not computed until we call an action. When we call an action, all the transformations leading up to it are performed and we get a result. However, during this procedure no intermediate state of the DataFrame is stored. So, if we need one of these states again, Spark has to start the computations from the beginning.

Such an example is depicted below. Each circle represents a transformation that a DataFrame has to go through:

<img src="images/transformation_chain.png" alt="drawing" width="170"/>

After transformation B, the transformation chain splits into two branches. We can calculate the chain A->B->C but to calculate A->B->D Spark has to start from A again, as the state B is not stored. For computationally expensive transformations we might not have the time or resources to recompute the same operations. 

For this reason, we can store an intermediate state using the `cache` function. Spark can then start computations from this state instead of from the beginning of the transformation chain.

Below is an example showcasing how the `cache` function can be used. We have data from potato fields in the Netherlands:

In [None]:
potatoesDF = spark.read.csv("/home/jovyan/datasets/potatoes.csv", header=True, inferSchema=True)
potatoesDF.show()

as well as soil consistency data:

In [None]:
soilDF = spark.read.csv("/home/jovyan/datasets/soil.csv", header=True, inferSchema=True)
soilDF.show()

and we want to prepare the dataset for some data analysis tasks. To prepare the dataset we would like to perform the following operations:
* create a planting year column
* match the soil properties to the potato fields

To add the year column to `potatoesDF` we can use the `withColumn` function. Here we also use the `to_timestamp` function to parse the planting date and the `year` function to extract the year from it:

In [None]:
potatoesDF = potatoesDF.withColumn("Year", F.year(F.to_timestamp('PlantingDate', 'dd-MM-yy')))

potatoesDF.show()

Next, to match the soil properties to the potato fields we can use the `join` function to join `potatoesDF` with `soilDF` on the *SoilType* column:

In [None]:
joinedDF = potatoesDF.join(soilDF, "SoilType")
joinedDF.show()

Our dataset is now ready: we have told Spark which transformations we want to perform, and we can proceed with our data analysis tasks. If we compare our transformations with the image of this example, adding the year column corresponds to transformation A, and matching the soil properties corresponds to transformation B.

Moving forward, we have two data analysis tasks. The first, which corresponds to transformation C in the image, is to find the average amount of fertilizer applied for each clay percentage in our dataset. The second, which corresponds to transformation D, is to find the average amount of fertilizer applied to each cultivar in 2020.

The first task can be achieved by first grouping on the *ClayPercentage* column and then aggregating on the *FertilizerApplied* column:

In [None]:
task1DF = joinedDF.groupBy("ClayPercentage")\
                    .agg(F.mean(F.col("FertilizerApplied")))

The second task can be achieved by filtering on the *Year* column, grouping on the *Cultivar* column and aggregating on the *FertilizerApplied* column:

In [None]:
task2DF = joinedDF.filter(F.col("Year") == 2020)\
                    .groupBy("Cultivar")\
                    .agg(F.mean(F.col("FertilizerApplied")))

Notice here that for both tasks we started working on top of `joinedDF`, so both tasks have the same state as root. 

So far we have not performed any action on `task1DF` or `task2DF`, so no results have been computed for them. Let's apply an action (`show`) to get the results of our first task:

In [None]:
task1DF.show()

Task 1 took quite some time to finish. During this time Spark started from the beginning of the transformation chain (adding the year column), added the soil properties and then performed the task. So `joinedDF` was computed as part of this job.

Now let's get the results of task 2:


In [None]:
task2DF.show()

We notice here that this task also takes quite some time to finish. As we can observe on the Spark jobs panel, Spark started computing the transformation chain from the beginning, even though we had already computed `joinedDF` for task 1.

Now, we are going to tell Spark to store `joinedDF` when it is computed, so that the result can be available for later use:

In [None]:
joinedDF.cache()

Next, we are going to run task 1 again, to force the computation of `joinedDF`:

In [None]:
task1DF.show()

Task 1 took as much time as it did before, but check what happens when we run task 2, which also requires `joinedDF`:

In [None]:
task2DF.show()

For task 2 the computation time was reduced, because Spark found `joinedDF` already in memory. In this way, when we know that our operations share common ancestor states, we can speed up our computations by caching those states.

### Further reading

* [What is a DataFrame in Spark](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)
* [DataFrame documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html?highlight=dataframe#pyspark.sql.DataFrame)
* [Functions to manipulate DataFrames](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions)
* [SQL syntax for DataFrames](https://spark.apache.org/docs/latest/api/sql/index.html)
* [Caching DataFrames](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.cache.html?highlight=cache#pyspark.sql.DataFrame.cache)