# Apache PySpark by Example

**[June 2023 update]**

I've consolidated all the notebooks for this course into a single notebook. (The course videos still show the individual notebooks.)


## Introduction to Google Colab

### Jupyter notebook basics

#### Code cells

In [27]:
2*5

10

In [28]:
import pandas as pd

In [29]:
!ls

reported-crimes.csv  sample_data


#### Text cells

### Access to the shell

In [None]:
!pwd

/content


In [None]:
!ls

sample_data


## Install Spark

- Google Colab recently made some changes that break the original Spark installation steps.
- Please use the code below, which installs Spark from the `pyspark` package instead.

In [1]:
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 1.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=fcb29c2610a9f3936a009a4ae573d0846f2f0c43377017fb8dc560b1c819d15c
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

## (02-02) Download Chicago's Reported Crime Data

### Downloading and preprocessing Chicago's Reported Crime Data

In [4]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!ls -l

--2024-05-27 19:05:39--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.199, 52.206.68.26, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [       <=>          ]   1.77G  3.04MB/s    in 10m 33s 

2024-05-27 19:16:12 (2.87 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1905686757]

total 1861032
-rw-r--r-- 1 root root 1905686757 May 27 11:01 'rows.csv?accessType=DOWNLOAD'
drwxr-xr-x 1 root root       4096 May 23 13:25  sample_data


In [6]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv
!ls -l

total 1861032
-rw-r--r-- 1 root root 1905686757 May 27 11:01 reported-crimes.csv
drwxr-xr-x 1 root root       4096 May 23 13:25 sample_data


In [7]:
from pyspark.sql.functions import to_timestamp,col,lit
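# Parse the Date strings into timestamps, then keep rows dated on or before 2018-11-11 00:00:00
rc = (spark.read.csv('reported-crimes.csv', header=True)
      .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
      .filter(col('Date') <= lit('2018-11-11')))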
rc.show(5)

+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|             Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
| 5741943|   HN549294|2007-08-25 09:22:18|074XX N ROGERS AVE|0560|             ASSAULT|              SIMPLE|               OTHER| false|   false|2422|     024|  49|             1|     08A|     

## (03-03) Schemas

In [8]:
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [21]:
from pyspark.sql.types import IntegerType,StringType,BooleanType,TimestampType,DoubleType,StructField,StructType

In [22]:
rc.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

In [26]:
schema = StructType([
  StructField('ID', StringType, True),
  StructField('Case Number', IntegerType, True),
  'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location'
])

AssertionError: dataType <class 'pyspark.sql.types.StringType'> should be an instance of <class 'pyspark.sql.types.DataType'>

In [51]:
labels = [('ID', StringType()),
 ('Case Number', StringType()),
 ('Date', StringType()),
 ('Block', StringType()),
 ('IUCR', StringType()),
 ('Primary Type', StringType()),
 ('Description', StringType()),
 ('Location Description', StringType()),
 ('Arrest', BooleanType()),
 ('Domestic', BooleanType()),
 ('Beat', IntegerType()),
 ('District', IntegerType()),
 ('Ward', IntegerType()),
 ('Community Area', IntegerType()),
 ('FBI Code', IntegerType()),
 ('X Coordinate', StringType()),
 ('Y Coordinate', StringType()),
 ('Year', IntegerType()),
 ('Updated On', StringType()),
 ('Latitude', DoubleType()),
 ('Longitude', DoubleType()),
 ('Location', StringType())]
labels

[('ID', StringType()),
 ('Case Number', StringType()),
 ('Date', StringType()),
 ('Block', StringType()),
 ('IUCR', StringType()),
 ('Primary Type', StringType()),
 ('Description', StringType()),
 ('Location Description', StringType()),
 ('Arrest', BooleanType()),
 ('Domestic', BooleanType()),
 ('Beat', IntegerType()),
 ('District', IntegerType()),
 ('Ward', IntegerType()),
 ('Community Area', IntegerType()),
 ('FBI Code', IntegerType()),
 ('X Coordinate', StringType()),
 ('Y Coordinate', StringType()),
 ('Year', IntegerType()),
 ('Updated On', StringType()),
 ('Latitude', DoubleType()),
 ('Longitude', DoubleType()),
 ('Location', StringType())]

In [59]:
schema = StructType([StructField(x[0],x[1],True) for x in labels])
schema

StructType([StructField('ID', StringType(), True), StructField('Case Number', StringType(), True), StructField('Date', StringType(), True), StructField('Block', StringType(), True), StructField('IUCR', StringType(), True), StructField('Primary Type', StringType(), True), StructField('Description', StringType(), True), StructField('Location Description', StringType(), True), StructField('Arrest', BooleanType(), True), StructField('Domestic', BooleanType(), True), StructField('Beat', IntegerType(), True), StructField('District', IntegerType(), True), StructField('Ward', IntegerType(), True), StructField('Community Area', IntegerType(), True), StructField('FBI Code', IntegerType(), True), StructField('X Coordinate', StringType(), True), StructField('Y Coordinate', StringType(), True), StructField('Year', IntegerType(), True), StructField('Updated On', StringType(), True), StructField('Latitude', DoubleType(), True), StructField('Longitude', DoubleType(), True), StructField('Location', Str

In [60]:
rc = spark.read.csv("reported-crimes.csv", schema=schema)
rc.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: integer (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [58]:
rc.show(5)

+--------+-----------+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|Primary Type|        Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|IUCR|Primary Type|        Description|Location Description|  null|    null|null|    nul

## (03-04) Working with columns

**Display only the first 5 rows of the column name IUCR**

In [61]:
rc.select('IUCR').show(5)

+----+
|IUCR|
+----+
|IUCR|
|0560|
|0110|
|0110|
|0620|
+----+
only showing top 5 rows



In [63]:
rc.select(col('IUCR')).show(5)

+----+
|IUCR|
+----+
|IUCR|
|0560|
|0110|
|0110|
|0620|
+----+
only showing top 5 rows



In [64]:
rc.select(rc.IUCR).show(5)

+----+
|IUCR|
+----+
|IUCR|
|0560|
|0110|
|0110|
|0620|
+----+
only showing top 5 rows



**Display only the first 4 rows of the column names Case Number, Date and Arrest**

In [67]:
rc.select(col('Case Number'),col('Date'),col('Arrest')).show(4)

+-----------+--------------------+------+
|Case Number|                Date|Arrest|
+-----------+--------------------+------+
|Case Number|                Date|  null|
|   HN549294|08/25/2007 09:22:...| false|
|   JE240540|05/24/2021 03:06:...|  true|
|   JE279849|06/26/2021 09:24:...|  true|
+-----------+--------------------+------+
only showing top 4 rows



In [69]:
rc.select('Case Number', 'Date', 'Arrest').show(4)

+-----------+--------------------+------+
|Case Number|                Date|Arrest|
+-----------+--------------------+------+
|Case Number|                Date|  null|
|   HN549294|08/25/2007 09:22:...| false|
|   JE240540|05/24/2021 03:06:...|  true|
|   JE279849|06/26/2021 09:24:...|  true|
+-----------+--------------------+------+
only showing top 4 rows



**Add a column with name One, with entries all 1s**

In [70]:
from pyspark.sql.functions import lit
rc.withColumn('One', lit(1)).show(5)

+--------+-----------+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+---+
|      ID|Case Number|                Date|               Block|IUCR|Primary Type|        Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|One|
+--------+-----------+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+---+
|      ID|Case Number|                Date|               Block|IUCR|Primary Type|        Description|Location Description|  null|    null|

**Remove the column IUCR**

In [71]:
rc = rc.drop('IUCR')
rc.show(5)

+--------+-----------+--------------------+--------------------+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|Primary Type|        Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+--------------------+--------------------+------------+-------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|                Date|               Block|Primary Type|        Description|Location Description|  null|    null|null|    null|null|          nul

## (03-05) Working with rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

In [102]:
rc2 = (spark.read.csv('reported-crimes.csv', header=True, schema=schema)
       .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
       .filter(col('Date') == lit('2018-11-12')))

In [103]:
rc2.count()

4

In [107]:
rc2.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: integer (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [99]:
rc.count()

8068563

In [105]:
rc2 = rc2.drop('IUCR')  # match rc, which lost IUCR in "(03-04) Working with columns"

In [106]:
rc.union(rc2).orderBy('Date', ascending=True).count()

8068567

**What are the top 10 primary types by number of reported crimes, in descending order of occurrence?**

In [109]:
rc.groupBy('Primary Type').count().show()

+--------------------+-------+
|        Primary Type|  count|
+--------------------+-------+
|OFFENSE INVOLVING...|  57907|
|CRIMINAL SEXUAL A...|   8696|
|            STALKING|   5339|
|PUBLIC PEACE VIOL...|  53269|
|           OBSCENITY|    872|
|               ARSON|  13809|
|   DOMESTIC VIOLENCE|      1|
|            GAMBLING|  14638|
|   CRIMINAL TRESPASS| 219363|
|             ASSAULT| 532253|
|LIQUOR LAW VIOLATION|  15110|
| MOTOR VEHICLE THEFT| 403957|
|               THEFT|1704698|
|             BATTERY|1471786|
|             ROBBERY| 304091|
|            HOMICIDE|  13208|
|           RITUALISM|     24|
|    PUBLIC INDECENCY|    203|
| CRIM SEXUAL ASSAULT|  27485|
|   HUMAN TRAFFICKING|    107|
+--------------------+-------+
only showing top 20 rows



In [114]:
rc.groupBy('Primary Type').count().orderBy('count', ascending=False).show(10)

+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1704698|
|            BATTERY|1471786|
|    CRIMINAL DAMAGE| 919298|
|          NARCOTICS| 753682|
|            ASSAULT| 532253|
|      OTHER OFFENSE| 501287|
|           BURGLARY| 432419|
|MOTOR VEHICLE THEFT| 403957|
| DECEPTIVE PRACTICE| 364327|
|            ROBBERY| 304091|
+-------------------+-------+
only showing top 10 rows



## (03-06) Challenge

**What percentage of reported crimes resulted in an arrest?**

In [116]:
rc.groupBy('Arrest').count().show()

+------+-------+
|Arrest|  count|
+------+-------+
|  null|      1|
|  true|2070630|
| false|5997932|
+------+-------+



In [117]:
percentage = 2070630.0 / rc.count() * 100
percentage

25.662934031747657
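
The denominator `rc.count()` above also includes the one row whose `Arrest` value is `null`. As a cross-check, the percentage can be computed from the two non-null counts in the `groupBy('Arrest')` output; the variable names below are illustrative.

```python
# Counts copied from the groupBy('Arrest') output above.
arrests = 2070630
non_arrests = 5997932

# The denominator excludes the single row with a null Arrest value.
percentage = arrests / (arrests + non_arrests) * 100

print(round(percentage, 2))  # 25.66
```

At this scale the single extra row does not change the rounded result.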

**What are the top 3 locations for reported crimes?**

In [120]:
rc.groupBy('Location').count().orderBy('count', ascending=False).show(4)

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|                null|88885|
|(41.976290414, -8...|14482|
|(41.754592961, -8...|10568|
|(41.883500187, -8...| 8857|
+--------------------+-----+
only showing top 4 rows



In [121]:
rc.groupBy('Location Description').count().orderBy('count', ascending=False).show(4)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|2107672|
|           RESIDENCE|1340273|
|           APARTMENT| 932839|
|            SIDEWALK| 743394|
+--------------------+-------+
only showing top 4 rows



## (04-01) Built-in functions

In [None]:
from pyspark.sql import functions

In [None]:
print(dir(functions))

### String functions

**Display the Primary Type column in lowercase and uppercase, along with the first 4 characters of the column**

### Numeric functions


**Show the oldest date and the most recent date**

### Date

**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

## (04-02) Working with dates

**2019-12-25 13:30:00**

**25/Dec/2019 13:30:00**

**12/25/2019 01:30:00 PM**

## (04-03) Joins

**Download police station data**

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

## (04-05) Challenge questions

**What is the most frequently reported non-criminal activity?**

**Using a bar chart, plot which day of the week has the highest number of reported crimes.**

## (05-01) RDDs setup

**How many police stations are there?**

**Display the District ID, District name, Address and Zip for the police station with District ID 7**



**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**