# Exercises on Spark SQL/DataFrame API
<font color='violet'>Any changes made to the notebook text for the homework assignment are indicated in violet.</font>

This notebook contains the Spark exercises using the **Spark SQL/DataFrame API**.

We start by installing pyspark (only execute if this is needed, e.g., if you are running this on Google Colab), and downloading the datasets. The exercises, which are the same as for the CORE API, follow.

### Useful documentation to do these exercises.

The PySpark Documentation is available at https://spark.apache.org/docs/latest/api/python/index.html. 

Instructions on how to install PySpark on your local PC may be found at https://spark.apache.org/docs/latest/api/python/getting_started/install.html. Note that by installing PySpark in this way, you automatically have a local copy of Spark.

The Spark SQL and DataFrame API that we use below is documented in the following guide, which is a useful reference to have: https://spark.apache.org/docs/latest/sql-programming-guide.html

#### Installing PySpark

In [1]:
# This installs pyspark in the current python environment.
# By installing pyspark, we automatically also install spark.
# You **need** to run this cell when running this notebook in google colab
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=a3dbac6cd3b19db66a6dd6a9b44958b9d099be4769e02a0629e520d5c62b4bdc
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

#### General imports and starting Spark

In [2]:
#This is needed to start a Spark session from the notebook
#You may adjust the memory used by the driver program based on your machine's settings
import os 
os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=3g  pyspark-shell"

from pyspark.sql import SparkSession

In [3]:
# -------------------------------
# Start Spark in LOCAL mode
# -------------------------------

#The following lines are just there to allow this cell to be re-executed multiple times:
#if a spark session was already started, we stop it before starting a new one
#(there can be only one spark context per jupyter notebook)
try:
    spark
    print("Spark application already started. Terminating existing application and starting new one")
    spark.stop()
except NameError:
    # No Spark session has been created yet in this notebook, so there is nothing to stop
    pass

# Create a new spark session (note, the * indicates to use all available CPU cores)
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("demoRDD") \
    .getOrCreate()
    
#When dealing with RDDs, we work with the sparkContext object. See https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext
sc=spark.sparkContext

# We print the sparkcontext. This prints general information about the spark instance we have connected to. 
# In particular, the hyperlink allows us to open the spark UI (useful for seeing what is going on)
# Note: this hyperlink won't work when running this notebook in Google Colab.
sc

### Downloading data

The next cell downloads the data required for the exercises.

In [4]:
!mkdir -p downloads
!wget 'https://drive.google.com/u/0/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download' -O downloads/data-spark-exercises.zip
!unzip -q downloads/data-spark-exercises.zip
!ls data

--2023-03-31 19:36:31--  https://drive.google.com/u/0/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download
Resolving drive.google.com (drive.google.com)... 64.233.191.138, 64.233.191.101, 64.233.191.113, ...
Connecting to drive.google.com (drive.google.com)|64.233.191.138|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://drive.google.com/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download [following]
--2023-03-31 19:36:31--  https://drive.google.com/uc?id=1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS&export=download
Reusing existing connection to drive.google.com:443.
HTTP request sent, awaiting response... 303 See Other
Location: https://doc-0o-50-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/boenlfe2l782e86v8f8fgsiqiici1leh/1680291375000/12785547293638390956/*/1Xlmwiku1RKLyACAPMFZMg1-RsWY-8tYS?e=download&uuid=f4790f26-2834-4a42-81c7-a7ab12fe7556 [following]
--2023-03-31 19:36:33--  https://doc-0o-50-docs.googleusercontent.co

## 1. Sensor data exercises
In the file `data/sensors/sensor-sample.txt`, each line contains multiple space-separated fields. Let's call them: Date (date), Time (time), RoomId (integer)-SensorId (integer), Value1 (float), Value2 (float).
Using this file, use Spark to compute the following queries:

1. Count the number of entries for each day.
2. Count the number of measures for each pair of RoomId-SensorId.
3. Compute the average of Value1.
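
To make the three queries concrete before turning to Spark, here is a minimal plain-Python sketch of what each one computes. The sample lines are hypothetical (they are not taken from `sensor-sample.txt`), and the names `parse_line`, `entries_per_day`, `measures_per_pair` and `average_value1` are introduced only for this illustration.

```python
from collections import Counter
from statistics import mean

# Hypothetical sample lines in the layout described above:
# Date Time RoomId-SensorId Value1 Value2 (space-separated).
sample_lines = [
    "2017-03-01 10:00:00 1-0 10.0 2.0",
    "2017-03-01 10:00:30 1-1 20.0 2.0",
    "2017-03-02 10:00:00 1-0 30.0 2.5",
]


def parse_line(line):
    """Split one space-separated line into a dict of typed fields."""
    date, time, room_sensor, value1, value2 = line.split()
    return {
        "date": date,
        "time": time,
        "room_sensor": room_sensor,
        "value1": float(value1),
        "value2": float(value2),
    }


records = [parse_line(line) for line in sample_lines]

# Query 1: number of entries per day.
entries_per_day = Counter(r["date"] for r in records)

# Query 2: number of measures per RoomId-SensorId pair.
measures_per_pair = Counter(r["room_sensor"] for r in records)

# Query 3: average of Value1 over all entries.
average_value1 = mean(r["value1"] for r in records)

print(entries_per_day, measures_per_pair, average_value1)
```

The DataFrame solutions below express the same three computations as `groupBy(...).count()` and a `mean` aggregate, with Spark doing the grouping in parallel.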

<font color='violet'>Reading in the data:</font>

In [5]:
schema1 = "Date DATE, Time TIMESTAMP, RoomIdDashSensorId STRING, Value1 FLOAT, Value2 FLOAT"
sensorDF = spark.read.option("delimiter", " ").csv("data/sensors/sensor-sample.txt", schema=schema1)
sensorDF.show(5)

+----------+--------------------+------------------+--------+-------+
|      Date|                Time|RoomIdDashSensorId|  Value1| Value2|
+----------+--------------------+------------------+--------+-------+
|2017-03-31|2023-03-31 03:38:...|               1-0| 122.153|2.03397|
|2017-03-31|2023-03-31 03:38:...|               1-1|-3.91901|2.09397|
|2017-03-31|2023-03-31 03:38:...|               1-2|   11.04|2.07397|
|2017-02-28|2023-03-31 00:59:...|               1-0| 19.9884|2.74964|
|2017-02-28|2023-03-31 00:59:...|               1-1| 37.0933|2.76964|
+----------+--------------------+------------------+--------+-------+
only showing top 5 rows



<font color='violet'>1. Count the number of entries for each day.

The sorting is not strictly necessary, but helps verify the correctness of the result, and comparison with notebook 2:</font>

In [6]:
sensorDF.groupBy("Date").count().sort("Date").show(5)

+----------+-----+
|      Date|count|
+----------+-----+
|2017-02-28|62103|
|2017-03-01|33423|
|2017-03-02|32403|
|2017-03-03|29727|
|2017-03-04|30225|
+----------+-----+
only showing top 5 rows



<font color='violet'>2. Count the number of measures for each pair of RoomId-SensorId.

Again, sorting is not strictly needed:</font>


In [7]:
sensorDF.groupBy("RoomIdDashSensorId").count().sort("RoomIdDashSensorId").show(5)

+------------------+-----+
|RoomIdDashSensorId|count|
+------------------+-----+
|               1-0|43047|
|               1-1|43047|
|               1-2|43047|
|               2-0|46915|
|               2-1|46915|
+------------------+-----+
only showing top 5 rows



<font color='violet'> 3. Compute the average of Value1: </font>

In [8]:
from pyspark.sql.functions import mean
sensorDF.select(mean("Value1")).show()

+-----------------+
|      avg(Value1)|
+-----------------+
|92.82106000775526|
+-----------------+



<font color='violet'> Or alternatively: </font>

In [9]:
sensorDF.groupby().mean("Value1").show()

+-----------------+
|      avg(Value1)|
+-----------------+
|92.82106000775526|
+-----------------+



## 2. Movielens movie data exercises

Movielens (https://movielens.org/) is a website that provides non-commercial, personalised movie recommendations. GroupLens Research has collected and made available rating data sets from the MovieLens web site for the purpose of research into making recommendation services. In this exercise, we will use one of these datasets (the movielens latest dataset, http://files.grouplens.org/datasets/movielens/ml-latest-small.zip) and compute some basic queries on it.
The dataset has already been downloaded and is available at data/movielens/movies.csv, data/movielens/ratings.csv, data/movielens/tags.csv, data/movielens/links.csv

1. Inspect the dataset's [README file](http://files.grouplens.org/datasets/movielens/ml-latest-small-README.html), in particular the section titled "Content and Use of Files" to learn the structure of these files.
2. Compute all pairs (`movieid`, `rat`) where `movieid` is a movie id (as found in ratings.csv) and `rat` is the average rating of that movie id. (Hint: use aggregateByKey to compute first the sum of all ratings as well as the number of ratings per key).
3. Compute all pairs (`title`, `rat`) where `title` is a full movie title (as found in the movies.csv file), and `rat` is the average rating of that movie (computed over all possible ratings for that movie, as found in the ratings.csv file)
4. [_Extra_] Compute all pairs (`title`, `tag`) where `title` is a full movie title that has an average rating of at least 3.5, and `tag` is a tag for that movie (as found in the tags.csv file)

Extra: if you want to experiment with larger datasets, download the 10m dataset (http://files.grouplens.org/datasets/movielens/ml-10m.zip, 250 MB uncompressed) and re-do the exercises above.

<font color='violet'>Reading in the data.

"timestamp" should be converted to a proper DATE/TIME format instead of INT,
but since that column is not used in the exercises, I did not put in the additional effort.</font>

In [10]:
schema2_1 = "userId INT,movieId INT,rating FLOAT,timestamp INT"
ratingsDF = spark.read.option("delimiter", ",").csv("data/movielens/ratings.csv", schema=schema2_1)
ratingsDF.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [11]:
schema2_2 = "movieId INT,title STRING,genres STRING"
moviesDF = spark.read.option("delimiter", ",").csv("data/movielens/movies.csv", schema=schema2_2)
moviesDF.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [12]:
schema2_3 = "userId INT, movieId INT, tag STRING, timestamp INT"
tagsDF = spark.read.option("delimiter", ",").csv("data/movielens/tags.csv", schema=schema2_3)
tagsDF.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



<font color='violet'> 1. is the same as in notebook 2. </font>

<font color='violet'> 2. Compute all pairs (movieid, rat) where movieid is a movie id (as found in ratings.csv) and rat is the average rating of that movie id. (Hint: use aggregateByKey to compute first the sum of all ratings as well as the number of ratings per key).

Again, sorting is not strictly needed:
</font>

In [13]:
ratingsDF.groupBy("movieId").mean("rating").sort("movieId").show(5)

+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|      1|3.9209302325581397|
|      2|3.4318181818181817|
|      3|3.2596153846153846|
|      4| 2.357142857142857|
|      5|3.0714285714285716|
+-------+------------------+
only showing top 5 rows
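
The `aggregateByKey` hint in the exercise text comes from the Core API version of this exercise: there, the average is built by first accumulating a (sum, count) pair per key. The following plain-Python sketch shows that idea on hypothetical (`movieId`, `rating`) pairs that are not taken from `ratings.csv`; the names `totals` and `average_by_movie` are illustrative only.

```python
from collections import defaultdict

# Hypothetical (movieId, rating) pairs; not taken from ratings.csv.
ratings = [(1, 4.0), (1, 5.0), (2, 3.0), (2, 2.0), (2, 4.0)]

# Accumulate a (sum, count) pair per movieId, as the aggregateByKey hint suggests.
totals = defaultdict(lambda: (0.0, 0))
for movie_id, rating in ratings:
    running_sum, running_count = totals[movie_id]
    totals[movie_id] = (running_sum + rating, running_count + 1)

# Divide each sum by its count to obtain the (movieid, rat) pairs.
average_by_movie = {m: s / c for m, (s, c) in totals.items()}
print(average_by_movie)
```

With the DataFrame API this bookkeeping is not needed, since `groupBy("movieId").mean("rating")` computes the per-key average directly.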



<font color='violet'>3. Compute all pairs (`title`, `rat`) where `title` is a full movie title (as found in the movies.csv file), and `rat` is the average rating of that movie (computed over all possible ratings for that movie, as found in the ratings.csv file)

Again, sorting is not strictly necessary.
Also note that, compared to notebook 2, the September 11 movie is sorted to the first position instead of the 32nd, because its title still carries the surrounding quotes (and the doubled inner quotes) from the CSV file, which were not stripped when reading:</font>

In [14]:
ratingsDF.join(moviesDF, ratingsDF.movieId == moviesDF.movieId).groupBy("title").mean("rating").sort("title").show(32,truncate=False)

+------------------------------------------------------+------------------+
|title                                                 |avg(rating)       |
+------------------------------------------------------+------------------+
|"11'09""01 - September 11 (2002)"                     |4.0               |
|'71 (2014)                                            |4.0               |
|'Hellboy': The Seeds of Creation (2004)               |4.0               |
|'Round Midnight (1986)                                |3.5               |
|'Salem's Lot (2004)                                   |5.0               |
|'Til There Was You (1997)                             |4.0               |
|'Tis the Season for Love (2015)                       |1.5               |
|'burbs, The (1989)                                    |3.176470588235294 |
|'night Mother (1986)                                  |3.0               |
|(500) Days of Summer (2009)                           |3.6666666666666665|
|*batteries 

<font color='violet'>Or alternatively:</font>

In [15]:
ratingsDF.groupBy("movieId").mean("rating").join(moviesDF, ratingsDF.movieId == moviesDF.movieId).select("title","avg(rating)").sort("title").show(5,truncate=False)

+---------------------------------------+-----------+
|title                                  |avg(rating)|
+---------------------------------------+-----------+
|"11'09""01 - September 11 (2002)"      |4.0        |
|'71 (2014)                             |4.0        |
|'Hellboy': The Seeds of Creation (2004)|4.0        |
|'Round Midnight (1986)                 |3.5        |
|'Salem's Lot (2004)                    |5.0        |
+---------------------------------------+-----------+
only showing top 5 rows
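
The quoted September 11 title in the outputs above is consistent with a CSV field that escapes an inner quote by doubling it. As a small illustration of that convention, the sketch below parses one such line with Python's standard `csv` module. The line is hypothetical: the title matches the output above, but the `movieId` and genre are placeholders.

```python
import csv
import io

# Hypothetical line in the style of movies.csv; the movieId (0) and genre are placeholders.
raw_line = '0,"11\'09""01 - September 11 (2002)",Drama\n'

# Python's csv module treats a doubled quote inside a quoted field as one literal quote
# and drops the surrounding quotes.
row = next(csv.reader(io.StringIO(raw_line)))
print(row[1])
```

Spark's CSV reader has an `escape` option whose default is a backslash. Setting it to a double quote, for example with `.option("escape", '"')`, is a common way to get the same behaviour there, although we have not verified this on this dataset.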



<font color='violet'> 4. [_Extra_] Compute all pairs (title, tag) where title is a full movie title that has an average rating of at least 3.5, and tag is a tag for that movie (as found in the tags.csv file)

Again, sorting is not strictly needed, and the average rating is only included to verify that no ratings less than 3.5 are listed.</font>

In [16]:
ratingsDF.groupBy("movieId")\
         .mean("rating")\
         .filter("avg(rating)>=3.5")\
         .join(tagsDF,ratingsDF.movieId == tagsDF.movieId)\
         .join(moviesDF, ratingsDF.movieId == moviesDF.movieId)\
         .select("title","tag","avg(rating)")\
         .sort("avg(rating)")\
         .show(10,truncate=False)

+-------------------------------+---------------------+-----------+
|title                          |tag                  |avg(rating)|
+-------------------------------+---------------------+-----------+
|Age of Innocence, The (1993)   |Edith Wharton        |3.5        |
|Ghost World (2001)             |adolescence          |3.5        |
|Othello (1995)                 |Shakespeare          |3.5        |
|Crossfire (1947)               |anti-Semitism        |3.5        |
|Preacher's Wife, The (1996)    |religion             |3.5        |
|Shadow of the Thin Man (1941)  |Nick and Nora Charles|3.5        |
|Return of the Secaucus 7 (1980)|In Netflix queue     |3.5        |
|Possession (2002)              |books                |3.5        |
|My Life (1993)                 |death                |3.5        |
|Beat the Devil (1953)          |crime                |3.5        |
+-------------------------------+---------------------+-----------+
only showing top 10 rows



## 3. Github log data exercises
GitHub makes activity logs publicly available at https://www.githubarchive.org/. One such log file, which contains activity data for 2015-03-01 between 0h and 1h at night, has been downloaded and is available at `data/github/2015-03-01-0.json.gz`. This (compressed) file contains multiple JSON objects, one per line. Here is a sample line of this file, neatly formatted:

`{ "id": "2614896652",
    "type": "CreateEvent",
    "actor": {
        "id": 739622,
        "login": "treydock",
        "gravatar_id": "",
        "url": "https://api.github.com/users/treydock",
        "avatar_url": "https://avatars.githubusercontent.com/u/739622?"
    },
    "repo": {
        "id": 23934080,
        "name": "Early-Modern-OCR/emop-dashboard",
        "url": "https://api.github.com/repos/Early-Modern-OCR/emop-dashboard"
    },
    "payload": {
        "ref": "development",
        "ref_type": "branch",
        "master-branch": "master",
        "description": "",
        "pusher_type": "user"
    },
    "public": true,
    "created_at": "2015-03-01T00:00:00Z",
    "org": {
        "id": 10965476,
        "login": "Early-Modern-OCR",
        "gravatar_id": "",
        "url": "https://api.github.com/orgs/Early-Modern-OCR",
        "avatar_url": "https://avatars.githubusercontent.com/u/10965476?"
    }
}`

This log entry has `CreateEvent` type and its `payload.ref_type` is `branch`. So someone named "treydock" (`actor.login`) created a repository branch called "development" (`payload.ref`) in the first second of March 1, 2015 (`created_at`).
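
The dotted names used here (`actor.login`, `payload.ref_type`, `payload.ref`) are paths into the nested JSON object. The sketch below shows the same lookups with Python's standard `json` module on a trimmed copy of the sample entry, keeping only the fields that are read. The variable names are illustrative.

```python
import json

# Trimmed copy of the sample entry above; only the fields used here are kept.
sample_line = (
    '{"id": "2614896652", "type": "CreateEvent", '
    '"actor": {"id": 739622, "login": "treydock"}, '
    '"payload": {"ref": "development", "ref_type": "branch"}, '
    '"created_at": "2015-03-01T00:00:00Z"}'
)

event = json.loads(sample_line)

# actor.login, payload.ref_type and payload.ref are nested dictionary lookups.
summary = (
    f'{event["actor"]["login"]} created {event["payload"]["ref_type"]} '
    f'"{event["payload"]["ref"]}" at {event["created_at"]}'
)
print(summary)
```

In the DataFrame solutions below, Spark infers this nesting as struct columns, so the same fields can be addressed with strings such as `"actor.login"`.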

1. Load the JSON file as a Spark DataFrame.

2. Filter this dataframe to retain only those rows that represent push activities (where `type` equals `PushEvent`)

3. Count the number of push events.

4. Compute the number of push events, grouped per `actor.login`. 

5. Retrieve the results of (4) in sorted order, where logins with higher number of pushes come first. Retrieve the 10 first such results (which contain the highest number of pushes)

6. You are representing a company and need to retrieve the number of pushes for every employee in the company. The file `data/github/employees.txt` contains a list of all employee login names at your company.
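
As a reference for what steps 2 to 5 should produce, here is a minimal plain-Python sketch of the same pipeline over an iterable of JSON lines. The function name `top_pushers` and the sample events are hypothetical, and the events are reduced to the two fields the queries need.

```python
import json
from collections import Counter


def top_pushers(json_lines, n=10):
    """Return (total number of push events, the n logins with the most pushes)."""
    events = (json.loads(line) for line in json_lines)
    # Step 2: keep only push events, remembering who pushed.
    push_logins = [e["actor"]["login"] for e in events if e["type"] == "PushEvent"]
    # Step 4: number of push events per login.
    pushes_per_login = Counter(push_logins)
    # Steps 3 and 5: total count, and the n logins with the most pushes first.
    return len(push_logins), pushes_per_login.most_common(n)


# Hypothetical events; the logins are invented for this example.
sample = [
    '{"type": "PushEvent", "actor": {"login": "alice"}}',
    '{"type": "PushEvent", "actor": {"login": "bob"}}',
    '{"type": "PushEvent", "actor": {"login": "alice"}}',
    '{"type": "CreateEvent", "actor": {"login": "bob"}}',
    '{"type": "PushEvent", "actor": {"login": "carol"}}',
    '{"type": "PushEvent", "actor": {"login": "alice"}}',
    '{"type": "PushEvent", "actor": {"login": "bob"}}',
]

total, ranking = top_pushers(sample, n=2)
print(total, ranking)
```

For the real log file, the lines could be supplied by `gzip.open("data/github/2015-03-01-0.json.gz", "rt")` from the standard library. This reads everything on a single machine, so it only serves as a cross-check for the Spark results on small inputs.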

Extra: if you want to experiment with larger datasets, download more log data from the github archive website and re-do the exercises above

<font color='violet'>1. Load the JSON file as a Spark DataFrame: </font>

In [17]:
gitDF = spark.read.json('data/github/2015-03-01-0.json.gz')
gitDF.show(3,truncate=False)

+--------------------------------------------------------------------------------------------------------------------+--------------------+----------+-------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------+-----------+
|actor                                                                                             

<font color='violet'> 2. Filter this dataframe to retain only those rows that represent push activities (where type equals PushEvent).

Again, sorting is not strictly needed:</font>

In [18]:
gitDF.filter(gitDF.type == "PushEvent").sort("actor.login").show(3,truncate=False)

+--------------------------------------------------------------------------------------------------------------------+--------------------+----------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+---------------------------------------------------------------------------------------+---------+
|actor                                                                                                               |created_at          |id        |org |payload                                                                                                           

<font color='violet'>3. Count the number of push events:</font>

In [19]:
gitDF.filter(gitDF.type == "PushEvent").groupby().count().show()

+-----+
|count|
+-----+
| 8793|
+-----+




<font color='violet'> 4. Compute the number of push events, grouped per `actor.login`.

Again, sorting is not strictly needed:</font>

In [20]:
gitDF.filter(gitDF.type == "PushEvent").groupby("actor.login").count().sort("actor.login").show(3)

+-----------+-----+
|      login|count|
+-----------+-----+
|0000marcell|    1|
|   01000101|    1|
|    05K4R1N|    2|
+-----------+-----+
only showing top 3 rows



<font color='violet'>5. Retrieve the results of (4) in sorted order, where logins with higher number of pushes come first. Retrieve the 10 first such results (which contain the highest number of pushes):</font>


In [21]:
gitDF.filter(gitDF.type == "PushEvent").groupby("actor.login").count().sort("count",ascending=False).show(10)

+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
|     tryton-mirror|   37|
|           Somasis|   26|
|   direwolf-github|   24|
|   EmanueleMinotto|   22|
|           hansliu|   21|
+------------------+-----+
only showing top 10 rows



<font color='violet'> 6. You are representing a company and need to retrieve the number of pushes for every employee in the company. The file `data/github/employees.txt` contains a list of all employee login names at your company.

IMPORTANT: All the employees in the file `data/github/employees.txt` seem to have at least one push. I have added additional employees with zero pushes to make sure my code is robust:
</font>

In [22]:
schema3 = "employee STRING"
employeeDF = spark.read.option("delimiter", ",").csv('data/github/employees.txt', schema=schema3)
newRow = spark.createDataFrame([('An employee with zero pushes',),('Another employee with zero pushes',)],schema=schema3)
employeeDF = employeeDF.union(newRow)
employeeDF.sort("employee").show(5, truncate=False)

+---------------------------------+
|employee                         |
+---------------------------------+
|AiMadobe                         |
|Akkyie                           |
|An employee with zero pushes     |
|Another employee with zero pushes|
|BatMiles                         |
+---------------------------------+
only showing top 5 rows



<font color='violet'>Notice that the employees with zero pushes are missing from this result:</font>

In [23]:
pushersDF = gitDF.filter(gitDF.type == "PushEvent").join(employeeDF, gitDF.actor.login == employeeDF.employee).groupBy("employee").count().sort("count",ascending=True)
pushersDF.show(3)

+-----------+-----+
|   employee|count|
+-----------+-----+
|   Tookmund|    1|
|  gvincenzi|    1|
|JustScience|    1|
+-----------+-----+
only showing top 3 rows



<font color='violet'>We can find the employees with zero pushes:</font>

In [24]:
from pyspark.sql.functions import lit
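# Anti-join against push events only, so that employees whose only activity is of another type are also listed with 0 pushes
nopushersDF = employeeDF.join(gitDF.filter(gitDF.type == "PushEvent"), employeeDF.employee == gitDF.actor.login, 'left_anti').withColumn("count", lit(0))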
nopushersDF.show(truncate=False)

+---------------------------------+-----+
|employee                         |count|
+---------------------------------+-----+
|An employee with zero pushes     |0    |
|Another employee with zero pushes|0    |
+---------------------------------+-----+



<font color='violet'>Combine the two with a union for the correct output. Sorting is not strictly necessary:</font>

In [25]:
pushersDF.union(nopushersDF).sort("count").show(truncate=False)

+---------------------------------+-----+
|employee                         |count|
+---------------------------------+-----+
|An employee with zero pushes     |0    |
|Another employee with zero pushes|0    |
|Gix075                           |1    |
|JustScience                      |1    |
|Tookmund                         |1    |
|serranoarevalo                   |1    |
|jbernie2                         |1    |
|gvincenzi                        |1    |
|summersd                         |1    |
|whh8b                            |1    |
|IrinaDmt                         |1    |
|listingslab                      |1    |
|gkop                             |1    |
|WhiteHalmos                      |1    |
|jinmingmu                        |1    |
|nesteves                         |1    |
|makjona                          |1    |
|Juxnist                          |1    |
|dpyryesk                         |1    |
|aclindsa                         |1    |
+---------------------------------

In [26]:
pushersDF.union(nopushersDF).sort("count",ascending=False).show(3,truncate=False)

+-------------+-----+
|employee     |count|
+-------------+-----+
|KenanSulayman|72   |
|manuelrp07   |45   |
|Somasis      |26   |
+-------------+-----+
only showing top 3 rows
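
The result built above (an inner join for employees with pushes, an anti join for those without, then a union) has the semantics of a left outer join from the employee list with missing counts filled in as 0. The plain-Python sketch below illustrates that target semantics on hypothetical data. The logins and employee names are invented.

```python
from collections import Counter

# Hypothetical data: the login of each push event, and the company's employee list.
push_logins = ["alice", "bob", "alice", "dave"]
employees = ["alice", "bob", "carol"]

pushes_per_login = Counter(push_logins)

# Every employee appears exactly once; those without pushes get 0.
# Non-employees such as "dave" are dropped (left outer join from the employee list).
pushes_per_employee = {name: pushes_per_login.get(name, 0) for name in employees}
print(pushes_per_employee)
```

In the DataFrame API, a single `left` join from `employeeDF` followed by a count of the matched rows would presumably give the same table in one pass. We have not run that variant here.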



<font color='violet'>Stopping Spark:</font>

In [27]:
sc.stop()