<a href="https://colab.research.google.com/github/MiaMiya/02807-Computational-Tools-for-Data-Science/blob/main/Week7/exercises_week_7_solutions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 02807 - Week 7 Exercises:  Getting started with Spark, Solutions
**To learn the most, please find your own solutions before conferring with the ones provided below**


# Setup

* See also this week's slides for context

In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [2]:
# Instructions on p. 20 Learning Spark, 2nd ed.
# Here's a quick-guide, googling may also be required
# 1) Install pyspark via conda/pip
#          pyspark requires that the JAVA_HOME environment variable be set.
# 2) Install JRE/JDK, figure out the install location
#          You will need to install Java 8 or above
# 3) Update the JAVA_HOME environment variable set programmatically below 
#    with your installation specifics

# Some possibly helpful pointers on JRE/JDK:
# MacOS: Consider using https://github.com/AdoptOpenJDK/homebrew-openjdk
# Debian (colab): !apt install openjdk-8-jdk-headless -qq
# Windows: 
#    If you didn't change the path during installation, it'll be 
#    something like C:\Program Files\Java\jdk1.8.0_65
#    You can also type where java at the command prompt.

# JAVA_HOME environment variable is set programmatically below
# but you must point it to your local install

import os
os.environ["JAVA_HOME"] = r"C:\Programmer\Java\jdk-14.0.2"  # raw string: backslashes are not treated as escapes
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
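
Since the hard-coded `JAVA_HOME` above only fits one machine, a quick check before starting Spark can catch a wrong path early. The following is a minimal sketch and not part of the original solution: the helper name `looks_like_java_home` is hypothetical, and it only assumes that a JDK/JRE directory contains `bin/java` (or `bin/java.exe` on Windows).

```python
import os
import tempfile


def looks_like_java_home(path):
    """Return True if `path` contains bin/java or bin/java.exe as a file."""
    if not path:
        return False
    for exe in ("java", "java.exe"):
        if os.path.isfile(os.path.join(path, "bin", exe)):
            return True
    return False


# Demonstration on a throwaway directory that mimics a JDK layout
with tempfile.TemporaryDirectory() as fake_jdk:
    os.makedirs(os.path.join(fake_jdk, "bin"))
    open(os.path.join(fake_jdk, "bin", "java"), "w").close()
    demo_ok = looks_like_java_home(fake_jdk)
    demo_bad = looks_like_java_home(os.path.join(fake_jdk, "missing"))

# Check the variable that is actually set in this session
print("JAVA_HOME looks valid:", looks_like_java_home(os.environ.get("JAVA_HOME")))
```

If the last line prints `False`, the path assigned to `JAVA_HOME` most likely needs to be adjusted to the local install.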

In [3]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf

Let's initialise a **Spark Session**. 
* A SparkSession object is the entry point to the Spark functionality. 
* When you create the SparkSession object, it initiates a **Spark Application**, on which all code for that session runs.


In [4]:
# create the Spark session
conf = SparkConf().set("spark.ui.port", "4050")
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [5]:
spark

# Exercise 1: The Titanic Dataset

In this exercise you should use Spark to count the number of Titanic passengers in different age brackets. More specifically, you need to count the number of people age 0 to 9, 10 to 19, and so on.

The data is available [here](https://courses.compute.dtu.dk/02807/2021/lectures/week7/titanic_full.csv) and should be loaded into a dataframe in the next cell.

In [6]:
# Your code goes here
df = spark.read.option('header', True).option('inferSchema', True).csv('titanic_full.csv')

## Cleaning the data

Remove the rows that have no value for `Age`.


In [7]:
# Your code goes here
df = df.filter(F.col('Age').isNotNull())

## Adding age brackets 

Create a new column whose value identifies the age bracket each passenger belongs to.

In [8]:
# Your code goes here
def age_bracket(age):
    for bracket in range(10, 150, 10):
        if age < bracket:
            return f"{bracket-10}-{bracket-1}"
        
age_bracket_udf = F.udf(age_bracket)

df = df.withColumn('AgeBracketUDF', age_bracket_udf(F.col('Age'))) \
        .withColumn('AgeBracketDiv', (F.col('Age') / 10).cast('integer')*10)

df.select('Age', 'AgeBracketUDF', 'AgeBracketDiv').show()

+----+-------------+-------------+
| Age|AgeBracketUDF|AgeBracketDiv|
+----+-------------+-------------+
|22.0|        20-29|           20|
|38.0|        30-39|           30|
|26.0|        20-29|           20|
|35.0|        30-39|           30|
|35.0|        30-39|           30|
|54.0|        50-59|           50|
| 2.0|          0-9|            0|
|27.0|        20-29|           20|
|14.0|        10-19|           10|
| 4.0|          0-9|            0|
|58.0|        50-59|           50|
|20.0|        20-29|           20|
|39.0|        30-39|           30|
|14.0|        10-19|           10|
|55.0|        50-59|           50|
| 2.0|          0-9|            0|
|31.0|        30-39|           30|
|35.0|        30-39|           30|
|34.0|        30-39|           30|
|15.0|        10-19|           10|
+----+-------------+-------------+
only showing top 20 rows
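
Both columns above rest on the same arithmetic: divide the age by 10, truncate, and multiply by 10 to get the lower bound of the bracket. As a plain-Python sketch (the function names `bracket_floor` and `bracket_label` are illustrative, not from the solution), the label produced by the UDF loop can also be derived directly from that lower bound:

```python
def bracket_floor(age):
    """Lower bound of the 10-year bracket, mirroring (Age / 10) cast to int, times 10."""
    return int(age / 10) * 10


def bracket_label(age):
    """Label such as '20-29', built from the lower bound instead of a loop."""
    low = bracket_floor(age)
    return f"{low}-{low + 9}"


# A few ages taken from the output above
for age in (22.0, 2.0, 14.0, 58.0):
    print(age, bracket_label(age), bracket_floor(age))
```

The numeric lower bound sorts naturally (0, 10, 20, ...), whereas string labels sort lexicographically, which only happens to agree here because every age in the data is below 100.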



## Age bracket counts

Create a Spark dataframe with the number of passengers in each bracket, and sort it by age bracket (youngest to oldest).

In [9]:
# Your code goes here
df.groupBy('AgeBracketUDF', 'AgeBracketDiv') \
    .agg(F.count('AgeBracketUDF'), F.count('AgeBracketDiv')) \
    .sort('AgeBracketUDF', 'AgeBracketDiv') \
    .collect()

[Row(AgeBracketUDF='0-9', AgeBracketDiv=0, count(AgeBracketUDF)=62, count(AgeBracketDiv)=62),
 Row(AgeBracketUDF='10-19', AgeBracketDiv=10, count(AgeBracketUDF)=102, count(AgeBracketDiv)=102),
 Row(AgeBracketUDF='20-29', AgeBracketDiv=20, count(AgeBracketUDF)=220, count(AgeBracketDiv)=220),
 Row(AgeBracketUDF='30-39', AgeBracketDiv=30, count(AgeBracketUDF)=167, count(AgeBracketDiv)=167),
 Row(AgeBracketUDF='40-49', AgeBracketDiv=40, count(AgeBracketUDF)=89, count(AgeBracketDiv)=89),
 Row(AgeBracketUDF='50-59', AgeBracketDiv=50, count(AgeBracketUDF)=48, count(AgeBracketDiv)=48),
 Row(AgeBracketUDF='60-69', AgeBracketDiv=60, count(AgeBracketUDF)=19, count(AgeBracketDiv)=19),
 Row(AgeBracketUDF='70-79', AgeBracketDiv=70, count(AgeBracketUDF)=6, count(AgeBracketDiv)=6),
 Row(AgeBracketUDF='80-89', AgeBracketDiv=80, count(AgeBracketUDF)=1, count(AgeBracketDiv)=1)]

## Spark plans and jobs

1) Display the plans Spark creates to execute your query from the previous exercise. Identify how many shuffles/exchanges will take place.

2) How many jobs are spawned when your query is executed? (Inspect via the Spark UI)

*Your answers here*

1) Two exchanges take place: the first uses hash partitioning (for the aggregation), the second uses range partitioning (for the sort).

2) Two jobs were spawned. The second job skips a stage and reads from the shuffle created (and cached) by the first job.

In [10]:
# Your code goes here

df.groupBy('AgeBracketUDF', 'AgeBracketDiv') \
    .agg(F.count('AgeBracketUDF'), F.count('AgeBracketDiv')) \
    .sort('AgeBracketUDF', 'AgeBracketDiv') \
    .explain()

== Physical Plan ==
*(4) Sort [AgeBracketUDF#41 ASC NULLS FIRST, AgeBracketDiv#55 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(AgeBracketUDF#41 ASC NULLS FIRST, AgeBracketDiv#55 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#148]
   +- *(3) HashAggregate(keys=[AgeBracketUDF#41, AgeBracketDiv#55], functions=[count(AgeBracketUDF#41), count(AgeBracketDiv#55)])
      +- Exchange hashpartitioning(AgeBracketUDF#41, AgeBracketDiv#55, 200), ENSURE_REQUIREMENTS, [id=#144]
         +- *(2) HashAggregate(keys=[AgeBracketUDF#41, AgeBracketDiv#55], functions=[partial_count(AgeBracketUDF#41), partial_count(AgeBracketDiv#55)])
            +- *(2) Project [pythonUDF0#138 AS AgeBracketUDF#41, (cast((Age#21 / 10.0) as int) * 10) AS AgeBracketDiv#55]
               +- BatchEvalPython [age_bracket(Age#21)], [pythonUDF0#138]
                  +- *(1) Filter isnotnull(Age#21)
                     +- FileScan csv [Age#21] Batched: false, DataFilters: [isnotnull(Age#21)], Format: CSV, Location: I
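
The number of exchanges can be read off the plan by looking for `Exchange` operators. As a small sketch, the snippet below counts them in a hand-abbreviated copy of the plan above (operator names only; attribute ids and arguments are dropped, and `exchange_kinds` is a hypothetical helper name):

```python
import re

# Operator lines abbreviated from the physical plan above
plan_text = """
*(4) Sort
+- Exchange rangepartitioning
   +- *(3) HashAggregate
      +- Exchange hashpartitioning
         +- *(2) HashAggregate
            +- *(2) Project
               +- BatchEvalPython
                  +- *(1) Filter
                     +- FileScan csv
"""


def exchange_kinds(plan):
    """Return the partitioning scheme of each Exchange operator, top to bottom."""
    return re.findall(r"Exchange (\w+)", plan)


kinds = exchange_kinds(plan_text)
print(len(kinds), kinds)
```

The plan is printed from the final operator down to the scan, so the hash-partitioned exchange (listed second) is the one that runs first.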

# Exercise 2: Actions and transformations

For each of the following Spark operations, decide if they are transformations or actions. If they are transformations, determine if they are wide or narrow.

* `select()`
* `groupBy()`
* `filter()`
* `where()`
* `count()`
* `show()`
* `agg()`
* `write()`
* `sort()`

*Your answers here*

* `select()` is a narrow transformation
* `groupBy()` is a wide transformation as we must gather groups from different partitions
* `filter()` is a narrow transformation - we can filter each individual partition
* `where()` is a narrow transformation, an alias for filter
* `count()` is an action when called on a dataframe, or a wide transformation when called on grouped data (after `groupBy()`)
* `show()` is an action
* `agg()` is a wide transformation
* `write()` is an action 
* `sort()` is a wide transformation as we must sort across partitions
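
The narrow/wide distinction in the answers above can be illustrated without Spark. The following is a toy simulation in plain Python, under the assumption that a list of lists stands in for partitions; the records, the function names, and the `hash(key) % num_out` partitioner are all made up for illustration and do not reflect Spark's internals.

```python
from collections import defaultdict

# Two "partitions" of (age_bracket, name) records; values are invented
partitions = [
    [(20, "a"), (30, "b"), (20, "c")],
    [(30, "d"), (0, "e"), (20, "f")],
]


def narrow_filter(parts, predicate):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[row for row in part if predicate(row)] for part in parts]


def wide_group_count(parts, num_out=3):
    """Wide: rows are shuffled by key so that equal keys meet in one partition."""
    shuffled = [defaultdict(int) for _ in range(num_out)]
    for part in parts:
        for key, _ in part:
            shuffled[hash(key) % num_out][key] += 1
    return [dict(p) for p in shuffled]


filtered = narrow_filter(partitions, lambda row: row[0] >= 20)
counts = wide_group_count(partitions)
print(filtered)
print(counts)
```

The filter never looks outside its own partition, while the grouped count has to move rows with the same key into the same output partition before it can produce a total, which is the role a shuffle plays.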

# Exercise 3: Exploratory data analysis for the Chicago crime dataset
The Chicago Crime dataset contains a summary of the reported crimes that occurred in the City of Chicago from 2001 to 2017. 

We'll work on a sample of it available [here](https://courses.compute.dtu.dk/02807/2021/lectures/week7/reported-crimes.csv).

You may optionally work on the entire dataset which is available [here](https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD).

Now load the data you've downloaded locally into a spark dataframe, and proceed to answer the questions below.

In [12]:
# Your code goes here
rc = spark.read.csv('reported-crimes.csv', header=True)
rc.toPandas().head(3)

Unnamed: 0,ID,CASE_NUMBER,DATE,BLOCK,IUCR,PRIMARY_TYPE,DESCRIPTION,LOCATION_DESCRIPTION,ARREST,DOMESTIC,...,WARD,COMMUNITY_AREA_NUMBER,FBICODE,X_COORDINATE,Y_COORDINATE,YEAR,UPDATEDON,LATITUDE,LONGITUDE,LOCATION
0,3512276,HK587712,08/28/2004 05:50:56 PM,047XX S KEDZIE AVE,890,THEFT,FROM BUILDING,SMALL RETAIL STORE,False,False,...,14,58,6,1155838,1873050,2004,02/10/2018 03:50:01 PM,41.8074405,-87.70395585,"(41.8074405, -87.703955849)"
1,3406613,HK456306,06/26/2004 12:40:00 PM,009XX N CENTRAL PARK AVE,820,THEFT,$500 AND UNDER,OTHER,False,False,...,27,23,6,1152206,1906127,2004,02/28/2018 03:56:25 PM,41.89827996,-87.71640551,"(41.898279962, -87.716405505)"
2,8002131,HT233595,04/04/2011 05:45:00 AM,043XX S WABASH AVE,820,THEFT,$500 AND UNDER,NURSING HOME/RETIREMENT HOME,False,False,...,3,38,6,1177436,1876313,2011,02/10/2018 03:50:01 PM,41.81593313,-87.62464213,"(41.815933131, -87.624642127)"


## What percentage of reported crimes resulted in an arrest?

In [13]:
# Your code goes here
(rc.filter(F.col('ARREST') == 'TRUE').count() / rc.count()) * 100, 'percent out of', rc.count(), 'reports'

(30.58161350844278, 'percent out of', 533, 'reports')

## What are the top 3 locations for reported crimes?

In [14]:
# Your code goes here
rc.groupBy('LOCATION_DESCRIPTION').count() \
    .sort(F.desc('count')) \
    .limit(3).toPandas()

Unnamed: 0,LOCATION_DESCRIPTION,count
0,STREET,136
1,RESIDENCE,84
2,SIDEWALK,64


## What are the top 3 locations for reported thefts?

In [15]:
# Your code goes here

rc.filter(F.col('PRIMARY_TYPE') == 'THEFT') \
    .groupBy('LOCATION_DESCRIPTION').count() \
    .sort(F.desc('count')) \
    .limit(3).toPandas()

Unnamed: 0,LOCATION_DESCRIPTION,count
0,STREET,31
1,GROCERY FOOD STORE,7
2,DEPARTMENT STORE,7


## What is the most common primary type of crime in district 22?


In [16]:
# Your code goes here
rc.filter(F.col('DISTRICT') == '22') \
    .groupBy(F.col('PRIMARY_TYPE')).count() \
    .sort(F.desc('count')).show(3)

+------------+-----+
|PRIMARY_TYPE|count|
+------------+-----+
|       THEFT|    6|
|    BURGLARY|    2|
|     BATTERY|    1|
+------------+-----+
only showing top 3 rows



## What are the suffixes where crime is taking place?

Inspect the `BLOCK` column: common suffixes include `AVE, BLVD, ST, DR, PL, RD` (there are a few more). Create a dataframe that contains the frequency of each block suffix, e.g. showing that there are 258 `AVE` suffixes and 191 `ST` suffixes.


In [17]:
# Your code goes here

rc.select(
    F.reverse(F.split('BLOCK', ' '))[0].alias('suffix')
).groupby('suffix').count().toPandas().set_index('suffix').T

suffix,PL,DR,AV,BROADWAY,E,PKWY,BL,CT,D,RD,AVE,Ave,TER,BLVD,ST
count,9,11,25,2,1,2,2,1,1,13,258,1,1,15,191


In [18]:
# This more verbose option gives more granular control

rc.select(F.lower('BLOCK').alias('block')) \
    .select(
    F.col('block').endswith('ave').alias('AVE').cast('integer'),
    F.col('block').endswith('av').alias('AV').cast('integer'),
    F.col('block').endswith('blvd').alias('BLVD').cast('integer'),
    F.col('block').endswith('st').alias('ST').cast('integer'),
    F.col('block').endswith('dr').alias('DR').cast('integer'),
    F.col('block').endswith('pl').alias('PL').cast('integer'),
    F.col('block').endswith('rd').alias('RD').cast('integer'),
    F.col('block').endswith('pkwy').alias('PKWY').cast('integer'),
    F.col('block').endswith('ct').alias('CT').cast('integer')
).agg(
    (F.sum('AVE') + F.sum('AV')).alias('sum(AVE)'), 
    F.sum('BLVD'), F.sum('ST'), 
    F.sum('DR'), F.sum('PL'), 
    F.sum('RD'), F.sum('PKWY'), 
    F.sum('CT')
).toPandas()

Unnamed: 0,sum(AVE),sum(BLVD),sum(ST),sum(DR),sum(PL),sum(RD),sum(PKWY),sum(CT)
0,284,15,191,11,9,13,2,1
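
The two steps used above, taking the last token of `BLOCK` and merging spelling variants such as `AV`/`Ave` into `AVE`, can be checked on a handful of values in plain Python. This is only a sketch: the first three blocks are copied from the sample rows shown earlier, the fourth is a made-up entry, and the `CANONICAL` table is an assumed normalisation that covers just the `AV` case.

```python
from collections import Counter

blocks = [
    "047XX S KEDZIE AVE",
    "009XX N CENTRAL PARK AVE",
    "043XX S WABASH AVE",
    "001XX W EXAMPLE AV",  # hypothetical block, not from the dataset
]

# Assumed normalisation table: map a spelling variant to one canonical suffix
CANONICAL = {"AV": "AVE"}


def suffix(block):
    """Last whitespace-separated token, upper-cased and normalised."""
    token = block.split()[-1].upper()
    return CANONICAL.get(token, token)


suffix_counts = Counter(suffix(b) for b in blocks)
print(suffix_counts)
```

Upper-casing before the lookup plays the same role as `F.lower('BLOCK')` in the verbose Spark version: it folds `Ave` into `AVE` so that the three variants are counted together.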
