# Advanced Big Data: Spark SQL and DataFrames

------------------------------------
# Authors:
- Mohamed NIANG, Data Scientist at Paris-Saclay University
- Penda TOURE, Data Scientist at Paris-Saclay University

# Teacher:
- Youcef Sebiat, Data Engineer / Scientist at CMAP / Lead Data Scientist at l'X École Polytechnique
- e-mail address: youcef.sebiat@polytechnique.edu
-----------------------------------

### 1. Basic manipulations
First, let's create some users:

In [3]:
from datetime import date

# Let's first create some users:
col_names = ["first_name", "last_name", "birth_date", "gender", "country"]
users = [
  ("Alice", "Jones", date(1981, 4, 15), "female", "Canada"),
  ("John", "Doe", date(1951, 1, 21), "male", "USA"),
  ("Barbara", "May", date(1951, 9, 1), "female", "Australia"),
  ("James", "Smith", date(1975, 7, 12), "male", "United Kingdom"),
  ("Gerrard", "Dupont", date(1968, 5, 9), "male", "France"),
  ("Amanda", "B.", date(1988, 12, 16), "female", "New Zeland")
]

users_df = spark.createDataFrame(users, col_names)
display(users_df)  # Only works in Databricks. Elsewhere, use "users_df.show()" or "users_df.toPandas()"

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland


Now it's your turn: create more users (at least 3, with different names) and add them to the initial users, saving the result in a new variable.

In [5]:
new_users_df = spark.createDataFrame([
  ("Mohamed", "Niang", date(1995, 9, 24), "male", "Senegal"),
  ("Fernanda", "Tchouacheu", date(1997, 5, 12), "female", "Cameroun"),
  ("Penda", "Toure", date(1996, 11, 13), "female", "Senegal"),
], col_names)  # same column names as users_df; union() matches columns by position
all_users_df = users_df.union(new_users_df)
display(all_users_df) # or all_users_df.show()

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland
Mohamed,Niang,1995-09-24,male,Senegal
Fernanda,Tchouacheu,1997-05-12,female,Cameroun
Penda,Toure,1996-11-13,female,Senegal


Now, select only two columns and show the resulting DataFrame, without saving it into a variable.

In [7]:
all_users_df.select("first_name","last_name").show()

Now, register your DataFrame as a table and select the same two columns with a SQL query string.

In [9]:
# Register a session-scoped temporary view that SQL queries can reference.
# createOrReplaceTempView does not fail if the cell is run a second time.
all_users_df.createOrReplaceTempView('all_users_df_view')
query_string = "SELECT first_name, last_name FROM all_users_df_view"
spark.sql(query_string).show()

Now we want to add a unique identifier for each user in the table. There are many strategies for that; for our example we will use the string `{first_name}_{last_name}`.

You can use `all_users_df` since your latest operation did not override its values. Add a new column called `user_id` to your DataFrame and save it to a new variable.

**Hint:** The first place to look is in the [functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) package

In [11]:
from pyspark.sql import functions as fn
from pyspark.sql.functions import concat, col, lit

users_with_id = all_users_df.withColumn('user_id', concat(col("first_name"), lit("_"), col("last_name")))
display(users_with_id)

first_name,last_name,birth_date,gender,country,user_id
Alice,Jones,1981-04-15,female,Canada,Alice_Jones
John,Doe,1951-01-21,male,USA,John_Doe
Barbara,May,1951-09-01,female,Australia,Barbara_May
James,Smith,1975-07-12,male,United Kingdom,James_Smith
Gerrard,Dupont,1968-05-09,male,France,Gerrard_Dupont
Amanda,B.,1988-12-16,female,New Zeland,Amanda_B.
Mohamed,Niang,1995-09-24,male,Senegal,Mohamed_Niang
Fernanda,Tchouacheu,1997-05-12,female,Cameroun,Fernanda_Tchouacheu
Penda,Toure,1996-11-13,female,Senegal,Penda_Toure


You can also do the same thing with a User Defined Function (UDF) by passing a lambda, although this is not recommended when the `functions` package already provides an equivalent function.

Add a new column called `user_id_udf` to the DataFrame, using a UDF that receives two parameters and concatenates them.

In [13]:
from pyspark.sql.types import StringType

concat_udf = fn.udf(lambda l,v: l + "_" + v, StringType())
users_with_id_udf = all_users_df.withColumn('user_id_udf', concat_udf(all_users_df.first_name, all_users_df.last_name))
display(users_with_id_udf)

first_name,last_name,birth_date,gender,country,user_id_udf
Alice,Jones,1981-04-15,female,Canada,Alice_Jones
John,Doe,1951-01-21,male,USA,John_Doe
Barbara,May,1951-09-01,female,Australia,Barbara_May
James,Smith,1975-07-12,male,United Kingdom,James_Smith
Gerrard,Dupont,1968-05-09,male,France,Gerrard_Dupont
Amanda,B.,1988-12-16,female,New Zeland,Amanda_B.
Mohamed,Niang,1995-09-24,male,Senegal,Mohamed_Niang
Fernanda,Tchouacheu,1997-05-12,female,Cameroun,Fernanda_Tchouacheu
Penda,Toure,1996-11-13,female,Senegal,Penda_Toure


Now, let's add another column called `age` with the computed age (in years) of each user, based on a given reference date, and save the resulting DataFrame into a new variable.

**Hint:** You can first compute the age in months, and then divide by 12. A final operation will probably be needed to get an integer number.
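The hint can be illustrated outside Spark. The following is a minimal plain-Python sketch, not part of the original lab: `age_in_years` is a hypothetical helper that counts the whole months between two dates and then divides by 12 with integer division. In Spark, `months_between` from `pyspark.sql.functions` can play the role of the month count.

```python
from datetime import date


def age_in_years(birth_date: date, reference_date: date) -> int:
    """Whole years elapsed, computed as (whole months elapsed) // 12."""
    months = (reference_date.year - birth_date.year) * 12 \
        + (reference_date.month - birth_date.month)
    # The current month only counts once the birth day of the month is reached.
    if reference_date.day < birth_date.day:
        months -= 1
    return months // 12


reference_date = date(2017, 12, 31)
ages = {
    "Alice": age_in_years(date(1981, 4, 15), reference_date),
    "Amanda": age_in_years(date(1988, 12, 16), reference_date),
}
print(ages)
```

Counting months rather than dividing a number of days by 365 avoids drifting by a few days because of leap years, which matters for users whose birthday is close to the reference date.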

In [15]:
from pyspark.sql.functions import datediff

reference_date = date(2017, 12, 31)
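# Whole years elapsed: months between the two dates, divided by 12 and rounded down.
# Dividing a day count by 365 ignores leap days and can be off by one near a birthday.
age = fn.floor(fn.months_between(lit(reference_date), all_users_df.birth_date) / 12)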

users_with_age = all_users_df.withColumn('age', age.cast('int'))
display(users_with_age)

first_name,last_name,birth_date,gender,country,age
Alice,Jones,1981-04-15,female,Canada,36
John,Doe,1951-01-21,male,USA,66
Barbara,May,1951-09-01,female,Australia,66
James,Smith,1975-07-12,male,United Kingdom,42
Gerrard,Dupont,1968-05-09,male,France,49
Amanda,B.,1988-12-16,female,New Zeland,29
Mohamed,Niang,1995-09-24,male,Senegal,22
Fernanda,Tchouacheu,1997-05-12,female,Cameroun,20
Penda,Toure,1996-11-13,female,Senegal,21


Now, an analytical question: how many users of each gender are more than 40 years old? The solution must be a DataFrame with two columns, `gender` and `count`, and two rows, one for each gender.

Bonus: Try to write your solution as a single chain (without intermediate variables).

**Hint:** You will need to filter and aggregate the data.

In [17]:
result = users_with_age.filter(users_with_age.age > 40).groupBy('gender').count()
display(result)

gender,count
female,1
male,3


### 2. Reading files, performing joins, and aggregating

For this section you will use fake data from two datasets: `Users` and `Donations`. The data is provided in two CSV files *with header* and using *comma* as separator.

The `Users` dataset contains information about the users.

The `Donations` dataset contains information about donations made by those users.

The first task is to read these files into the appropriate DataFrames.

**Note:** You need to set the option "inferSchema" to true in order to have the columns in the correct types.

_The data for this section has been created using [Mockaroo](https://www.mockaroo.com/)_.

In [19]:
users_from_file = spark.read.format("csv") \
  .option("inferSchema", "True") \
  .option("header", "True") \
  .option("sep", ",") \
  .load("/FileStore/tables/users.csv")

donations = spark.read.format("csv") \
  .option("inferSchema", "True") \
  .option("header", "True") \
  .option("sep", ",") \
  .load("/FileStore/tables/donations.csv")

Now investigate the columns, contents and size of both datasets.

In [21]:
# print the column names and types
users_from_file.printSchema()
donations.printSchema()

# print 5 rows of each dataset in a tabular format
users_from_file.show(5)
donations.show(5)

# print the number of rows of each dataset
print(users_from_file.count())
print(donations.count())

**Note:** If all the column types shown in the previous results are "string", you need to make sure you passed "inferSchema" as true when reading the CSV files before continuing.

Before using the data, we may want to add some information about the users. 

Add a column containing the age of each user.

In [23]:
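from pyspark.sql import functions as fn
from pyspark.sql.functions import lit

# reference_date was defined in section 1. The age is the number of whole years elapsed:
# months between the two dates, divided by 12 and rounded down.
age = fn.floor(fn.months_between(lit(reference_date), fn.to_date(users_from_file.birth_date)) / 12)
users_from_file = users_from_file.withColumn('age', age.cast('int'))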
users_from_file.show()

Another useful piece of information is the age **range** of each user. Using the `when` function, create the following 5 age ranges:
- "(0, 25]"
- "(25, 35]"
- "(35, 45]"
- "(45, 55]"
- "(55, ~]"

And add a new column to the users DataFrame, containing this information.

**Note:** When building logical operations with Spark DataFrames, wrap each condition in parentheses. Example:
```python
df.select("name", "age").where( (df["age"] > 20) & (df["age"] <= 30) )
```
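The parentheses matter because of Python operator precedence: `&` binds more tightly than comparison operators. The plain-Python sketch below uses an ordinary integer instead of a Spark column (an assumption made so that it runs without a cluster) to show how the unparenthesized expression is parsed.

```python
age = 40

# Without parentheses Python parses this as the chained comparison
#   age > (20 & age) <= 30
# because & has higher precedence than > and <=.
without_parens = age > 20 & age <= 30

# With parentheses, each comparison is evaluated first and the results are combined.
with_parens = (age > 20) & (age <= 30)

print(without_parens, with_parens)
```

With plain integers the unparenthesized form silently evaluates a different expression. With Spark columns it typically fails instead, since a chained comparison needs the truth value of a `Column`, which PySpark refuses to provide.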

**Note 2:** If you are having problems with the `when` function, you can write a User Defined Function and implement your logic in standard Python.

In [25]:
from pyspark.sql import functions as fn

age_range = fn.when((users_from_file.age <= 25), '0-25')\
              .when(((users_from_file.age > 25) & (users_from_file.age <= 35)), '25-35')\
              .when(((users_from_file.age > 35) & (users_from_file.age <= 45)), '35-45')\
              .when(((users_from_file.age > 45) & (users_from_file.age <= 55)), '45-55')\
              .otherwise('55 and more')
                    
users_from_file = users_from_file.withColumn('age_range', age_range)
users_from_file.show()

Now that we have improved our users' DataFrame, the first analysis we want to make is the average donation performed by each gender. However, the gender information and the donation value are in different tables, so first we need to join them, using the `user_id` as joining key.

**Note:** Make sure you are not using the `users` DataFrame from the first part of the lab.

**Note 2:** For better performance, you can [broadcast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast) the smaller dataset. But you should only do this when you are sure the DataFrame fits in the memory of the nodes.

In [27]:
joined_df = users_from_file.join(donations, 'user_id')
display(joined_df)

user_id,first_name,last_name,gender,birth_date,country,age,age_range,donation_id,donation_value,date,time,timestamp
63,Tabor,Bramer,Male,1983-05-29T00:00:00.000+0000,Brazil,34,25-35,1,25.41,2017-03-06T00:00:00.000+0000,18:28:20,2017-03-06T18:28:20.000+0000
53,Chris,Climie,Male,1970-06-02T00:00:00.000+0000,France,47,45-55,2,93.32,2017-03-15T00:00:00.000+0000,15:15:09,2017-03-15T15:15:09.000+0000
74,Livia,Lodemann,Female,1967-03-11T00:00:00.000+0000,United States,50,45-55,3,136.04,2017-10-29T00:00:00.000+0000,20:36:23,2017-10-29T20:36:23.000+0000
94,Kendre,Pankhurst.,Female,1941-05-28T00:00:00.000+0000,China,76,55 and more,4,37.25,2017-05-16T00:00:00.000+0000,12:33:58,2017-05-16T12:33:58.000+0000
83,Quincy,Marquess,Male,1997-08-13T00:00:00.000+0000,Brazil,20,0-25,5,89.45,2017-05-09T00:00:00.000+0000,22:45:44,2017-05-09T22:45:44.000+0000
80,Boothe,Endley,Male,1960-03-03T00:00:00.000+0000,China,57,55 and more,6,163.67,2017-07-25T00:00:00.000+0000,21:50:36,2017-07-25T21:50:36.000+0000
19,Sharl,Airy,Female,1941-09-08T00:00:00.000+0000,Brazil,76,55 and more,7,10.69,2017-11-15T00:00:00.000+0000,8:43:54,2017-11-15T08:43:54.000+0000
53,Chris,Climie,Male,1970-06-02T00:00:00.000+0000,France,47,45-55,8,135.09,2017-04-14T00:00:00.000+0000,7:43:37,2017-04-14T07:43:37.000+0000
46,Ethelyn,Raubenheim,Female,1953-07-19T00:00:00.000+0000,Brazil,64,55 and more,9,94.38,2017-03-07T00:00:00.000+0000,10:30:34,2017-03-07T10:30:34.000+0000
90,Farlee,O'Connell,Male,1985-05-15T00:00:00.000+0000,China,32,25-35,10,68.63,2017-02-02T00:00:00.000+0000,14:23:43,2017-02-02T14:23:43.000+0000


Now, use aggregation to find the min, max and avg donation by gender.

In [29]:
donations_by_gender = joined_df.groupBy('gender').agg(
    fn.min('donation_value').alias('min of donation'),
    fn.max('donation_value').alias('max of donation'),
    fn.avg('donation_value').alias('average of donation'))
donations_by_gender.show()

Now, make the necessary transformations and aggregations to answer the following questions about the data. Note that some questions are only about the users, so make sure to use the smallest possible dataset when looking for your answers!

**Question 1:** 

a) What's the average, min and max age of the users? 

b) What's the average, min and max age of the users, by gender?

In [31]:
result_1a = users_from_file.agg(
    fn.avg('age').alias('average of age'),
    fn.min('age').alias('min of age'),
    fn.max('age').alias('max of age'))
result_1a.show()

result_1b = users_from_file.groupBy('gender').agg(
    fn.avg('age').alias('average of age'),
    fn.min('age').alias('min of age'),
    fn.max('age').alias('max of age'))
result_1b.show()

**Question 2:**

a) How many distinct country origins exist in the data? Print a DataFrame listing them.

b) What are the top 5 countries with the most users? Print a DataFrame containing the name of the countries and the counts.

In [33]:
result_2a = users_from_file.select("country").distinct()
print(result_2a.count())  # number of distinct countries
result_2a.show()

result_2b = users_from_file.groupBy('country').count().orderBy(fn.desc('count')).limit(5)
result_2b.show()

**Question 3:**

What are the number of donations and the average, min and max donation values by age range?

In [35]:
result_3 = joined_df.groupBy('age_range').agg(
    fn.count('donation_id').alias('number of donations'),
    fn.avg('donation_value').alias('average of donations'),
    fn.min('donation_value').alias('min of donations'),
    fn.max('donation_value').alias('max of donations'))
result_3.show()

**Question 4:**

a) What are the number of donations and the average, min and max donation values by user location (country)?

b) What are the number of donations and the average, min and max donation values by gender for each user location (country)? (the resulting DataFrame must contain 6 columns: the gender of the user, their country and the 4 metrics)

In [37]:
result_4a = joined_df.groupBy('country').agg(
    fn.count('donation_id').alias('number of donations'),
    fn.avg('donation_value').alias('average of donations'),
    fn.min('donation_value').alias('min of donations'),
    fn.max('donation_value').alias('max of donations'))
result_4a.show()

result_4b = joined_df.groupBy('gender','country').agg(
    fn.count('donation_id').alias('number of donations'),
    fn.avg('donation_value').alias('average of donations'),
    fn.min('donation_value').alias('min of donations'),
    fn.max('donation_value').alias('max of donations'))
result_4b.show()

**Question 5:**

Which month of the year has the largest aggregated donation value?

**Hint**: you can use a function to extract the month from a date, then you can aggregate to find the total donation value.

In [39]:
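# fn.month extracts the month number (1-12) from the donation date.
# A wildcard import of pyspark.sql.functions is avoided because it shadows
# Python built-ins such as sum, min and max.
result_5 = (joined_df
    .groupBy(fn.month('date').alias('month'))
    .agg(fn.sum('donation_value').alias('total donation'))
    .orderBy(fn.desc('total donation')))
result_5.show()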

### 3. Window Functions

_This section uses the same data as the last one._

Window functions are very useful for gathering aggregated data without actually aggregating the DataFrame. They can also be used to find "previous" and "next" information for entities, such as a user. [This article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) has a very nice explanation of the concept.
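To make the difference concrete, here is a small plain-Python sketch (made-up rows, no Spark involved) contrasting a regular aggregation, which collapses the rows of each user into one, with a window-style computation, which keeps every row and attaches the per-user total to it.

```python
from collections import defaultdict

# Made-up donations, for illustration only.
donations = [
    {"user_id": 1, "donation_value": 100.0},
    {"user_id": 1, "donation_value": 450.0},
    {"user_id": 2, "donation_value": 30.0},
]

# Total donated per user.
totals = defaultdict(float)
for row in donations:
    totals[row["user_id"]] += row["donation_value"]

# Aggregation: one output row per user, the individual donations are gone.
aggregated = [{"user_id": user_id, "total": total} for user_id, total in totals.items()]

# Window-style result: same number of rows as the input, each with its user's total.
windowed = [{**row, "total": totals[row["user_id"]]} for row in donations]

print(len(aggregated), len(windowed))
```

The window-style result can then be filtered row by row on the attached total, which is what the second approach described below relies on.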

We want to find the users who donated less than a threshold and remove them from the donations dataset. Now, there are two ways of doing that:

1) Performing a traditional aggregation to find the users who donated less than the threshold, then filtering these users out of the donations dataset, either with `where(~df["user_id"].isin(a_local_list))` or with a join of type "anti-join".

2) Using window functions to add a new column with the aggregated donations per user, and then using a normal filter such as `where(aggregated_donation >= threshold)`.

Let's implement both and compare the complexity:

First, perform the traditional aggregation and find the users who donated less than 500 in total:

In [42]:
bad_users = joined_df.groupBy('user_id').agg(fn.sum('donation_value').alias('aggregated_donation'))
bad_users = bad_users.where(bad_users.aggregated_donation < 500)
bad_users.show()

You should have found around 10 users. Now, perform an "anti-join" to remove those users from `joined_df`.

**Hint:** The `join` operation accepts a third argument which is the join type. The accepted values are: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'.
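As a rough illustration of what the 'leftanti' and 'leftsemi' join types keep, the plain-Python sketch below filters made-up donation rows against a set of user ids. The data and variable names are hypothetical and only mimic the semantics of the joins.

```python
# Made-up left side: one row per donation.
donations = [
    {"user_id": 1, "donation_value": 300.0},
    {"user_id": 1, "donation_value": 400.0},
    {"user_id": 2, "donation_value": 50.0},
    {"user_id": 3, "donation_value": 900.0},
]
# Made-up right side: the ids of the users to exclude, one entry per user.
bad_user_ids = {2}

# 'leftanti': keep the left rows that have NO match on the right side.
left_anti = [row for row in donations if row["user_id"] not in bad_user_ids]

# 'leftsemi': keep the left rows that DO have a match, without adding right-side columns.
left_semi = [row for row in donations if row["user_id"] in bad_user_ids]

print(len(left_anti), len(left_semi), len(donations))
```

Every left row falls into exactly one of the two results, so their sizes add up to the size of the left side. An inner join returns the same number of rows as the semi join only when the right side holds each key at most once, which is the case for a DataFrame grouped by `user_id`.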

In [44]:
# Joining on the column name avoids ambiguous references, since bad_users is derived from joined_df
good_donations = joined_df.join(bad_users, 'user_id', 'leftanti')
good_donations.count()

Verify if the count of `good_donations` makes sense by performing a normal join to find the `bad_donations`.

In [46]:
# Joining on the column name avoids ambiguous references, since bad_users is derived from joined_df
bad_donations = joined_df.join(bad_users, 'user_id', 'inner')
bad_donations.count()

If you have done everything right, at this point `good_donations.count()` + `bad_donations.count()` = `joined_df.count()`.

However, the join approach can be expensive and requires multiple operations. For this kind of problem, window functions are often a better fit.

In [48]:
good_donations.count() + bad_donations.count() == joined_df.count()

The first step is to create your window specification over `user_id` by using `partitionBy`.

In [50]:
from pyspark.sql import Window

window_spec = Window.partitionBy('user_id')

Then, you can use one of the window functions of the `pyspark.sql.functions` package, applying it to the created `window_spec` by using the `over` method.

**Hint:** If you are blocked, try looking at the [documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.over) for the `over` method, or searching on StackOverflow.

In [52]:
new_column = fn.sum('donation_value').over(window_spec)

donations_with_total = joined_df.withColumn('aggregated_donation', new_column)
donations_with_total.show()

And now you can just filter on the `aggregated_donation` column:

In [54]:
good_donations_wf = donations_with_total.filter(donations_with_total.aggregated_donation >= 500)
good_donations_wf.count()

If you have done everything right, you should obtain `good_donations_wf.count()` = `good_donations.count()`.

In [56]:
good_donations_wf.count() == good_donations.count()

Window functions can also be used to find the "next" and "previous" rows by using `functions.lead` and `functions.lag`. In our example, we can use them to find the time interval between two donations by a specific user.

For this kind of Window function, the window specification must be ordered by the date. So, create a new window specification partitioned by the `user_id` and ordered by the donation timestamp.
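The idea behind `lag` over an ordered window can be sketched in plain Python with made-up data: sort the rows by user and date, then walk through each user's rows while remembering the previous date. This only mimics the semantics and is not how Spark executes a window.

```python
from datetime import date
from itertools import groupby

# Made-up (user_id, donation_date) pairs, deliberately unordered.
donations = [
    (1, date(2017, 3, 6)),
    (2, date(2017, 5, 1)),
    (1, date(2017, 1, 10)),
    (1, date(2017, 3, 16)),
]

# "partitionBy user_id, orderBy date": sort by user, then by date.
rows = sorted(donations)

with_lag = []
for user_id, user_rows in groupby(rows, key=lambda row: row[0]):
    previous = None  # the first donation of a user has no previous one
    for _, donation_date in user_rows:
        gap_in_days = (donation_date - previous).days if previous is not None else None
        with_lag.append((user_id, donation_date, previous, gap_in_days))
        previous = donation_date

for row in with_lag:
    print(row)
```

As in Spark, the first row of each partition has no previous value, so its gap is left empty and should be ignored when averaging.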

In [58]:
# Order by the full timestamp so that two donations made on the same day keep their order
ordered_window = Window.partitionBy('user_id').orderBy('timestamp')

Now use `functions.lag().over()` to add a column with the timestamp of the previous donation of the same user. Then, inspect the result to see what it looks like.

In [60]:
# Timestamp of the previous donation by the same user (null for the user's first donation)
new_column = fn.lag('timestamp', 1).over(ordered_window)

donations_with_lag = good_donations.withColumn('previous_donation', new_column)
donations_with_lag.orderBy("user_id", "timestamp").show()

Finally, compute the average time it took for each user between two of their consecutive donations (in days), and print the 5 users with the smallest averages. The result must include at least the users' id, last name and birth date, as well as the computed average.

In [62]:
# Days between each donation and the previous one by the same user.
# The value is null for a user's first donation, and avg ignores nulls.
donations_with_gap = donations_with_lag.withColumn(
    'days_since_previous', fn.datediff('timestamp', 'previous_donation'))

# Grouping by the user columns keeps last_name and birth_date without a second join.
users_average_between_donations = (donations_with_gap
    .groupBy('user_id', 'last_name', 'birth_date')
    .agg(fn.avg('days_since_previous').alias('average_duration_in_days'))
    .where(fn.col('average_duration_in_days').isNotNull())  # drop users with a single donation
    .orderBy(fn.asc('average_duration_in_days')))
users_average_between_donations.show(5)

Congratulations, you have finished this notebook!