<a href="https://colab.research.google.com/github/hanlululu/02807-Computational-tools-for-Data-Science/blob/main/Week7/exercises_week_7.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 02807 - Week 7 Exercises: Getting started with Spark

# Setup

* See also this week's slides for context

In [2]:
!pip install pyspark





In [3]:
# Instructions on p. 20 Learning Spark, 2nd ed.
# Here's a quick guide; some googling may also be required
# 1) Install pyspark via conda/pip
#          pyspark requires the JAVA_HOME environment variable to be set.
# 2) Install a JRE/JDK and find its install location
#          You will need Java 8 or above
# 3) Update the JAVA_HOME environment variable set programmatically below
#    with your installation specifics

# Some possibly helpful pointers on JRE/JDK:
# MacOS: Consider using https://github.com/AdoptOpenJDK/homebrew-openjdk
# Debian (colab): !apt install openjdk-8-jdk-headless -qq
# Windows: 
#    If you didn't change the path during installation, it'll be 
#    something like C:\Program Files\Java\jdk1.8.0_65
#    You can also type `where java` at the command prompt.

# The JAVA_HOME environment variable is set programmatically below,
# but you must point it to your local install

import os
# Raw string so the backslashes in the Windows path are not treated as escape sequences
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-15.0.2"
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
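Before starting Spark, it can help to check that the configured path really points at a Java installation. The sketch below is only a heuristic: `looks_like_java_home` is a hypothetical helper (not part of PySpark) that assumes a valid install contains `bin/java` or `bin/java.exe`.

```python
import os

def looks_like_java_home(path):
    """Return True if `path` contains bin/java or bin/java.exe (hypothetical helper)."""
    if not path or not os.path.isdir(path):
        return False
    candidates = [os.path.join(path, "bin", name) for name in ("java", "java.exe")]
    return any(os.path.isfile(candidate) for candidate in candidates)

java_home = os.environ.get("JAVA_HOME", "")
if not looks_like_java_home(java_home):
    print(f"JAVA_HOME={java_home!r} does not look like a Java installation")
```

If the message is printed, adjust the path assigned to `JAVA_HOME` before creating the Spark session.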

In [4]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf

Let's initialise a **Spark Session**. 
* A SparkSession object is the entry point to Spark functionality. 
* Creating the SparkSession object starts a **Spark Application**, on which all code for that session runs.


In [5]:
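# create the Spark session; getOrCreate() makes this cell safe to re-run
conf = SparkConf().set("spark.ui.port", "4050")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# the underlying SparkContext is available from the session
sc = spark.sparkContext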


In [6]:
spark

# Exercise 1: The Titanic Dataset

In this exercise you should use Spark to count the number of Titanic passengers in different age brackets. More specifically, you need to count the number of people aged 0 to 9, 10 to 19, and so on.

The data is available [here](https://courses.compute.dtu.dk/02807/2021/lectures/week7/titanic_full.csv) and should be loaded into a dataframe in the next cell.

In [7]:
df = spark.read.option('header', True) \
                .option('inferSchema', True) \
                .csv('titanic_full.csv')

In [8]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [9]:
df.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
888,889,0,3,"""Johnston, Miss. Catherine Helen """"Carrie""""""",female,,1,2,W./C. 6607,23.4500,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C


## Cleaning the data

Remove the rows that do not have an age.


In [10]:
df = df.filter(F.col('Age').isNotNull())
df.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
709,886,0,3,"Rice, Mrs. William (Margaret Norton)",female,39.0,0,5,382652,29.1250,,Q
710,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
711,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
712,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C


## Adding age brackets 

Create a new column whose value identifies the age bracket each passenger is in.

In [11]:
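def age_bracket(age):
    # Return the label of the first 10-year bracket whose upper bound exceeds the age, e.g. '20-29'
    for bracket in range(10, 150, 10):
        if int(age) < bracket:
            return f"{bracket-10}-{bracket-1}"

# Wrap the Python function as a Spark UDF (the default return type is string)
age_bracket_udf = F.udf(age_bracket)

# Two versions of the bracket: a string label from the UDF,
# and the bracket's lower bound from native column arithmetic
df = df.withColumn('AgeBracketUDF', age_bracket_udf(F.col('Age'))) \
        .withColumn('AgeBracketDiv', (F.col('Age') / 10).cast('integer')*10)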

df.select('Age', 'AgeBracketUDF', 'AgeBracketDiv').show()

+----+-------------+-------------+
| Age|AgeBracketUDF|AgeBracketDiv|
+----+-------------+-------------+
|22.0|        20-29|           20|
|38.0|        30-39|           30|
|26.0|        20-29|           20|
|35.0|        30-39|           30|
|35.0|        30-39|           30|
|54.0|        50-59|           50|
| 2.0|          0-9|            0|
|27.0|        20-29|           20|
|14.0|        10-19|           10|
| 4.0|          0-9|            0|
|58.0|        50-59|           50|
|20.0|        20-29|           20|
|39.0|        30-39|           30|
|14.0|        10-19|           10|
|55.0|        50-59|           50|
| 2.0|          0-9|            0|
|31.0|        30-39|           30|
|35.0|        30-39|           30|
|34.0|        30-39|           30|
|15.0|        10-19|           10|
+----+-------------+-------------+
only showing top 20 rows
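The two columns encode the same bracket in different forms. As a plain-Python sketch (no Spark needed; `bracket_start` and `bracket_label` are hypothetical helper names), the string label returned by the UDF can be derived directly from the integer-division value, which suggests the loop in the UDF is not strictly necessary:

```python
def bracket_start(age):
    """Lower bound of the 10-year bracket, mirroring (Age / 10) cast to integer, times 10."""
    return int(age / 10) * 10

def bracket_label(age):
    """Label such as '20-29', in the same format as the strings from age_bracket()."""
    start = bracket_start(age)
    return f"{start}-{start + 9}"

# A few ages taken from the table above
for age in (2.0, 14.0, 22.0, 38.0, 54.0):
    print(age, bracket_start(age), bracket_label(age))
```

Native column expressions generally avoid the Python round trip that a UDF incurs (the `BatchEvalPython` step in the physical plan), so the arithmetic form is usually the cheaper of the two.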



## Age bracket counts

Create a Spark dataframe with the number of passengers in each bracket, and sort it by age bracket (youngest to oldest).

In [12]:
df.groupBy('AgeBracketUDF', 'AgeBracketDiv') \
    .agg(F.count('AgeBracketUDF'), F.count('AgeBracketDiv')) \
    .sort('AgeBracketUDF', 'AgeBracketDiv') \
    .collect()

[Row(AgeBracketUDF='0-9', AgeBracketDiv=0, count(AgeBracketUDF)=62, count(AgeBracketDiv)=62),
 Row(AgeBracketUDF='10-19', AgeBracketDiv=10, count(AgeBracketUDF)=102, count(AgeBracketDiv)=102),
 Row(AgeBracketUDF='20-29', AgeBracketDiv=20, count(AgeBracketUDF)=220, count(AgeBracketDiv)=220),
 Row(AgeBracketUDF='30-39', AgeBracketDiv=30, count(AgeBracketUDF)=167, count(AgeBracketDiv)=167),
 Row(AgeBracketUDF='40-49', AgeBracketDiv=40, count(AgeBracketUDF)=89, count(AgeBracketDiv)=89),
 Row(AgeBracketUDF='50-59', AgeBracketDiv=50, count(AgeBracketUDF)=48, count(AgeBracketDiv)=48),
 Row(AgeBracketUDF='60-69', AgeBracketDiv=60, count(AgeBracketUDF)=19, count(AgeBracketDiv)=19),
 Row(AgeBracketUDF='70-79', AgeBracketDiv=70, count(AgeBracketUDF)=6, count(AgeBracketDiv)=6),
 Row(AgeBracketUDF='80-89', AgeBracketDiv=80, count(AgeBracketUDF)=1, count(AgeBracketDiv)=1)]
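Sorting by `AgeBracketUDF` compares strings, which gives the right order here only because every bracket starts below 100. A small plain-Python sketch of why the numeric `AgeBracketDiv` column is the safer sort key (the `'100-109'` label is a hypothetical value that does not occur in this data):

```python
labels = ["20-29", "0-9", "100-109", "10-19"]  # '100-109' is hypothetical

# String comparison places '100-109' before '20-29'.
print(sorted(labels))

# Sorting on the numeric lower bound gives youngest-to-oldest order.
print(sorted(labels, key=lambda label: int(label.split("-")[0])))
```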

## Spark plans and jobs

1) Display the plans Spark creates to execute your query from the previous exercise. Identify how many shuffles/exchanges take place.

2) How many jobs are spawned when your query is executed? (Inspect via the Spark UI)

1. Two exchanges (shuffles) take place: first a hash partitioning for the aggregation, followed by a range partitioning for the sort. 
2. Two jobs are spawned. The second job skips a stage and reads from the shuffle output created (and cached) by the first job.

In [13]:
df.groupBy('AgeBracketUDF', 'AgeBracketDiv') \
    .agg(F.count('AgeBracketUDF'), F.count('AgeBracketDiv')) \
    .sort('AgeBracketUDF', 'AgeBracketDiv') \
    .explain()

== Physical Plan ==
*(4) Sort [AgeBracketUDF#65 ASC NULLS FIRST, AgeBracketDiv#79 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(AgeBracketUDF#65 ASC NULLS FIRST, AgeBracketDiv#79 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#161]
   +- *(3) HashAggregate(keys=[AgeBracketUDF#65, AgeBracketDiv#79], functions=[count(AgeBracketUDF#65), count(AgeBracketDiv#79)])
      +- Exchange hashpartitioning(AgeBracketUDF#65, AgeBracketDiv#79, 200), ENSURE_REQUIREMENTS, [id=#157]
         +- *(2) HashAggregate(keys=[AgeBracketUDF#65, AgeBracketDiv#79], functions=[partial_count(AgeBracketUDF#65), partial_count(AgeBracketDiv#79)])
            +- *(2) Project [pythonUDF0#162 AS AgeBracketUDF#65, (cast((Age#21 / 10.0) as int) * 10) AS AgeBracketDiv#79]
               +- BatchEvalPython [age_bracket(Age#21)], [pythonUDF0#162]
                  +- *(1) Filter isnotnull(Age#21)
                     +- FileScan csv [Age#21] Batched: false, DataFilters: [isnotnull(Age#21)], Format: CSV, Location: I
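Each `Exchange` operator in the physical plan corresponds to a shuffle, so the count can be read off the plan text. As a rough sketch, assuming the plan is available as a string (here an abbreviated copy of the output above with attribute ids removed; `count_exchanges` is a hypothetical helper):

```python
plan_text = """
*(4) Sort [AgeBracketUDF ASC NULLS FIRST, AgeBracketDiv ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(AgeBracketUDF ASC NULLS FIRST, AgeBracketDiv ASC NULLS FIRST, 200)
   +- *(3) HashAggregate(keys=[AgeBracketUDF, AgeBracketDiv], functions=[count, count])
      +- Exchange hashpartitioning(AgeBracketUDF, AgeBracketDiv, 200)
         +- *(2) HashAggregate(keys=[AgeBracketUDF, AgeBracketDiv], functions=[partial_count, partial_count])
"""

def count_exchanges(plan):
    """Count plan lines whose operator is an Exchange (one per shuffle)."""
    return sum(
        1
        for line in plan.splitlines()
        # Strip the tree-drawing prefix ("+- " plus indentation) before checking the operator name
        if line.lstrip(" +-").startswith("Exchange")
    )

print(count_exchanges(plan_text))
```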

# Exercise 2: Actions and transformations

For each of the following Spark operations, decide if they are transformations or actions. If they are transformations, determine if they are wide or narrow.

* `select()`
* `groupBy()`
* `filter()`
* `where()`
* `count()`
* `show()`
* `agg()`
* `write()`
* `sort()`

* `select()`: narrow transformation
* `groupBy()`: wide transformation, as data must be gathered from different partitions
* `filter()`: narrow transformation, as each partition can be filtered independently
* `where()`: narrow transformation (an alias for `filter()`)
* `count()`: action on a DataFrame, or a wide transformation when applied to a grouped dataset
* `show()`: action
* `agg()`: wide transformation
* `write()`: action
* `sort()`: wide transformation, as rows must be sorted across partitions
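
The narrow/wide distinction can be illustrated without Spark. In this toy model (plain Python lists standing in for partitions; this is not how Spark is implemented, and the rows are made up), a filter runs on each partition independently, whereas grouping by key first has to move rows between partitions:

```python
# Two "partitions" of (primary_type, district) rows; values are made up for illustration.
partitions = [
    [("THEFT", 22), ("BATTERY", 4), ("THEFT", 4)],
    [("BURGLARY", 22), ("THEFT", 22)],
]

def narrow_filter(parts, predicate):
    """Filter each partition on its own: no rows cross partition boundaries."""
    return [[row for row in part if predicate(row)] for part in parts]

def wide_group_count(parts, num_partitions=2):
    """Shuffle rows so equal keys meet in one partition, then count per key."""
    shuffled = [[] for _ in range(num_partitions)]
    for part in parts:
        for key, _ in part:
            # The target partition depends only on the key, mimicking hash partitioning.
            shuffled[sum(map(ord, key)) % num_partitions].append(key)
    counts = {}
    for part in shuffled:
        for key in part:
            counts[key] = counts.get(key, 0) + 1
    return counts

print(narrow_filter(partitions, lambda row: row[1] == 22))
print(wide_group_count(partitions))
```

The filter output keeps the original partition layout, while the grouped count is only correct because all rows with the same key were first brought together.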

# Exercise 3: Exploratory data analysis for the Chicago crime dataset
The Chicago Crime dataset contains a summary of the reported crimes that occurred in the City of Chicago from 2001 to 2017. 

We'll work on a sample of it available [here](https://courses.compute.dtu.dk/02807/2021/lectures/week7/reported-crimes.csv).

You may optionally work on the entire dataset which is available [here](https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD).

Now load the data you've downloaded locally into a spark dataframe, and proceed to answer the questions below.

In [14]:
# Your code goes here
rc = spark.read.csv('reported-crimes.csv', header=True)
rc.toPandas().head(3)

Unnamed: 0,ID,CASE_NUMBER,DATE,BLOCK,IUCR,PRIMARY_TYPE,DESCRIPTION,LOCATION_DESCRIPTION,ARREST,DOMESTIC,...,WARD,COMMUNITY_AREA_NUMBER,FBICODE,X_COORDINATE,Y_COORDINATE,YEAR,UPDATEDON,LATITUDE,LONGITUDE,LOCATION
0,3512276,HK587712,08/28/2004 05:50:56 PM,047XX S KEDZIE AVE,890,THEFT,FROM BUILDING,SMALL RETAIL STORE,False,False,...,14,58,6,1155838,1873050,2004,02/10/2018 03:50:01 PM,41.8074405,-87.70395585,"(41.8074405, -87.703955849)"
1,3406613,HK456306,06/26/2004 12:40:00 PM,009XX N CENTRAL PARK AVE,820,THEFT,$500 AND UNDER,OTHER,False,False,...,27,23,6,1152206,1906127,2004,02/28/2018 03:56:25 PM,41.89827996,-87.71640551,"(41.898279962, -87.716405505)"
2,8002131,HT233595,04/04/2011 05:45:00 AM,043XX S WABASH AVE,820,THEFT,$500 AND UNDER,NURSING HOME/RETIREMENT HOME,False,False,...,3,38,6,1177436,1876313,2011,02/10/2018 03:50:01 PM,41.81593313,-87.62464213,"(41.815933131, -87.624642127)"


In [21]:
rc.columns

['ID',
 'CASE_NUMBER',
 'DATE',
 'BLOCK',
 'IUCR',
 'PRIMARY_TYPE',
 'DESCRIPTION',
 'LOCATION_DESCRIPTION',
 'ARREST',
 'DOMESTIC',
 'BEAT',
 'DISTRICT',
 'WARD',
 'COMMUNITY_AREA_NUMBER',
 'FBICODE',
 'X_COORDINATE',
 'Y_COORDINATE',
 'YEAR',
 'UPDATEDON',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION']

## What percentage of reported crimes resulted in an arrest?

In [16]:
# Your code goes here

(rc.filter(F.col('ARREST') == True).count()/rc.count())*100, '% out of', rc.count(), 'reports'

(30.58161350844278, '% out of', 533, 'reports')

## What are the top 3 locations for reported crimes?

In [19]:
# Your code goes here
rc.groupBy('LOCATION_DESCRIPTION').count() \
    .sort(F.desc('count')) \
    .limit(3).toPandas()

Unnamed: 0,LOCATION_DESCRIPTION,count
0,STREET,136
1,RESIDENCE,84
2,SIDEWALK,64


## What are the top 3 locations for reported thefts?

In [20]:
# Your code goes here
rc.filter(F.col('PRIMARY_TYPE') == 'THEFT') \
    .groupBy('LOCATION_DESCRIPTION').count() \
    .sort(F.desc('count')) \
    .limit(3).toPandas()

Unnamed: 0,LOCATION_DESCRIPTION,count
0,STREET,31
1,DEPARTMENT STORE,7
2,GROCERY FOOD STORE,7


## What is the most common primary type of crime in district 22?


In [22]:
# Your code goes here
rc.filter(F.col('DISTRICT') == '22') \
    .groupBy(F.col('PRIMARY_TYPE')).count() \
    .sort(F.desc('count')).show(3)

+------------+-----+
|PRIMARY_TYPE|count|
+------------+-----+
|       THEFT|    6|
|    BURGLARY|    2|
|     BATTERY|    1|
+------------+-----+
only showing top 3 rows



## What are the suffixes where crime is taking place?

Inspect the `BLOCK` column and observe that common suffixes are `AVE, BLVD, ST, DR, PL, RD` (there are a few more). Create a dataframe that contains the frequency of each block suffix, e.g. showing there are 258 `AVE` suffixes and 191 `ST` suffixes.


In [23]:
# Your code goes here
rc.select(
    F.reverse(F.split('BLOCK', ' '))[0].alias('suffix')
).groupby('suffix').count().toPandas().set_index('suffix').T

suffix,PL,DR,AV,BROADWAY,E,PKWY,BL,CT,D,RD,AVE,Ave,TER,BLVD,ST
count,9,11,25,2,1,2,2,1,1,13,258,1,1,15,191
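
The counts above split what look like the same suffix across several spellings (`AVE`, `Ave`, `AV`; `BLVD`, `BL`). A plain-Python sketch of one possible normalisation follows. The alias table is an assumption about which spellings belong together, and the last two block strings are made up for illustration:

```python
from collections import Counter

# Assumed aliases; adjust after inspecting the real data.
ALIASES = {"AV": "AVE", "BL": "BLVD"}

def block_suffix(block):
    """Upper-cased last whitespace-separated token of a BLOCK value, with aliases merged."""
    suffix = block.split()[-1].upper()
    return ALIASES.get(suffix, suffix)

blocks = [
    "047XX S KEDZIE AVE",
    "009XX N CENTRAL PARK AVE",
    "043XX S WABASH AVE",
    "001XX N EXAMPLE Ave",  # hypothetical
    "002XX W EXAMPLE BL",   # hypothetical
]
print(Counter(block_suffix(block) for block in blocks))
```

In Spark the same idea could be expressed by upper-casing the suffix column and mapping the aliases before the `groupby`, which avoids a Python UDF.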
