# Lab 6 - PySpark

## INET4710 Spring 2020

### Submitted by: Mahsa Ayoughi


---------------

### Lab Objectives

* Working with the DataFrame API
* Working with columns and rows
* Leveraging built-in Spark functions
* Creating your own functions in Spark
* Working with Resilient Distributed Datasets (RDDs)


Instructions: <br>
**For this lab, run the Jupyter notebook on Google Colab. Execute each cell to display the result from your code.**


## Download and install Spark

In [1]:
!ls

sample_data


In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark


## Setup environment

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

## Downloading and preprocessing Chicago's Reported Crime Data

In [4]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

--2020-03-09 02:53:53--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [         <=>        ]   1.55G  3.22MB/s    in 8m 30s  

2020-03-09 03:02:24 (3.12 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1670224294]

total 1851688
-rw-r--r--  1 root root 1670224294 Mar  8 11:15 'rows.csv?accessType=DOWNLOAD'
drwxr-xr-x  1 root root       4096 Mar  3 18:11  sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018  spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018  spark-2.3.1-bin-hadoop2.7.tgz


In [5]:
# Use the Linux mv command to rename the downloaded file
# from 'rows.csv?accessType=DOWNLOAD' to 'reported-crimes.csv'.
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv
!ls -l

total 1851688
-rw-r--r--  1 root root 1670224294 Mar  8 11:15 reported-crimes.csv
drwxr-xr-x  1 root root       4096 Mar  3 18:11 sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018 spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018 spark-2.3.1-bin-hadoop2.7.tgz


In [6]:
from pyspark.sql.functions import to_timestamp, col, lit
# Parse the Date strings into timestamps, then keep rows with Date <= 2018-11-11
rc = (spark.read.csv('reported-crimes.csv', header=True)
      .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
      .filter(col('Date') <= lit('2018-11-11')))
rc.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11| 

## Schemas

In [0]:
# Caching this data frame as it's going to be read over and over again
rc = rc.cache()

In [8]:
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



## Working with columns

**Display only the first 5 rows of the column name IUCR**

In [9]:
rc.select('IUCR').show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



**Display only the first 4 rows of the columns Case Number, Date and Arrest**

In [10]:
rc.select('Case Number','Date','Arrest').show(4)

+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JA366925|2001-01-01 11:00:00| false|
|   JB147188|2017-10-08 03:00:00| false|
|   JB147595|2017-03-28 14:00:00| false|
|   JB147230|2017-09-09 20:17:00| false|
+-----------+-------------------+------+
only showing top 4 rows



**Add a column named One whose entries are all 1**

In [0]:
from pyspark.sql.functions import lit

In [0]:
new_column= rc.withColumn("One", lit(1))

In [13]:
new_column.show(5)

+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|      ID|Case Number|               Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|One|
+--------+-----------+-------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            4

**Remove the column IUCR**

In [0]:
rc= rc.drop('IUCR')

In [15]:
rc.show(2)

+--------+-----------+-------------------+------------------+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|             Block|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+------------------+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|   016XX E 86TH PL| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11|        null|        null|200

## Working with rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

In [0]:
from pyspark.sql import Row

In [0]:
newRow = sc.parallelize([Row(Date='2018-11-12')]).toDF()

In [18]:
rc.show()

+--------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|     016XX E 86TH PL|  DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11|        null|    

**What are the top 10 primary types by number of reported crimes, in descending order of occurrence?**

In [19]:
rc.select("Primary Type").distinct().count()

35

In [0]:
from pyspark.sql.functions import desc

In [0]:
rc_count = rc.groupBy("Primary Type").count()

In [22]:
rc_count.sort(desc("count")).show(10)

+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1418417|
|            BATTERY|1232216|
|    CRIMINAL DAMAGE| 771497|
|          NARCOTICS| 711646|
|      OTHER OFFENSE| 418826|
|            ASSAULT| 418507|
|           BURGLARY| 388028|
|MOTOR VEHICLE THEFT| 314142|
| DECEPTIVE PRACTICE| 265409|
|            ROBBERY| 255599|
+-------------------+-------+
only showing top 10 rows



**What percentage of reported crimes resulted in an arrest?**

In [23]:
((rc.where(rc["Arrest"] == "true").count())/rc.count()) * 100

27.753961840099954

  **What are the top 3 locations for reported crimes?**

In [0]:
rc_location= rc.groupBy("Location Description").count()

In [25]:
rc_location.sort(desc("count")).show(3)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1770570|
|           RESIDENCE|1144566|
|           APARTMENT| 698122|
+--------------------+-------+
only showing top 3 rows



## Built-in functions

In [0]:
from pyspark.sql import functions

In [27]:
print(dir(functions))



## String functions

**Display the Primary Type column in lowercase and in uppercase, and display the first 4 characters of the column**

In [28]:
rc_lower =rc.withColumn("lower_Primary Type",functions.lower(col("Primary Type")))
rc_lower.select('Primary Type','lower_Primary Type').show(2)

+-------------------+-------------------+
|       Primary Type| lower_Primary Type|
+-------------------+-------------------+
| DECEPTIVE PRACTICE| deceptive practice|
|CRIM SEXUAL ASSAULT|crim sexual assault|
+-------------------+-------------------+
only showing top 2 rows



In [29]:
rc_upper =rc.withColumn("upper_Primary Type",functions.upper(col("Primary Type")))
rc_upper.select('Primary Type','upper_Primary Type').show(2)

+-------------------+-------------------+
|       Primary Type| upper_Primary Type|
+-------------------+-------------------+
| DECEPTIVE PRACTICE| DECEPTIVE PRACTICE|
|CRIM SEXUAL ASSAULT|CRIM SEXUAL ASSAULT|
+-------------------+-------------------+
only showing top 2 rows



In [0]:
rc_4char = rc.withColumn("4char_Primary Type", functions.rpad(col("Primary Type"), 4,''))

In [31]:
rc_4char.select('Primary Type','4char_Primary Type').show(2)

+-------------------+------------------+
|       Primary Type|4char_Primary Type|
+-------------------+------------------+
| DECEPTIVE PRACTICE|              DECE|
|CRIM SEXUAL ASSAULT|              CRIM|
+-------------------+------------------+
only showing top 2 rows



## Numeric functions

**Show the oldest date and the most recent date**

In [32]:
rc.select(functions.max(col('Date'))).show()

+-------------------+
|          max(Date)|
+-------------------+
|2018-11-10 23:55:00|
+-------------------+



In [33]:
rc.select(functions.min(col('Date'))).show()

+-------------------+
|          min(Date)|
+-------------------+
|2001-01-01 00:00:00|
+-------------------+



## Working with joins

**Download police station data**

In [34]:
!wget -O police-stations.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
!ls -l

--2020-03-09 03:06:40--  https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘police-stations.csv’

police-stations.csv     [ <=>                ]   5.57K  --.-KB/s    in 0s      

2020-03-09 03:06:41 (704 MB/s) - ‘police-stations.csv’ saved [5699]

total 1851700
-rw-r--r--  1 root root       5699 Aug 19  2019 police-stations.csv
-rw-r--r--  1 root root 1670224294 Mar  8 11:15 reported-crimes.csv
drwxr-xr-x  1 root root       4096 Mar  3 18:11 sample_data
drwxrwxr-x 13 1000 1000       4096 Jun  1  2018 spark-2.3.1-bin-hadoop2.7
-rw-r--r--  1 root root  225883783 Jun  1  2018 spark-2.3.1-bin-ha

In [0]:
ps = spark.read.csv('police-stations.csv',header=True)

In [36]:
ps.printSchema()

root
 |-- DISTRICT: string (nullable = true)
 |-- DISTRICT NAME: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP: string (nullable = true)
 |-- WEBSITE: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- FAX: string (nullable = true)
 |-- TTY: string (nullable = true)
 |-- X COORDINATE: string (nullable = true)
 |-- Y COORDINATE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)



In [37]:
ps.show(5)

+------------+--------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+
|    DISTRICT| DISTRICT NAME|             ADDRESS|   CITY|STATE|  ZIP|             WEBSITE|       PHONE|         FAX|         TTY|X COORDINATE|Y COORDINATE|   LATITUDE|   LONGITUDE|            LOCATION|
+------------+--------------+--------------------+-------+-----+-----+--------------------+------------+------------+------------+------------+------------+-----------+------------+--------------------+
|Headquarters|  Headquarters| 3510 S Michigan Ave|Chicago|   IL|60653|http://home.chica...|        null|        null|        null| 1177731.401| 1881697.404|41.83070169|-87.62339535|(41.8307016873, -...|
|           1|       Central|     1718 S State St|Chicago|   IL|60616|http://home.chica...|312-745-4290|312-745-3694|312-745-3693| 1176569.052| 1891771.704|41.85837259|-87.62735617|(41.858

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

In [0]:
df = rc.join(ps, on=['DISTRICT'], how='left')

In [39]:
df.show(5)

+--------+--------+-----------+-------------------+--------------------+-------------------+--------------------+--------------------+------+--------+----+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+-------------+-------+----+-----+----+-------+-----+----+----+------------+------------+--------+---------+--------+
|District|      ID|Case Number|               Date|               Block|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|DISTRICT NAME|ADDRESS|CITY|STATE| ZIP|WEBSITE|PHONE| FAX| TTY|X COORDINATE|Y COORDINATE|LATITUDE|LONGITUDE|LOCATION|
+--------+--------+-----------+-------------------+--------------------+-------------------+--------------------+--------------------+------+--------+----+----+--------------+--------+------------+------------+----+--------------------+--

**Find the most frequently reported noncriminal activity**

In [0]:
df_count = df.groupBy("Primary Type").count()

In [41]:
df_count.where(df_count["Primary Type"] == "NON-CRIMINAL").show()

+------------+-----+
|Primary Type|count|
+------------+-----+
|NON-CRIMINAL|  164|
+------------+-----+



**Find the day of the week with the most reported crime**

In [42]:
a = df.groupBy("Date").count()
a.sort(desc("count")).show(1)

+-------------------+-----+
|               Date|count|
+-------------------+-----+
|2008-01-01 00:01:00|  296|
+-------------------+-----+
only showing top 1 row



**Using a bar chart, plot which day of the week has the highest number of reported crimes.**

In [0]:
pip install pyspark_dist_explore

Collecting pyspark_dist_explore
  Downloading https://files.pythonhosted.org/packages/3c/33/2b6c29265413f2b56516caf02b8befbb6a79a1a3516d57bf1b0742a1be40/pyspark_dist_explore-0.1.8-py3-none-any.whl
Installing collected packages: pyspark-dist-explore
Successfully installed pyspark-dist-explore-0.1.8


In [0]:
import seaborn as sns
import pandas as pd

In [0]:
a = df.withColumn("week_day", functions.date_format(col("Date"), "E")).groupBy("week_day").count()

In [0]:
# Bring only the aggregated per-weekday counts to the driver, not the full dataset
num_crimes = a.toPandas()

## RDDs setup

#### Please use Chicago's police station dataset

In [0]:
#!wget -O police-stations.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
#!ls -l

In [0]:
ps = spark.read.csv('police-stations.csv',header=True)

**How many police stations are there?**

In [90]:
ps.select("ADDRESS").distinct().count()

24

**Display the District ID, District name, Address and Zip for the police station with District ID 7**

In [95]:
ps.select('DISTRICT','DISTRICT NAME','ADDRESS','ZIP').where(ps["DISTRICT"] == 7).show()

+--------+-------------+--------------+-----+
|DISTRICT|DISTRICT NAME|       ADDRESS|  ZIP|
+--------+-------------+--------------+-----+
|       7|    Englewood|1438 W 63rd St|60636|
+--------+-------------+--------------+-----+



**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**

In [118]:
ps.select('DISTRICT','DISTRICT NAME','ADDRESS','ZIP').where(ps['DISTRICT'] == 10).show()
ps.select('DISTRICT','DISTRICT NAME','ADDRESS','ZIP').where(ps['DISTRICT'] == 11).show()

+--------+-------------+----------------+-----+
|DISTRICT|DISTRICT NAME|         ADDRESS|  ZIP|
+--------+-------------+----------------+-----+
|      10|        Ogden|3315 W Ogden Ave|60623|
+--------+-------------+----------------+-----+

+--------+-------------+------------------+-----+
|DISTRICT|DISTRICT NAME|           ADDRESS|  ZIP|
+--------+-------------+------------------+-----+
|      11|     Harrison|3151 W Harrison St|60612|
+--------+-------------+------------------+-----+

