# Apache PySpark by Example

**[June 2023 update]**

I've consolidated all the notebooks for this course into a single notebook. (The course videos still show the individual notebooks.)


## Install Spark

- Google Colab recently made some changes that break the Spark installation.
- Please use the code below, which installs Spark from the `pyspark` package instead.

In [2]:
!pip install pyspark==3.4.0



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/01 03:26:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## (02-02) Download Chicago's Reported Crime Data

### Downloading and preprocessing Chicago's Reported Crime Data

In [8]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv

--2024-05-01 03:04:52--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.140.199, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv’

rows.csv                [                <=> ]   1.33G  1.61MB/s               

In [1]:
!mv rows.csv reported-crimes.csv
!ls -l

total 2818296
-rw-r--r--  1 navidshokouhi  staff       33677 May  1 02:53 Apache_PySpark_by_Example.ipynb
-rw-r--r--  1 navidshokouhi  staff         635 May  1 02:53 CONTRIBUTING.md
-rw-r--r--  1 navidshokouhi  staff        6648 May  1 02:53 LICENSE
-rw-r--r--  1 navidshokouhi  staff         617 May  1 02:53 NOTICE
-rw-r--r--  1 navidshokouhi  staff         319 May  1 02:53 README.md
-rw-r--r--  1 navidshokouhi  staff  1428236437 May  1 03:25 reported-crimes.csv
-rw-r--r--  1 navidshokouhi  staff         392 May  1 03:25 wget-log


In [12]:
from pyspark.sql.functions import to_timestamp, col, lit

# Parse the 'Date' strings into timestamps, then keep rows dated up to 2018-11-11.
rc = (
    spark.read.csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') <= lit('2018-11-11'))
)
rc.show()

+--------+-----------+-------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|      ID|Case Number|               Date|               Block|IUCR|        Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|
+--------+-----------+-------------------+--------------------+----+--------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+
|11037294|   JA371270|2015-03-18 12:00:00|   0000X W WACKER DR|1153|  DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                B

In [9]:
rc.count()

                                                                                

5202712

## (03-03) Schemas

## (03-04) Working with columns

**Display only the first 5 rows of the column named IUCR**

**Display only the first 4 rows of the columns Case Number, Date and Arrest**

**Add a column named One, with all entries set to 1**

**Remove the column IUCR**

## (03-05) Working with rows

**Add the reported crimes for an additional day, 12-Nov-2018, to our dataset.**

**What are the top 10 primary types by number of reported crimes, in descending order of occurrence?**

## (03-06) Challenge

**What percentage of reported crimes resulted in an arrest?**

**What are the top 3 locations for reported crimes?**

## (04-01) Built-in functions

In [None]:
from pyspark.sql import functions

In [None]:
print(dir(functions))

### String functions

**Display the Primary Type column in lowercase and in uppercase, along with its first 4 characters**

### Numeric functions


**Show the oldest date and the most recent date**

### Date

**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

## (04-02) Working with dates

**2019-12-25 13:30:00**

**25/Dec/2019 13:30:00**

**12/25/2019 01:30:00 PM**

## (04-03) Joins

**Download police station data**

**The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

## (04-05) Challenge questions

**What is the most frequently reported non-criminal activity?**

**Using a bar chart, show which day of the week has the most reported crimes.**

## (05-01) RDDs setup

**How many police stations are there?**

**Display the District ID, District name, Address and Zip for the police station with District ID 7**



**Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, address and zip code**