# Spark SQL and DataFrames

### 1. Basic manipulations
First, let's create some users:

In [0]:
from datetime import date

# Let's first create some users:
col_names = ["first_name", "last_name", "birth_date", "gender", "country"]
users = [
  ("Alice", "Jones", date(1981, 4, 15), "female", "Canada"),
  ("John", "Doe", date(1951, 1, 21), "male", "USA"),
  ("Barbara", "May", date(1951, 9, 1), "female", "Australia"),
  ("James", "Smith", date(1975, 7, 12), "male", "United Kingdom"),
  ("Gerrard", "Dupont", date(1968, 5, 9), "male", "France"),
  ("Amanda", "B.", date(1988, 12, 16), "female", "New Zeland")
]

users_df = spark.createDataFrame(users, col_names)
display(users_df)  # Only works in Databricks. Elsewhere, use users_df.show() or users_df.toPandas()

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland


Now it's your turn: create more users (at least 3, with different names), add them to the initial users, and save the result in a new variable.

In [0]:
from pyspark.sql import Row

# Keep the plain list and the DataFrame in separate variables.
new_users = [
    Row(first_name="Donald", last_name="Trump", birth_date=date(1958, 4, 15), gender="male", country="Russia"),
    Row(first_name="Ilgin", last_name="Erole", birth_date=date(1961, 3, 3), gender="female", country="Turkey"),
    Row(first_name="Quitin", last_name="Kock", birth_date=date(1981, 6, 11), gender="male", country="South Africa"),
]
new_users_df = spark.createDataFrame(new_users)
# union() matches columns by position, so the new rows must use the same column order as users_df.
all_users_df = users_df.union(new_users_df)
display(all_users_df)  # or all_users_df.show()

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland
Donald,Trump,1958-04-15,male,Russia
Ilgin,Erole,1961-03-03,female,Turkey
Quitin,Kock,1981-06-11,male,South Africa


Now, select only two columns and show the resulting DataFrame, without saving it into a variable.

In [0]:
all_users_df.select("first_name","last_name").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Alice|    Jones|
|      John|      Doe|
|   Barbara|      May|
|     James|    Smith|
|   Gerrard|   Dupont|
|    Amanda|       B.|
|    Donald|    Trump|
|     Ilgin|    Erole|
|    Quitin|     Kock|
+----------+---------+



Now, register your DataFrame as a temporary view and select the same two columns with a SQL query string.

In [0]:
# Register a session-scoped temporary view that SQL queries can refer to by name.
all_users_df.createOrReplaceTempView("temp")
query_string = "SELECT first_name, last_name FROM temp"
spark.sql(query_string).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Alice|    Jones|
|      John|      Doe|
|   Barbara|      May|
|     James|    Smith|
|   Gerrard|   Dupont|
|    Amanda|       B.|
|    Donald|    Trump|
|     Ilgin|    Erole|
|    Quitin|     Kock|
+----------+---------+



Now we want to add a unique identifier for each user in the table. There are many strategies for that; for our example we will use the string `{last_name}_{first_name}`.

You can use `all_users_df` since your latest operation did not override its values. Add a new column called `user_id` to your DataFrame and save to a new variable.

**Hint:** The first place to look is in the [functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) package

In [0]:
from pyspark.sql import functions as fn
from pyspark.sql.functions import concat_ws, col

# Build user_id as "{last_name}_{first_name}" and keep the original columns after it.
users_with_id = all_users_df.select(
    concat_ws("_", all_users_df.last_name, all_users_df.first_name).alias("user_id"),
    "first_name", "last_name", "birth_date", "gender", "country")
display(users_with_id)

user_id,first_name,last_name,birth_date,gender,country
Jones_Alice,Alice,Jones,1981-04-15,female,Canada
Doe_John,John,Doe,1951-01-21,male,USA
May_Barbara,Barbara,May,1951-09-01,female,Australia
Smith_James,James,Smith,1975-07-12,male,United Kingdom
Dupont_Gerrard,Gerrard,Dupont,1968-05-09,male,France
B._Amanda,Amanda,B.,1988-12-16,female,New Zeland
Trump_Donald,Donald,Trump,1958-04-15,male,Russia
Erole_Ilgin,Ilgin,Erole,1961-03-03,female,Turkey
Kock_Quitin,Quitin,Kock,1981-06-11,male,South Africa


You can also do the same thing with a User Defined Function (UDF), for example by passing a lambda, although this is not recommended when the `functions` package already provides an equivalent function.

Add a new column called `user_id_udf` to the DataFrame, using a UDF that receives two parameters and concatenates them.

In [0]:
from pyspark.sql.types import StringType

def concu(col_1, col_2):
    return "{}_{}".format(col_1, col_2)

concat_udf = fn.udf(concu, StringType())
users_with_id_udf = all_users_df.withColumn("user_id_udf", concat_udf(all_users_df["last_name"], all_users_df["first_name"]))
display(users_with_id_udf)

first_name,last_name,birth_date,gender,country,user_id_udf
Alice,Jones,1981-04-15,female,Canada,Jones_Alice
John,Doe,1951-01-21,male,USA,Doe_John
Barbara,May,1951-09-01,female,Australia,May_Barbara
James,Smith,1975-07-12,male,United Kingdom,Smith_James
Gerrard,Dupont,1968-05-09,male,France,Dupont_Gerrard
Amanda,B.,1988-12-16,female,New Zeland,B._Amanda
Donald,Trump,1958-04-15,male,Russia,Trump_Donald
Ilgin,Erole,1961-03-03,female,Turkey,Erole_Ilgin
Quitin,Kock,1981-06-11,male,South Africa,Kock_Quitin


Now, let's add another column called `age` with the computed age (in years) of each user, based on a given reference date, and save the resulting DataFrame into a new variable.

**Hint:** You can first compute the age in months, and then divide by 12. A final operation will probably be needed to get an integer number.
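As a cross-check on the Spark result, the same age can be computed in plain Python for a handful of rows. This is only a sketch and not Spark code: `age_in_years` is a hypothetical helper, and it counts completed years directly instead of dividing a month count by 12.

```python
from datetime import date

def age_in_years(birth_date: date, reference_date: date) -> int:
    """Completed years between birth_date and reference_date (hypothetical helper)."""
    years = reference_date.year - birth_date.year
    # Subtract one year if the birthday has not yet occurred in the reference year.
    if (reference_date.month, reference_date.day) < (birth_date.month, birth_date.day):
        years -= 1
    return years

reference_date = date(2017, 12, 31)
print(age_in_years(date(1981, 4, 15), reference_date))   # Alice  -> 36
print(age_in_years(date(1988, 12, 16), reference_date))  # Amanda -> 29
```

Comparing a few of these values against the integer part of the Spark column is a quick way to catch off-by-one errors around birthdays.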

In [0]:
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
reference_date = date(2017, 12, 31)

users_with_age = users_with_id.withColumn("age",fn.round(fn.months_between(fn.lit(reference_date),col("birth_date"))/lit(12),2))
display(users_with_age)

user_id,first_name,last_name,birth_date,gender,country,age
Jones_Alice,Alice,Jones,1981-04-15,female,Canada,36.71
Doe_John,John,Doe,1951-01-21,male,USA,66.94
May_Barbara,Barbara,May,1951-09-01,female,Australia,66.33
Smith_James,James,Smith,1975-07-12,male,United Kingdom,42.47
Dupont_Gerrard,Gerrard,Dupont,1968-05-09,male,France,49.64
B._Amanda,Amanda,B.,1988-12-16,female,New Zeland,29.04
Trump_Donald,Donald,Trump,1958-04-15,male,Russia,59.71
Erole_Ilgin,Ilgin,Erole,1961-03-03,female,Turkey,56.83
Kock_Quitin,Quitin,Kock,1981-06-11,male,South Africa,36.55


Now, an analytical question: how many users of each gender are more than 40 years old? The solution must be a DataFrame with two columns, `gender` and `count`, and two rows, one for each gender.

Bonus: Try to do your solution in a single chain (without intermediate variables)

**Hint:** You will need to filter and aggregate the data.
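The filter-then-aggregate pattern can be sketched in plain Python before writing the Spark version. The `(gender, age)` pairs below are copied from the `users_with_age` output shown earlier; the use of `Counter` is just one convenient way to express "group by and count", not what Spark does internally.

```python
from collections import Counter

# (gender, age) pairs copied from the users_with_age output.
rows = [
    ("female", 36.71), ("male", 66.94), ("female", 66.33),
    ("male", 42.47), ("male", 49.64), ("female", 29.04),
    ("male", 59.71), ("female", 56.83), ("male", 36.55),
]

# Filter first (age > 40), then aggregate (count per gender).
counts = Counter(gender for gender, age in rows if age > 40)
print(dict(counts))  # {'male': 4, 'female': 2}
```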

In [0]:
from pyspark.sql.functions import count
result = users_with_age.filter(users_with_age.age>lit(40)).groupBy('gender').count()
display(result)

gender,count
male,4
female,2


### 2. Reading files, performing joins, and aggregating

For this section you will use fake data from two datasets: `Users` and `Donations`. The data is provided in two CSV files *with header* and using *comma* as separator.

The `Users` dataset contains information about the users.

The `Donations` dataset contains information about the donations made by those users.

The first task is to read these files into the appropriate DataFrames.

**Note:** You need to set the option "inferSchema" to true in order to have the columns in the correct types.

_The data for this section has been created using [Mockaroo](https://www.mockaroo.com/)_.

In [0]:

path1="dbfs:/FileStore/shared_uploads/akhilesh.rajagopalan@ut-capitole.fr/users-1.csv"
path2="dbfs:/FileStore/shared_uploads/akhilesh.rajagopalan@ut-capitole.fr/donations-1.csv"
users_from_file = spark.read.csv(path1, sep=",", header=True,inferSchema="true")
donations = spark.read.csv(path2, sep=",", header=True,inferSchema="true")

Now investigate the columns, contents, and size of both datasets.

In [0]:
# print the column names and types
# (the loop variable is not named `col`, which would shadow pyspark.sql.functions.col)
for name, dtype in users_from_file.dtypes:
    print(name + " , " + dtype)


user_id , int
first_name , string
last_name , string
gender , string
birth_date , string
country , string


In [0]:
for name, dtype in donations.dtypes:
    print(name + " , " + dtype)

donation_id , int
user_id , int
donation_value , double
date , string
time , string
timestamp , string


In [0]:
# print 5 elements of the datasets in a tabular format
users_from_file.limit(5).show()


+-------+----------+---------+------+----------+-------------+
|user_id|first_name|last_name|gender|birth_date|      country|
+-------+----------+---------+------+----------+-------------+
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|
|      2|     Bruis|  Danilov|  Male|1966-05-28|       Brazil|
|      3|      Omar| Spinozzi|  Male|1977-08-28|        China|
|      4|       Mag|  Kennedy|Female|1963-11-11|        China|
|      5|    Gerome|   Lushey|  Male|1990-06-08|       Brazil|
+-------+----------+---------+------+----------+-------------+



In [0]:
donations.limit(5).show()

+-----------+-------+--------------+----------+--------+-------------------+
|donation_id|user_id|donation_value|      date|    time|          timestamp|
+-----------+-------+--------------+----------+--------+-------------------+
|          1|     63|         25.41|2017-03-06|18:28:20|2017-03-06 18:28:20|
|          2|     53|         93.32|2017-03-15|15:15:09|2017-03-15 15:15:09|
|          3|     74|        136.04|2017-10-29|20:36:23|2017-10-29 20:36:23|
|          4|     94|         37.25|2017-05-16|12:33:58|2017-05-16 12:33:58|
|          5|     83|         89.45|2017-05-09|22:45:44|2017-05-09 22:45:44|
+-----------+-------+--------------+----------+--------+-------------------+



In [0]:
# print the number of lines of each dataset
users_from_file.count()


Out[16]: 100

In [0]:
donations.count()

Out[20]: 1000

**Note:** If all the column types shown in the previous results are "string", you need to make sure you passed "inferSchema" as true when reading the CSV files before continuing.

Before using the data, we may want to add some information about the users. 

Add a column containing the age of each user.

In [0]:
users_from_file = users_from_file.withColumn("age",fn.round(fn.months_between(fn.current_date(),col("birth_date"))/lit(12),2))
users_from_file.show()


+-------+----------+----------+------+----------+-------------+-----+
|user_id|first_name| last_name|gender|birth_date|      country|  age|
+-------+----------+----------+------+----------+-------------+-----+
|      1|   Janifer|    Lagadu|Female|1940-08-09|United States|81.47|
|      2|     Bruis|   Danilov|  Male|1966-05-28|       Brazil|55.67|
|      3|      Omar|  Spinozzi|  Male|1977-08-28|        China|44.42|
|      4|       Mag|   Kennedy|Female|1963-11-11|        China|58.22|
|      5|    Gerome|    Lushey|  Male|1990-06-08|       Brazil|31.64|
|      6|   Lazarus|    Andrus|  Male|1983-04-22|       France|38.77|
|      7|     Adora|  Bartunek|Female|1971-06-04|       France|50.65|
|      8|    Gladys|   Gulland|Female|1959-07-07|       Brazil|62.56|
|      9|  Pearline|   Kitcher|Female|1944-06-01|        China|77.66|
|     10|     Jolie|    Diggin|Female|1944-07-31|      Nigeria| 77.5|
|     11|    Devlen|     Devil|  Male|1955-01-11|United States|67.05|
|     12|     Sibel|

Another useful piece of information is the age **range** of each user. Using the `when` function, create the following 5 age ranges:
- "(0, 25]"
- "(25, 35]"
- "(35, 45]"
- "(45, 55]"
- "(55, ~]"

And add a new column to the users DataFrame, containing this information.

**Note:** When building logical operations with Spark DataFrames, wrap each comparison in parentheses, because in Python `&` and `|` bind more tightly than the comparison operators. Example:
```python
df.select("name", "age").where((df["age"] > 20) & (df["age"] <= 30))
```

**Note 2:** If you are having problems with the `when` function, you can write a User Defined Function and implement the logic in standard Python.
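If you go the standard-Python route, the bucketing logic could look like the sketch below. The names `UPPER_BOUNDS`, `LABELS` and `age_range` are hypothetical, and using `bisect` is a design choice of this example rather than something the exercise requires; a chain of `if`/`elif` works just as well.

```python
from bisect import bisect_left

# Upper bounds of the right-closed intervals, and one label per interval.
UPPER_BOUNDS = [25, 35, 45, 55]
LABELS = ["(0, 25]", "(25, 35]", "(35, 45]", "(45, 55]", "(55, ~]"]

def age_range(age):
    """Return the age-range label, or None for missing or non-positive ages."""
    if age is None or age <= 0:
        return None
    # bisect_left returns the index of the first bound >= age,
    # so an age of exactly 25 falls into "(0, 25]".
    return LABELS[bisect_left(UPPER_BOUNDS, age)]

print(age_range(25), age_range(25.01), age_range(81.47))
```

Handling `None` explicitly matters once the function is wrapped in a UDF, because a null age would otherwise raise an error inside the comparison.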

In [0]:
# Chain the branches with Column.when(); the first matching condition wins.
age = fn.col("age")
users_from_file = users_from_file.withColumn(
    "age_range",
    fn.when((age > 0) & (age <= 25), "0-25")
      .when((age > 25) & (age <= 35), "25-35")
      .when((age > 35) & (age <= 45), "35-45")
      .when((age > 45) & (age <= 55), "45-55")
      .when(age > 55, "55-~"))

users_from_file.show()

In [0]:
from pyspark.sql.functions import udf
age_range = udf(lambda age: 
                       '0-25' if (age > 0 and age <= 25) else
                       '25-35' if (age > 25 and age <= 35) else
                       '35-45' if (age > 35 and age <= 45) else
                       '45-55' if (age > 45 and age <= 55) else
                       '55-~' if (age > 55) else '')

users_from_file = users_from_file.withColumn('age_range', age_range(users_from_file.age))
users_from_file.show()

+-------+----------+----------+------+----------+-------------+-----+---------+
|user_id|first_name| last_name|gender|birth_date|      country|  age|age_range|
+-------+----------+----------+------+----------+-------------+-----+---------+
|      1|   Janifer|    Lagadu|Female|1940-08-09|United States|81.47|     55-~|
|      2|     Bruis|   Danilov|  Male|1966-05-28|       Brazil|55.67|     55-~|
|      3|      Omar|  Spinozzi|  Male|1977-08-28|        China|44.42|    35-45|
|      4|       Mag|   Kennedy|Female|1963-11-11|        China|58.22|     55-~|
|      5|    Gerome|    Lushey|  Male|1990-06-08|       Brazil|31.64|    25-35|
|      6|   Lazarus|    Andrus|  Male|1983-04-22|       France|38.77|    35-45|
|      7|     Adora|  Bartunek|Female|1971-06-04|       France|50.65|    45-55|
|      8|    Gladys|   Gulland|Female|1959-07-07|       Brazil|62.56|     55-~|
|      9|  Pearline|   Kitcher|Female|1944-06-01|        China|77.66|     55-~|
|     10|     Jolie|    Diggin|Female|19

Now that we have improved our users' DataFrame, the first analysis we want to make is the average donation performed by each gender. However, the gender information and the donation value are in different tables, so first we need to join them, using the `user_id` as joining key.

**Note:** Make sure you are not using the `users` DataFrame from the first part of the lab.

**Note 2:** For better performance, you can [broadcast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast) the smaller dataset. But you should only do this when you are sure the DataFrame fits in the memory of the nodes.
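Conceptually, a broadcast join copies the small table to every executor, so that each one can join its share of the large table through a local lookup instead of shuffling both tables by key. The plain-Python sketch below mimics that idea on made-up rows; it illustrates the principle only and is not how Spark is implemented.

```python
# Hypothetical miniature versions of the two datasets.
users = [
    {"user_id": 1, "gender": "Female"},
    {"user_id": 2, "gender": "Male"},
]
donations = [
    {"donation_id": 1, "user_id": 2, "donation_value": 25.41},
    {"donation_id": 2, "user_id": 1, "donation_value": 93.32},
    {"donation_id": 3, "user_id": 2, "donation_value": 136.04},
]

# "Broadcast" side: the small table becomes an in-memory lookup keyed by user_id.
users_by_id = {user["user_id"]: user for user in users}

# Inner join: each donation row is matched through a local dictionary lookup.
joined = [
    {**users_by_id[d["user_id"]], **d}
    for d in donations
    if d["user_id"] in users_by_id
]
print(joined[0])
```

The lookup table has to fit in memory, which is the same constraint the note above places on the broadcast DataFrame.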

In [0]:
joined_df = users_from_file.join(donations,'user_id')
display(joined_df)

user_id,first_name,last_name,gender,birth_date,country,age,age_range,donation_id,donation_value,date,time,timestamp
63,Tabor,Bramer,Male,1983-05-29,Brazil,38.67,35-45,1,25.41,2017-03-06,18:28:20,2017-03-06 18:28:20
53,Chris,Climie,Male,1970-06-02,France,51.66,45-55,2,93.32,2017-03-15,15:15:09,2017-03-15 15:15:09
74,Livia,Lodemann,Female,1967-03-11,United States,54.88,45-55,3,136.04,2017-10-29,20:36:23,2017-10-29 20:36:23
94,Kendre,Pankhurst.,Female,1941-05-28,China,80.67,55-~,4,37.25,2017-05-16,12:33:58,2017-05-16 12:33:58
83,Quincy,Marquess,Male,1997-08-13,Brazil,24.46,0-25,5,89.45,2017-05-09,22:45:44,2017-05-09 22:45:44
80,Boothe,Endley,Male,1960-03-03,China,61.91,55-~,6,163.67,2017-07-25,21:50:36,2017-07-25 21:50:36
19,Sharl,Airy,Female,1941-09-08,Brazil,80.39,55-~,7,10.69,2017-11-15,8:43:54,2017-11-15 8:43:54
53,Chris,Climie,Male,1970-06-02,France,51.66,45-55,8,135.09,2017-04-14,7:43:37,2017-04-14 7:43:37
46,Ethelyn,Raubenheim,Female,1953-07-19,Brazil,68.53,55-~,9,94.38,2017-03-07,10:30:34,2017-03-07 10:30:34
90,Farlee,O'Connell,Male,1985-05-15,China,36.71,35-45,10,68.63,2017-02-02,14:23:43,2017-02-02 14:23:43


Now, use aggregation to find the min, max and avg donation by gender.

In [0]:
donations_by_gender = joined_df.groupBy('gender').min('donation_value')
donations_by_gender.show()

+------+-------------------+
|gender|min(donation_value)|
+------+-------------------+
|Female|               1.25|
|  Male|               1.12|
+------+-------------------+



In [0]:
donations_by_gender = joined_df.groupBy('gender').max('donation_value')
donations_by_gender.show()

+------+-------------------+
|gender|max(donation_value)|
+------+-------------------+
|Female|             199.87|
|  Male|             199.93|
+------+-------------------+



In [0]:
donations_by_gender = joined_df.groupBy('gender').mean('donation_value')
donations_by_gender.show()

+------+-------------------+
|gender|avg(donation_value)|
+------+-------------------+
|Female|  97.62667400881057|
|  Male| 100.23606227106244|
+------+-------------------+



Now, make the necessary transformations and aggregations to answer the following questions about the data. Note that some questions are only about the users, so make sure to use the smallest possible dataset when looking for your answers!
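Choosing the right dataset affects correctness as well as speed: after a join there is one row per donation, so any statistic about users is weighted by how often each user donated. A small plain-Python sketch with made-up numbers shows the difference:

```python
from statistics import mean

# Hypothetical data: two users, the younger one donated three times.
user_ages = {1: 30, 2: 60}
donation_user_ids = [1, 1, 1, 2]

# Aggregating the users dataset: each user counts once.
avg_age_users = mean(user_ages.values())

# Aggregating the joined dataset: each user counts once per donation.
avg_age_joined = mean(user_ages[uid] for uid in donation_user_ids)

print(avg_age_users, avg_age_joined)  # 45 37.5
```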

**Question 1:** 

a) What's the average, min and max age of the users? 

b) What's the average, min and max age of the users, by gender?

In [0]:

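# joined_df has one row per donation, so each user is weighted by their number of donations.
# For per-user statistics, run the same aggregations on users_from_file instead.
result_1a = joined_df.agg(fn.min('age'),fn.max('age'),fn.mean('age'))
result_1a.show()

result_1b = joined_df.groupBy('gender').agg(fn.min('age'),fn.max('age'),fn.mean('age'))
result_1b.show()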

+--------+--------+-----------------+
|min(age)|max(age)|         avg(age)|
+--------+--------+-----------------+
|   23.37|   81.67|51.69778999999999|
+--------+--------+-----------------+

+------+--------+--------+------------------+
|gender|min(age)|max(age)|          avg(age)|
+------+--------+--------+------------------+
|Female|   23.41|   81.67|55.301674008810565|
|  Male|   23.37|   79.82| 48.70115384615388|
+------+--------+--------+------------------+



**Question 2:**

a) How many distinct country origins exist in the data? Print a DataFrame listing them.

b) What are the top 5 countries with the most users? Print a DataFrame containing the name of the countries and the counts.

In [0]:
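from pyspark.sql.functions import desc
result_2a = joined_df.select('country').distinct()
result_2a.show()

# joined_df has one row per donation, so this counts donations per country.
# To count users, group users_from_file instead.
result_2b = joined_df.groupBy('country').count().sort(desc("count")).limit(5)
result_2b.show()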

+-------------+
|      country|
+-------------+
|      Germany|
|       France|
|United States|
|        China|
|      Nigeria|
|       Brazil|
|        Japan|
|  New Zealand|
+-------------+

+-------------+-----+
|      country|count|
+-------------+-----+
|        China|  563|
|       Brazil|  180|
|       France|  120|
|United States|   50|
|        Japan|   37|
+-------------+-----+



**Question 3:**

What are the number of donations and the average, min and max donation values by age range?

In [0]:
result_3 = joined_df.groupBy('age_range').agg(fn.min('donation_value'),fn.max('donation_value'),fn.mean('donation_value'))
result_3.show()

+---------+-------------------+-------------------+-------------------+
|age_range|min(donation_value)|max(donation_value)|avg(donation_value)|
+---------+-------------------+-------------------+-------------------+
|    35-45|               1.25|             199.87|   97.8137442922374|
|    45-55|               2.03|             199.93| 101.11534759358287|
|     55-~|               1.19|             199.91|   99.8850582750583|
|     0-25|               4.33|             198.15|  105.3863888888889|
|    25-35|               1.12|             199.28|  93.62031007751939|
+---------+-------------------+-------------------+-------------------+



**Question 4:**

a) What's the number of donations, average, min and max donation values by user location (country)?

b) What is the number of donations, average, min and max donation values by gender for each user location (country)? (the resulting DataFrame must contain 5 columns: the gender of the user, their country and the 3 metrics)

In [0]:
result_4a = joined_df.groupBy('country').agg(fn.min('donation_value'),fn.max('donation_value'),fn.mean('donation_value'))
result_4a.show()

result_4b = joined_df.groupBy('country','gender').agg(fn.min('donation_value'),fn.max('donation_value'),fn.mean('donation_value'))
result_4b.show()

+-------------+-------------------+-------------------+-------------------+
|      country|min(donation_value)|max(donation_value)|avg(donation_value)|
+-------------+-------------------+-------------------+-------------------+
|      Germany|              17.81|             180.48|  95.64157894736842|
|       France|               5.79|             199.93|  95.63199999999999|
|United States|               1.36|             198.15|           108.3796|
|        China|               1.25|             199.32|  97.62353463587931|
|      Nigeria|              14.47|             192.63|  81.22600000000001|
|       Brazil|               1.12|             199.91| 105.70655555555568|
|        Japan|               1.19|             199.76|  95.63270270270273|
|  New Zealand|              12.98|             172.51|             107.93|
+-------------+-------------------+-------------------+-------------------+

+-------------+------+-------------------+-------------------+-------------------+
|   

**Question 5:**

Which month of the year has the largest aggregated donation value?

**Hint:** You can use a function to extract the month from a date, then aggregate to find the total donation value.
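The two steps in the hint (extract the month, then sum per month) can be sketched in plain Python. The rows below are made up and only reuse the date format of the donations file.

```python
from collections import defaultdict
from datetime import date

# Hypothetical (date, donation_value) pairs in the same format as the donations file.
donations = [
    ("2017-03-06", 25.0),
    ("2017-03-15", 75.0),
    ("2017-10-29", 60.0),
]

totals_by_month = defaultdict(float)
for day, value in donations:
    # Extract the month from the ISO date, then accumulate the value.
    totals_by_month[date.fromisoformat(day).month] += value

best_month = max(totals_by_month, key=totals_by_month.get)
print(best_month, totals_by_month[best_month])  # 3 100.0
```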

In [0]:
from pyspark.sql.functions import month

result_5 = joined_df.withColumn('month',month(joined_df.date)).groupBy('month').agg(fn.sum('donation_value')).sort(desc('sum(donation_value)'))
result_5.show()


+-----+-------------------+
|month|sum(donation_value)|
+-----+-------------------+
|    5| 10696.400000000001|
|   12| 10430.539999999997|
|    9|            8971.71|
|   11|            8530.99|
|    6|            8354.05|
|    8|  8344.849999999999|
|    4|  8308.579999999998|
|   10|            8287.28|
|    7|  8151.220000000003|
|    3| 7646.9299999999985|
|    1|  6752.159999999997|
|    2|            4576.69|
+-----+-------------------+



### 3. Window Functions

_This section uses the same data as the last one._

Window functions are very useful for gathering aggregated data without actually aggregating the DataFrame. They can also be used to find "previous" and "next" information for entities, such as a user. [This article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) has a very nice explanation of the concept.

We want to find the users who donated less than a threshold and remove them from the donations dataset. Now, there are two ways of doing that:

1) Performing a traditional aggregation to find the users who donated less than the threshold, then filtering these users out of the donations dataset, either with `where(~df["user_id"].isin(a_local_list))` or with a join of type "anti-join".

2) Using window functions to add a new column with the aggregated donations per user, and then using a normal filter such as `where(aggregated_donation >= threshold)`.
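To see that the two strategies select the same rows, here is a plain-Python sketch on made-up data. The variable names and the threshold are hypothetical; the point is only the shape of each approach.

```python
from collections import defaultdict

# Hypothetical (user_id, donation_value) rows and threshold.
donations = [(1, 300.0), (1, 400.0), (2, 100.0), (3, 250.0), (3, 250.0)]
threshold = 500

# Total donated per user, needed by both strategies.
totals = defaultdict(float)
for user_id, value in donations:
    totals[user_id] += value

# Strategy 1: build the set of low donors, then anti-filter the donations.
low_donors = {user_id for user_id, total in totals.items() if total < threshold}
kept_by_anti_join = [row for row in donations if row[0] not in low_donors]

# Strategy 2: attach the per-user total to every row (the role a window
# function plays), then filter on that extra column.
with_total = [(user_id, value, totals[user_id]) for user_id, value in donations]
kept_by_window = [(u, v) for u, v, total in with_total if total >= threshold]

print(kept_by_anti_join == kept_by_window)  # True
```

Note that the second filter uses `>=` so that it is the exact complement of the `<` used to define low donors; user 3, who donated exactly 500, is kept by both.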

Let's implement both and compare the complexity:

First, perform the traditional aggregation and find the users who donated less than 500 in total:

In [0]:
bad_users_df = joined_df.groupBy('user_id').agg(fn.sum(joined_df.donation_value).alias('total_donation'))
bad_users_df.show()

+-------+------------------+
|user_id|    total_donation|
+-------+------------------+
|     31|            746.76|
|     85|437.78999999999996|
|     65| 804.3499999999999|
|     53| 755.6099999999999|
|     78|1088.8500000000001|
|     34|           1853.22|
|     81|           1086.24|
|     28|            908.56|
|     76|1405.9299999999998|
|     27|           1039.54|
|     26|           1030.63|
|     44|            820.06|
|     12|           1374.73|
|     91|1817.1899999999998|
|     22|1178.9399999999998|
|     93| 712.8100000000001|
|     47|            351.84|
|      1|            889.16|
|     52|           1099.23|
|     13|           1077.32|
+-------+------------------+
only showing top 20 rows



In [0]:
bad_users = bad_users_df[bad_users_df.total_donation < 500]
bad_users.show()

+-------+------------------+
|user_id|    total_donation|
+-------+------------------+
|     85|437.78999999999996|
|     47|            351.84|
|     96|            354.97|
|     84|329.46999999999997|
|     87|497.60999999999996|
|     82|            291.73|
|     73|455.20000000000005|
|     25|413.09000000000003|
|     42|            215.35|
|     99|            370.99|
+-------+------------------+



You should have found around 10 users. Now, perform an "anti-join" to remove those users from `joined_df`.

**Hint:** The `join` operation accepts a third argument which is the join type. The accepted values are: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'.

In [0]:
good_donations = joined_df.join(bad_users, 'user_id', 'leftanti')
good_donations.count()

Out[36]: 955

Verify that the count of `good_donations` makes sense by performing another join to find the complementary `bad_donations`.

In [0]:
bad_donations = joined_df.join(good_donations, 'user_id', 'leftanti')
bad_donations.count()

Out[37]: 45

If you have done everything right, at this point `good_donations.count()` + `bad_donations.count()` = `joined_df.count()`.

However, the join approach can be very heavy, and it requires multiple operations. For this kind of problem, window functions are a better fit.

In [0]:
good_donations.count() + bad_donations.count() == joined_df.count()

Out[38]: True

The first step is to create your window specification over `user_id` by using `partitionBy`.

In [0]:
from pyspark.sql import Window

window_spec = Window.partitionBy('user_id')

Then, you can use one of the window functions of the `pyspark.sql.functions` package, applying it to the created `window_spec` with the `over` method.

**Hint:** If you are blocked, try looking at the [documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.over) for the `over` method, or searching on StackOverflow.

In [0]:
new_column = fn.sum('donation_value').over(window_spec)

donations_with_total = joined_df.withColumn('total_donated_by_user', new_column)
donations_with_total.show()

+-------+----------+---------+------+----------+-------------+-----+---------+-----------+--------------+----------+--------+-------------------+---------------------+
|user_id|first_name|last_name|gender|birth_date|      country|  age|age_range|donation_id|donation_value|      date|    time|          timestamp|total_donated_by_user|
+-------+----------+---------+------+----------+-------------+-----+---------+-----------+--------------+----------+--------+-------------------+---------------------+
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        145|         95.88|2017-05-13| 5:27:25| 2017-05-13 5:27:25|               889.16|
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        264|        150.51|2017-09-21| 4:36:37| 2017-09-21 4:36:37|               889.16|
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        317|         51.55|2017-10-26| 1:55:37| 2017-10-26 1:55:37|               

And now you can just filter on the `total_donated_by_user` column:

In [0]:
# ">= 500" is the exact complement of the "< 500" filter used to build bad_users.
good_donations_wf = donations_with_total[donations_with_total.total_donated_by_user >= 500]
good_donations_wf.count()

Out[42]: 955

If you have done everything right, you should obtain `good_donations_wf.count()` = `good_donations.count()`.

In [0]:
good_donations_wf.count() == good_donations.count()

Out[43]: True

Window functions can also be used to find the "next" and "previous" rows by using `functions.lead` and `functions.lag`. In our example, we can use them to find the date interval between two donations by a specific user.

For this kind of Window function, the window specification must be ordered by the date. So, create a new window specification partitioned by the `user_id` and ordered by the donation timestamp.

In [0]:
ordered_window = Window.partitionBy('user_id').orderBy('date')

Now use `functions.lag().over()` to add a column with the timestamp of the previous donation of the same user. Then, inspect the result to see what it looks like.
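What `lag` produces over a partitioned and ordered window can be sketched in plain Python: sort the rows per user by date, then pair each row with the one before it. The rows below are hypothetical (user 1 reuses dates from the sample data), and `groupby` on sorted rows only stands in for `partitionBy` plus `orderBy`.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (user_id, date) rows; ISO dates sort correctly as strings.
donations = [(1, "2017-02-01"), (2, "2017-03-15"), (1, "2016-12-03"), (1, "2017-05-13")]

with_previous = []
# Sorting by (user_id, date) plays the role of partitionBy + orderBy.
for user_id, rows in groupby(sorted(donations), key=itemgetter(0)):
    previous = None  # the first donation of each user has no predecessor
    for _, day in rows:
        with_previous.append((user_id, day, previous))
        previous = day

for row in with_previous:
    print(row)
```

As in Spark, the first row of each partition gets an empty "previous" value, which later computations have to skip.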

In [0]:
new_column = fn.lag('date').over(ordered_window)

donations_with_lag = good_donations.withColumn('Previous_date', new_column)
donations_with_lag.orderBy("user_id", "timestamp").show()

+-------+----------+---------+------+----------+-------------+-----+---------+-----------+--------------+----------+--------+-------------------+-------------+
|user_id|first_name|last_name|gender|birth_date|      country|  age|age_range|donation_id|donation_value|      date|    time|          timestamp|Previous_date|
+-------+----------+---------+------+----------+-------------+-----+---------+-----------+--------------+----------+--------+-------------------+-------------+
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        560|        121.82|2016-12-03| 3:34:03| 2016-12-03 3:34:03|         null|
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        486|        188.37|2017-02-01|20:20:44|2017-02-01 20:20:44|   2016-12-03|
|      1|   Janifer|   Lagadu|Female|1940-08-09|United States|81.47|     55-~|        145|         95.88|2017-05-13| 5:27:25| 2017-05-13 5:27:25|   2017-02-01|
|      1|   Janifer|   Lagadu|Female|194

Finally, compute the average time it took for each user between two of their consecutive donations (in days), and print the 5 users with the smallest averages. The result must include at least the users' id, last name and birth date, as well as the computed average.
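Before writing the Spark version, it can help to pin down the expected logic in plain Python. The sketch below uses made-up dates; `average_gap_days` is a hypothetical helper, and users with a single donation are left out of the ranking because they have no interval to average.

```python
from datetime import date
from statistics import mean

# Hypothetical donation dates per user_id.
donation_dates = {
    1: ["2017-01-01", "2017-01-11", "2017-01-31"],
    2: ["2017-03-01", "2017-03-05"],
    3: ["2017-06-01"],  # a single donation has no interval
}

def average_gap_days(days):
    """Mean number of days between consecutive dates, or None if fewer than two."""
    ordered = sorted(date.fromisoformat(d) for d in days)
    gaps = [(later - earlier).days for earlier, later in zip(ordered, ordered[1:])]
    return mean(gaps) if gaps else None

averages = {uid: average_gap_days(days) for uid, days in donation_dates.items()}
# Rank users with at least two donations by their average gap, smallest first.
ranked = sorted((avg, uid) for uid, avg in averages.items() if avg is not None)
print(ranked)  # [(4, 2), (15, 1)]
```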

In [0]:
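# One possible solution: average gap in days per user; first donations (no previous date) are skipped.
gaps = donations_with_lag.where(fn.col('Previous_date').isNotNull()).withColumn('gap_days', fn.datediff('date', 'Previous_date'))
users_average_between_donations = gaps.groupBy('user_id', 'last_name', 'birth_date').agg(fn.avg('gap_days').alias('avg_gap_days')).orderBy('avg_gap_days')
users_average_between_donations.show(5)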

Congratulations, you have finished this notebook!