# Spark SQL and DataFrames

In [0]:
from pyspark.sql.functions import *

### 1. Basic manipulations
First, let's create some users:

In [0]:
from datetime import date

# Let's first create some users:
col_names = ["first_name", "last_name", "birth_date", "gender", "country"]
users = [
  ("Alice", "Jones", date(1981, 4, 15), "female", "Canada"),
  ("John", "Doe", date(1951, 1, 21), "male", "USA"),
  ("Barbara", "May", date(1951, 9, 1), "female", "Australia"),
  ("James", "Smith", date(1975, 7, 12), "male", "United Kingdom"),
  ("Gerrard", "Dupont", date(1968, 5, 9), "male", "France"),
  ("Amanda", "B.", date(1988, 12, 16), "female", "New Zeland")
]

users_df = spark.createDataFrame(users, col_names)
display(users_df) # Only works in Databricks. Elswehere, use "df.show()" or "df.toPandas()"

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland


Now it's your turn, create more users (at least 3, with different names) and add to the initial users, saving the result in a new variable.

In [0]:
new_users = [
  ("Jon", "Jones", date(1981, 4, 15), "male", "USA"),
  ("Francis", "Ngannou", date(1951, 1, 21), "male", "Cameroun"),
  ("Cyril", "Gane", date(1951, 9, 1), "male", "France")
]

new_users_df = spark.createDataFrame(new_users, col_names)
all_users_df = users_df.union(new_users_df)
display(all_users_df) # or all_users_df.show()

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland
Jon,Jones,1981-04-15,male,USA
Francis,Ngannou,1951-01-21,male,Cameroun
Cyril,Gane,1951-09-01,male,France


Now, select only two columns and show the resulting DataFrame, without saving it into a variable.

In [0]:
all_users_df.select("first_name", "last_name").show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Alice|    Jones|
|      John|      Doe|
|   Barbara|      May|
|     James|    Smith|
|   Gerrard|   Dupont|
|    Amanda|       B.|
|       Jon|    Jones|
|   Francis|  Ngannou|
|     Cyril|     Gane|
+----------+---------+



Now, register your DataFrame as a table and select the same two columns with a SQL query string

In [0]:
query_string = " SELECT first_name, last_name from a "
all_users_df.registerTempTable('a') # creates a local temporary table accessible by a SQL query
spark.sql(query_string).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Alice|    Jones|
|      John|      Doe|
|   Barbara|      May|
|     James|    Smith|
|   Gerrard|   Dupont|
|    Amanda|       B.|
|       Jon|    Jones|
|   Francis|  Ngannou|
|     Cyril|     Gane|
+----------+---------+



Now we want to add an unique identifier for each user in the table. There are many strategies for that, and for our example we will use the string `{last_name}_{first_name}`

You can use `all_users_df` since your latest operation did not override its values. Add a new column called `user_id` to your DataFrame and save to a new variable.

**Hint:** The first place to look is in the [functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) package

In [0]:
from pyspark.sql import functions as fn

users_with_id = all_users_df.withColumn('user_id', fn.concat(fn.col('last_name'), fn.lit('_'), fn.col('first_name')))
display(users_with_id)

first_name,last_name,birth_date,gender,country,user_id
Alice,Jones,1981-04-15,female,Canada,Jones_Alice
John,Doe,1951-01-21,male,USA,Doe_John
Barbara,May,1951-09-01,female,Australia,May_Barbara
James,Smith,1975-07-12,male,United Kingdom,Smith_James
Gerrard,Dupont,1968-05-09,male,France,Dupont_Gerrard
Amanda,B.,1988-12-16,female,New Zeland,B._Amanda
Jon,Jones,1981-04-15,male,USA,Jones_Jon
Francis,Ngannou,1951-01-21,male,Cameroun,Ngannou_Francis
Cyril,Gane,1951-09-01,male,France,Gane_Cyril


You can also do the same thing with an User Defined Function by passing a lambda, although it is not recommended when there is already a function in the `functions` package.

Add a new column called `user_id_udf` to the DataFrame, using an UDF that receives two parameters and concatenate them.

In [0]:
concat_udf = fn.udf(lambda x, y: x + '_' + y)
users_with_id_udf = all_users_df.withColumn('user_id', concat_udf(fn.col('last_name'), fn.col('first_name')))
display(users_with_id_udf)

first_name,last_name,birth_date,gender,country,user_id
Alice,Jones,1981-04-15,female,Canada,Jones_Alice
John,Doe,1951-01-21,male,USA,Doe_John
Barbara,May,1951-09-01,female,Australia,May_Barbara
James,Smith,1975-07-12,male,United Kingdom,Smith_James
Gerrard,Dupont,1968-05-09,male,France,Dupont_Gerrard
Amanda,B.,1988-12-16,female,New Zeland,B._Amanda
Jon,Jones,1981-04-15,male,USA,Jones_Jon
Francis,Ngannou,1951-01-21,male,Cameroun,Ngannou_Francis
Cyril,Gane,1951-09-01,male,France,Gane_Cyril


Now, let's add another column called `age` with the computed age (in years) of each user, based on a given reference date, and save the resulting DataFrame into a new variable.

**Hint:** You can first compute the age in months, and then divide by 12. A final operation will probably be needed to get an integer number.

In [0]:
reference_date = date(2017, 12, 31)
all_users_df = all_users_df.withColumn('reference_date', fn.lit(reference_date))
users_with_age = all_users_df.withColumn("age", round(datediff(fn.col('reference_date'), fn.col("birth_date"))/365.25, 0))
display(users_with_age)

first_name,last_name,birth_date,gender,country,reference_date,age
Alice,Jones,1981-04-15,female,Canada,2017-12-31,37.0
John,Doe,1951-01-21,male,USA,2017-12-31,67.0
Barbara,May,1951-09-01,female,Australia,2017-12-31,66.0
James,Smith,1975-07-12,male,United Kingdom,2017-12-31,42.0
Gerrard,Dupont,1968-05-09,male,France,2017-12-31,50.0
Amanda,B.,1988-12-16,female,New Zeland,2017-12-31,29.0
Jon,Jones,1981-04-15,male,USA,2017-12-31,37.0
Francis,Ngannou,1951-01-21,male,Cameroun,2017-12-31,67.0
Cyril,Gane,1951-09-01,male,France,2017-12-31,66.0


Now, an analytical question: How many users of each gender who are more than 40 years old exist in this data? The solution must be a DataFrame with two columns: `age` and `count` and two lines, one for each gender.

Bonus: Try to do your solution in a single chain (without intermediate variables)

**Hint:** You will need to filter and aggregate the data.

In [0]:
sql_string = "select gender, count(first_name) as count from b where age >= 40 group by gender"
users_with_age.registerTempTable('b') # creates a local temporary table accessible by a SQL query
#spark.sql(sql_string).show()

result = spark.sql(sql_string)
display(result)

gender,count
male,5
female,1


### 2. Reading files, performing joins, and aggregating

For this section you will use some fake data of two datasets: `Users` and `Donations`. The data is provided in two CSV files *with header* and using *comma* as separator.

The `Users` dataset contains information about about the users. 

The `Donations` dataset contains information about Donations performed by those users.

The first task is to read these files into the appropriate DataFrames.

**Note:** You need to set the option "inferSchema" to true in order to have the columns in the correct types.

_The data for this section has been created using [Mockaroo](https://www.mockaroo.com/)_.

In [0]:
users_from_file = spark.read.format("csv").option("sep", ",").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/shared_uploads/emir.dakin@ut-capitole.fr/users.csv")

donations = spark.read.format("csv").option("sep", ",").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/shared_uploads/emir.dakin@ut-capitole.fr/donations.csv")

In [0]:
display(users_from_file)
display(donations)

user_id,first_name,last_name,gender,birth_date,country
1,Janifer,Lagadu,Female,1940-08-09,United States
2,Bruis,Danilov,Male,1966-05-28,Brazil
3,Omar,Spinozzi,Male,1977-08-28,China
4,Mag,Kennedy,Female,1963-11-11,China
5,Gerome,Lushey,Male,1990-06-08,Brazil
6,Lazarus,Andrus,Male,1983-04-22,France
7,Adora,Bartunek,Female,1971-06-04,France
8,Gladys,Gulland,Female,1959-07-07,Brazil
9,Pearline,Kitcher,Female,1944-06-01,China
10,Jolie,Diggin,Female,1944-07-31,Nigeria


donation_id,user_id,donation_value,date,time,timestamp
1,63,25.41,2017-03-06,18:28:20,2017-03-06 18:28:20
2,53,93.32,2017-03-15,15:15:09,2017-03-15 15:15:09
3,74,136.04,2017-10-29,20:36:23,2017-10-29 20:36:23
4,94,37.25,2017-05-16,12:33:58,2017-05-16 12:33:58
5,83,89.45,2017-05-09,22:45:44,2017-05-09 22:45:44
6,80,163.67,2017-07-25,21:50:36,2017-07-25 21:50:36
7,19,10.69,2017-11-15,8:43:54,2017-11-15 8:43:54
8,53,135.09,2017-04-14,7:43:37,2017-04-14 7:43:37
9,46,94.38,2017-03-07,10:30:34,2017-03-07 10:30:34
10,90,68.63,2017-02-02,14:23:43,2017-02-02 14:23:43


Now investigate the columns, contents and size of both datasets

In [0]:
# print the column names and types
print('users_from_file :', users_from_file.dtypes)
print('donations :', donations.dtypes)

# print 5 elements of the datasets in a tabular format
display(users_from_file.head(5))
display(donations.head(5))

# print the number of lines of each dataset
print('users_from_file number of lines :', users_from_file.count())
print('donations number of lines :', donations.count())

users_from_file : [('user_id', 'int'), ('first_name', 'string'), ('last_name', 'string'), ('gender', 'string'), ('birth_date', 'string'), ('country', 'string')]
donations : [('donation_id', 'int'), ('user_id', 'int'), ('donation_value', 'double'), ('date', 'string'), ('time', 'string'), ('timestamp', 'string')]
users_from_file number of lines : 100
donations number of lines : 1000


user_id,first_name,last_name,gender,birth_date,country
1,Janifer,Lagadu,Female,1940-08-09,United States
2,Bruis,Danilov,Male,1966-05-28,Brazil
3,Omar,Spinozzi,Male,1977-08-28,China
4,Mag,Kennedy,Female,1963-11-11,China
5,Gerome,Lushey,Male,1990-06-08,Brazil


donation_id,user_id,donation_value,date,time,timestamp
1,63,25.41,2017-03-06,18:28:20,2017-03-06 18:28:20
2,53,93.32,2017-03-15,15:15:09,2017-03-15 15:15:09
3,74,136.04,2017-10-29,20:36:23,2017-10-29 20:36:23
4,94,37.25,2017-05-16,12:33:58,2017-05-16 12:33:58
5,83,89.45,2017-05-09,22:45:44,2017-05-09 22:45:44


**Note:** If all the column types shown in the previous results are "string", you need to make sure you passed "inferSchema" as true when reading the CSV files before continuing.

Before using the data, we may want to add some information about the users. 

Add a column containing the age of each user.

In [0]:
users_from_file = users_from_file.withColumn("age", round(datediff(current_date(), col('birth_date'))/365.25, 0))
users_from_file.show()

+-------+----------+----------+------+----------+-------------+----+
|user_id|first_name| last_name|gender|birth_date|      country| age|
+-------+----------+----------+------+----------+-------------+----+
|      1|   Janifer|    Lagadu|Female|1940-08-09|United States|81.0|
|      2|     Bruis|   Danilov|  Male|1966-05-28|       Brazil|56.0|
|      3|      Omar|  Spinozzi|  Male|1977-08-28|        China|44.0|
|      4|       Mag|   Kennedy|Female|1963-11-11|        China|58.0|
|      5|    Gerome|    Lushey|  Male|1990-06-08|       Brazil|32.0|
|      6|   Lazarus|    Andrus|  Male|1983-04-22|       France|39.0|
|      7|     Adora|  Bartunek|Female|1971-06-04|       France|51.0|
|      8|    Gladys|   Gulland|Female|1959-07-07|       Brazil|63.0|
|      9|  Pearline|   Kitcher|Female|1944-06-01|        China|78.0|
|     10|     Jolie|    Diggin|Female|1944-07-31|      Nigeria|78.0|
|     11|    Devlen|     Devil|  Male|1955-01-11|United States|67.0|
|     12|     Sibel|  Lewisham|Fem

Another useful information to have is the age **range** of each user. Using the `when` function, create the following 5 age ranges:
- "(0, 25]"
- "(25, 35]"
- "(35, 45]"
- "(45, 55]"
- "(55, ~]"

And add a new column to the users DataFrame, containing this information.

**Note:** When building logical operations with Spark DataFrames, it's better to be add parantheses. Example:
```python
df.select("name", "age").where( (df("age") > 20) & (df("age") <= 30) )
```

**Note 2:** If you are having problems with the `when` function, you can make an User Defined Function and do your logic in standard python.

In [0]:
ranges = udf(lambda age: '(0, 25]' if age <= 25 else 
                       '(25, 35]' if (age > 25 and age <= 35) else
                       '(35, 45]' if (age > 35 and age <= 45) else
                       '(45, 55]' if (age > 45 and age <= 55) else
                       '(55, ~]' if (age > 55) else '')

users_from_file = users_from_file.withColumn('age_range', ranges(users_from_file.age))

users_from_file.show()

+-------+----------+----------+------+----------+-------------+----+---------+
|user_id|first_name| last_name|gender|birth_date|      country| age|age_range|
+-------+----------+----------+------+----------+-------------+----+---------+
|      1|   Janifer|    Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|
|      2|     Bruis|   Danilov|  Male|1966-05-28|       Brazil|56.0|  (55, ~]|
|      3|      Omar|  Spinozzi|  Male|1977-08-28|        China|44.0| (35, 45]|
|      4|       Mag|   Kennedy|Female|1963-11-11|        China|58.0|  (55, ~]|
|      5|    Gerome|    Lushey|  Male|1990-06-08|       Brazil|32.0| (25, 35]|
|      6|   Lazarus|    Andrus|  Male|1983-04-22|       France|39.0| (35, 45]|
|      7|     Adora|  Bartunek|Female|1971-06-04|       France|51.0| (45, 55]|
|      8|    Gladys|   Gulland|Female|1959-07-07|       Brazil|63.0|  (55, ~]|
|      9|  Pearline|   Kitcher|Female|1944-06-01|        China|78.0|  (55, ~]|
|     10|     Jolie|    Diggin|Female|1944-07-31|   

Now that we have improved our users' DataFrame, the first analysis we want to make is the average donation performed by each gender. However, the gender information and the donation value are in different tables, so first we need to join them, using the `user_id` as joining key.

**Note:** Make sure you are not using the `users` DataFrame from the first part of the lab.

**Note 2:** For better performance, you can [broadcast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast) the smaller dataset. But you should only do this when you are sure the DataFrame fits in the memory of the nodes.

In [0]:
joined_df = donations.join(users_from_file, on='user_id', how='left')
display(joined_df)

user_id,donation_id,donation_value,date,time,timestamp,first_name,last_name,gender,birth_date,country,age,age_range
63,1,25.41,2017-03-06,18:28:20,2017-03-06 18:28:20,Tabor,Bramer,Male,1983-05-29,Brazil,39.0,"(35, 45]"
53,2,93.32,2017-03-15,15:15:09,2017-03-15 15:15:09,Chris,Climie,Male,1970-06-02,France,52.0,"(45, 55]"
74,3,136.04,2017-10-29,20:36:23,2017-10-29 20:36:23,Livia,Lodemann,Female,1967-03-11,United States,55.0,"(45, 55]"
94,4,37.25,2017-05-16,12:33:58,2017-05-16 12:33:58,Kendre,Pankhurst.,Female,1941-05-28,China,81.0,"(55, ~]"
83,5,89.45,2017-05-09,22:45:44,2017-05-09 22:45:44,Quincy,Marquess,Male,1997-08-13,Brazil,24.0,"(0, 25]"
80,6,163.67,2017-07-25,21:50:36,2017-07-25 21:50:36,Boothe,Endley,Male,1960-03-03,China,62.0,"(55, ~]"
19,7,10.69,2017-11-15,8:43:54,2017-11-15 8:43:54,Sharl,Airy,Female,1941-09-08,Brazil,80.0,"(55, ~]"
53,8,135.09,2017-04-14,7:43:37,2017-04-14 7:43:37,Chris,Climie,Male,1970-06-02,France,52.0,"(45, 55]"
46,9,94.38,2017-03-07,10:30:34,2017-03-07 10:30:34,Ethelyn,Raubenheim,Female,1953-07-19,Brazil,69.0,"(55, ~]"
90,10,68.63,2017-02-02,14:23:43,2017-02-02 14:23:43,Farlee,O'Connell,Male,1985-05-15,China,37.0,"(35, 45]"


Now, use aggregation to find the the min, max and avg donation by gender.

In [0]:
sql_string = "select gender, AVG(donation_value) as avg_donation_value, MIN(donation_value) as min_donation_value, MAX(donation_value) as max_donation_value from c group by gender"

joined_df.registerTempTable('c') # creates a local temporary table accessible by a SQL query

donations_by_gender = spark.sql(sql_string)

donations_by_gender.show()

+------+------------------+------------------+------------------+
|gender|avg_donation_value|min_donation_value|max_donation_value|
+------+------------------+------------------+------------------+
|Female| 97.62667400881057|              1.25|            199.87|
|  Male|100.23606227106244|              1.12|            199.93|
+------+------------------+------------------+------------------+



Now, make the necessary transformations and aggregations to answer to the following questions about the data. Note that some questions are only about the users, so make sure to use the smaller possible dataset when looking for your answers!

**Question 1:** 

a) What's the average, min and max age of the users? 

b) What's the average, min and max age of the users, by gender?

In [0]:
result_1a = users_from_file.agg(avg('age').alias('average age'), 
                                min('age').alias('minimum age'), 
                                max('age').alias('maximum age'))
result_1a.show()

result_1b = users_from_file.groupBy('gender').agg(avg('age').alias('average age'), 
                                min('age').alias('minimum age'), 
                                max('age').alias('maximum age'))
result_1b.show()

+-----------+-----------+-----------+
|average age|minimum age|maximum age|
+-----------+-----------+-----------+
|      52.79|       23.0|       82.0|
+-----------+-----------+-----------+

+------+------------------+-----------+-----------+
|gender|       average age|minimum age|maximum age|
+------+------------------+-----------+-----------+
|Female|57.142857142857146|       23.0|       82.0|
|  Male|  48.6078431372549|       23.0|       80.0|
+------+------------------+-----------+-----------+



**Question 2:**

a) How many distinct country origins exist in the data? Print a DataFrame listing them.

b) What are the top 5 countries with the most users? Print a DataFrame containing the name of the countries and the counts.

In [0]:
result_2a = users_from_file.agg(countDistinct("country"))
result_2a.show()

result_2b = users_from_file.groupBy("country").agg(count("country")).sort(col("count(country)").desc()).limit(5)
result_2b.show()

+--------------+
|count(country)|
+--------------+
|             8|
+--------------+

+-------------+--------------+
|      country|count(country)|
+-------------+--------------+
|        China|            54|
|       Brazil|            19|
|       France|            13|
|United States|             5|
|        Japan|             4|
+-------------+--------------+



**Question 3:**

What's the number of donations average, min and max donations values by age range?

In [0]:
result_3 = joined_df.groupBy("age_range").agg(
        count('user_id').alias('number of donations'),
        avg('donation_value').alias('average donation value'), 
        min('donation_value').alias('minimum donation value'), 
        max('donation_value').alias('maximum donation value'))
result_3.show()

+---------+-------------------+----------------------+----------------------+----------------------+
|age_range|number of donations|average donation value|minimum donation value|maximum donation value|
+---------+-------------------+----------------------+----------------------+----------------------+
| (25, 35]|                149|     90.77322147651006|                  1.12|                199.28|
|  (55, ~]|                412|    100.41560679611653|                  1.19|                199.91|
| (45, 55]|                196|    100.64775510204082|                  2.49|                199.93|
|  (0, 25]|                 36|     105.3863888888889|                  4.33|                198.15|
| (35, 45]|                207|     99.68159420289848|                  2.03|                199.87|
+---------+-------------------+----------------------+----------------------+----------------------+



**Question 4:**

a) What's the number of donations, average, min and max donation values by user location (country)?

b) What is the number of donations, average, min and max donation values by gender for each user location (contry)? (the resulting DataFrame must contain 5 columns: the gender of the user, their country and the 3 metrics)

In [0]:
result_4a = joined_df.groupBy("country").agg(
        count('user_id').alias('number of donations'),
        avg('donation_value').alias('average donation value'), 
        min('donation_value').alias('minimum donation value'), 
        max('donation_value').alias('maximum donation value'))
result_4a.show()

result_4b = joined_df.groupBy("gender", "country").agg(
        count('user_id').alias('number of donations'),
        avg('donation_value').alias('average donation value'), 
        min('donation_value').alias('minimum donation value'), 
        max('donation_value').alias('maximum donation value'))
result_4b.show()

+-------------+-------------------+----------------------+----------------------+----------------------+
|      country|number of donations|average donation value|minimum donation value|maximum donation value|
+-------------+-------------------+----------------------+----------------------+----------------------+
|      Germany|                 19|     95.64157894736842|                 17.81|                180.48|
|       France|                120|     95.63199999999999|                  5.79|                199.93|
|United States|                 50|              108.3796|                  1.36|                198.15|
|        China|                563|     97.62353463587931|                  1.25|                199.32|
|      Nigeria|                 20|     81.22600000000001|                 14.47|                192.63|
|       Brazil|                180|    105.70655555555568|                  1.12|                199.91|
|        Japan|                 37|     95.632702702702

**Question 5**

Which month of the year has the largest aggregated donation value?

**Hint**: you can use a function to extract the month from a date, then you can aggregate to find the total donation value.

In [0]:
result_5 = donations.groupBy(month("date").alias("month")).agg(
                             sum('donation_value').alias('aggregated donation value')).sort(col("aggregated donation value").desc()).limit(1)

result_5.show()

+-----+-------------------------+
|month|aggregated donation value|
+-----+-------------------------+
|    5|       10696.400000000001|
+-----+-------------------------+



### 3. Window Functions

_This section uses the same data as the last one._

Window functions are very useful for gathering aggregated data without actually aggregating the DataFrame. They can also be used to find "previous" and "next" information for entities, such as an user. [This article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) has a very nice explanation about the concept.

We want to find the users who donated less than a threshold and remove them from the donations dataset. Now, there are two ways of doing that:

1) Performing a traditional aggregation to find the users who donated less than the threshold, then filtering these users from the donations dataset, either with `where(not(df("user_id").isin(a_local_list)))` or with a join of type "anti-join".

2) Using window functions to add a new column with the aggregated donations per user, and then using a normal filter such as `where(aggregated_donation < threshold)`

Let's implement both and compare the complexity:

First, perform the traditional aggregation and find the users who donated less than 500 in total:

In [0]:
bad_users = joined_df.groupBy(fn.col('user_id')).agg(sum(fn.col('donation_value'))).filter(fn.col('sum(donation_value)') < 500)
bad_users.show()

+-------+-------------------+
|user_id|sum(donation_value)|
+-------+-------------------+
|     85| 437.78999999999996|
|     47|             351.84|
|     96|             354.97|
|     84| 329.46999999999997|
|     87| 497.60999999999996|
|     82|             291.73|
|     73| 455.20000000000005|
|     25| 413.09000000000003|
|     42|             215.35|
|     99|             370.99|
+-------+-------------------+



In [0]:
# timestamp as timestamp for future uses
joined_df = joined_df.withColumn("timestamp",to_timestamp("timestamp"))

joined_df.dtypes

Out[23]: [('user_id', 'int'),
 ('donation_id', 'int'),
 ('donation_value', 'double'),
 ('date', 'string'),
 ('time', 'string'),
 ('timestamp', 'timestamp'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('birth_date', 'string'),
 ('country', 'string'),
 ('age', 'double'),
 ('age_range', 'string')]

You should have found around 10 users. Now, perform an "anti-join" to remove those users from `joined_df`.

**Hint:** The `join` operation accepts a third argument which is the join type. The accepted values are: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'.

In [0]:
good_donations = joined_df.join(bad_users, on='user_id', how='leftanti')
good_donations.count()

Out[24]: 955

Verify if the count of `good_donations` makes sense by performing a normal join to find the `bad_donations`.

In [0]:
bad_donations = joined_df.join(bad_users, on='user_id', how='right')
bad_donations.count()

Out[25]: 45

If you done everything right, at this point `good_donations.count()` + `bad_donations.count()` = `joined_df.count()`.

But using the join approach can be very heavy and it requires multipe operations. For this kind of problems, Window Functions are better.

In [0]:
good_donations.count() + bad_donations.count() == joined_df.count()

Out[26]: True

The first step is to create your window specification over `user_id` by using partitionBy

In [0]:
from pyspark.sql import Window

window_spec = Window.partitionBy(joined_df['user_id'])

Then, you can use one of the window functions of the `pyspark.sql.functions` package, appling to the created window_spec by using the `over` method. 

**Hint:** If you are blocked, try looking at the [documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.over) for the `over` method, or searching on StackOverflow.

In [0]:
new_column = fn.sum(joined_df['donation_value']).over(window_spec)

donations_with_total = joined_df.withColumn('total_donated_by_user', new_column)
donations_with_total.show()

+-------+-----------+--------------+----------+--------+-------------------+----------+---------+------+----------+-------------+----+---------+---------------------+
|user_id|donation_id|donation_value|      date|    time|          timestamp|first_name|last_name|gender|birth_date|      country| age|age_range|total_donated_by_user|
+-------+-----------+--------------+----------+--------+-------------------+----------+---------+------+----------+-------------+----+---------+---------------------+
|      1|        145|         95.88|2017-05-13| 5:27:25|2017-05-13 05:27:25|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|               889.16|
|      1|        264|        150.51|2017-09-21| 4:36:37|2017-09-21 04:36:37|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|               889.16|
|      1|        317|         51.55|2017-10-26| 1:55:37|2017-10-26 01:55:37|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|               889.16

And now you can just filter on the `total_donated_by_user` column:

In [0]:
good_donations_wf = donations_with_total.filter(fn.col('total_donated_by_user') < 500)
good_donations_wf.count()

Out[29]: 45

If you done everything right, you should obtain `good_donations_wf.count()` = `good_donations.count()`

In [0]:
good_donations_wf.count() == good_donations.count()

Out[30]: False

Window functions also can be useful to find the "next" and "previous" operations by using `functions.lead` and `functions.lag`. In our example, we can use it to find the date interval between two donations by a specific user.

For this kind of Window function, the window specification must be ordered by the date. So, create a new window specification partitioned by the `user_id` and ordered by the donation timestamp.

In [0]:
ordered_window = Window.partitionBy('user_id').orderBy('timestamp')

Now use `functions.lag().over()` to add a column with the timestamp of the previous donation of the same user. Then, inspect the result to see what it looks like.

In [0]:
new_column = fn.lag(donations_with_total['timestamp']).over(ordered_window)

donations_with_lag = good_donations.withColumn('last_donation_timestamp', new_column)
donations_with_lag.orderBy("user_id", "timestamp").show()

+-------+-----------+--------------+----------+--------+-------------------+----------+---------+------+----------+-------------+----+---------+-----------------------+
|user_id|donation_id|donation_value|      date|    time|          timestamp|first_name|last_name|gender|birth_date|      country| age|age_range|last_donation_timestamp|
+-------+-----------+--------------+----------+--------+-------------------+----------+---------+------+----------+-------------+----+---------+-----------------------+
|      1|        560|        121.82|2016-12-03| 3:34:03|2016-12-03 03:34:03|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|                   null|
|      1|        486|        188.37|2017-02-01|20:20:44|2017-02-01 20:20:44|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|    2016-12-03 03:34:03|
|      1|        145|         95.88|2017-05-13| 5:27:25|2017-05-13 05:27:25|   Janifer|   Lagadu|Female|1940-08-09|United States|81.0|  (55, ~]|    2017-02

Finally, compute the average time it took for each user between two of their consecutive donations (in days), and print the 5 users with the smallest averages. The result must include at least the users' id, last name and birth date, as well as the computed average.

In [0]:
users_average_between_donations = donations_with_lag.withColumn('days_between_donations', round(((fn.col('timestamp').cast('long') - fn.col('last_donation_timestamp').cast('long')) / 86400), 0)).groupBy(fn.col('user_id')).agg(avg(fn.col('days_between_donations'))).orderBy('avg(days_between_donations)')
#users_average_between_donations = joined_df.join(users_average_between_donations, on='user_id', how='right').orderBy('avg(days_between_donations)').groupBy('user_id')
users_average_between_donations.show(5)

+-------+---------------------------+
|user_id|avg(days_between_donations)|
+-------+---------------------------+
|     67|         14.307692307692308|
|     78|         16.166666666666668|
|     91|          17.88888888888889|
|      6|          18.58823529411765|
|     63|          18.88888888888889|
+-------+---------------------------+
only showing top 5 rows



Congratulations, you have finished this notebook!