# Why use PySpark?

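**PySpark** is the Python interface for Apache Spark. **Apache Spark** is an open-source unified analytics engine for large-scale data processing. It splits data into partitions and processes them in parallel, across the cores of one machine or the nodes of a cluster, which lets it scale past what pandas can handle in a single process's memory.

There is a lot more to cover later: MLlib, Spark SQL, and running Spark on AWS (Amazon EMR). Source for later reference: https://aws.amazon.com/what-is/apache-spark/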


# Setting up PySpark

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=36af4ccb0b6f779f4b63c410186ae87008931d037ba55dd0ae15650c56e3ffaa
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Getting Dataset

In [None]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

--2024-04-15 18:50:36--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.68.26, 52.206.140.199, 52.206.140.205
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.68.26|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [    <=>             ]   1.77G  1.53MB/s    in 12m 30s 

2024-04-15 19:03:07 (2.41 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1898542709]



In [None]:
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv

In [None]:
!ls

reported-crimes.csv  sample_data


# Importing Required Libraries

A SparkSession is the entry point to all Spark functionality and is required to create a DataFrame in PySpark.

In [None]:
## python
import pandas as pd

In [None]:
## pyspark
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Reading Data

In [None]:
## python
df_python = pd.read_csv('reported-crimes.csv')
df_python.head()

,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11037294,JA371270,03/18/2015 12:00:00 PM,0000X W WACKER DR,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,BANK,False,False,...,42.0,32.0,11,,,2015,08/01/2017 03:52:26 PM,,,
1,11646293,JC213749,12/20/2018 03:00:00 PM,023XX N LOCKWOOD AVE,1154,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT $300 AND UNDER,APARTMENT,False,False,...,36.0,19.0,11,,,2018,04/06/2019 04:04:43 PM,,,
2,11645836,JC212333,05/01/2016 12:25:00 AM,055XX S ROCKWELL ST,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,,False,False,...,15.0,63.0,11,,,2016,04/06/2019 04:04:43 PM,,,
3,11645959,JC211511,12/20/2018 04:00:00 PM,045XX N ALBANY AVE,2820,OTHER OFFENSE,TELEPHONE THREAT,RESIDENCE,False,False,...,33.0,14.0,08A,,,2018,04/06/2019 04:04:43 PM,,,
4,11645601,JC212935,06/01/2014 12:01:00 AM,087XX S SANGAMON ST,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,21.0,71.0,11,,,2014,04/06/2019 04:04:43 PM,,,


In [None]:
## pyspark
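df_pyspark = spark.read.csv('reported-crimes.csv', header=True, escape="\"")  # escape='"' reads doubled quotes inside quoted fields as literal quotes
df_pyspark.show(5, truncate=False)  # truncate=False prints full cell values instead of cutting them at 20 characters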


+--------+-----------+----------------------+--------------------+----+------------------+---------------------------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+--------+---------+--------+
|ID      |Case Number|Date                  |Block               |IUCR|Primary Type      |Description                            |Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On            |Latitude|Longitude|Location|
+--------+-----------+----------------------+--------------------+----+------------------+---------------------------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------------------+--------+---------+--------+
|11037294|JA371270   |03/18/2015 12:00:00 PM|0000X W WACKER DR   |1153|DECEPTIVE PRACTICE|FINANCIAL IDENTITY THEFT OVER $

## Basic operations
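* shape of dataframe = `df.shape` (PySpark: `(df.count(), len(df.columns))`)
* list of column names = `df.columns` (same attribute in PySpark)
* data types of columns = `df.dtypes` (same attribute in PySpark, returned as `(name, type)` pairs)

For a particular column:
* value counts = `df[col].value_counts()` (PySpark: `df.groupBy(col).count()`)
* number of unique categories in a column = `df[col].nunique()` (PySpark: `countDistinct(col)`)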

In [None]:
## python
df_python.shape

(8038710, 22)

In [None]:
## pyspark
df_pyspark.count(), len(df_pyspark.columns)

(8038710, 22)

In [None]:
## python
df_python.columns

Index(['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type',
       'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat',
       'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate',
       'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude',
       'Location'],
      dtype='object')

In [None]:
## pyspark
df_pyspark.columns

['ID',
 'Case Number',
 'Date',
 'Block',
 'IUCR',
 'Primary Type',
 'Description',
 'Location Description',
 'Arrest',
 'Domestic',
 'Beat',
 'District',
 'Ward',
 'Community Area',
 'FBI Code',
 'X Coordinate',
 'Y Coordinate',
 'Year',
 'Updated On',
 'Latitude',
 'Longitude',
 'Location']

In [None]:
## python
df_python.dtypes

ID                        int64
Case Number              object
Date                     object
Block                    object
IUCR                     object
Primary Type             object
Description              object
Location Description     object
Arrest                     bool
Domestic                   bool
Beat                      int64
District                float64
Ward                    float64
Community Area          float64
FBI Code                 object
X Coordinate            float64
Y Coordinate            float64
Year                      int64
Updated On               object
Latitude                float64
Longitude               float64
Location                 object
dtype: object

In [None]:
## pyspark
# data types: without inferSchema=True, spark.read.csv reads every column as a string
df_pyspark.dtypes

## Per-column operations

In [None]:
## python
df_python.nunique()

ID                      8038710
Case Number             8038147
Date                    3328866
Block                     63808
IUCR                        405
Primary Type                 36
Description                 552
Location Description        217
Arrest                        2
Domestic                      2
Beat                        305
District                     24
Ward                         50
Community Area               78
FBI Code                     27
X Coordinate              79134
Y Coordinate             130291
Year                         24
Updated On                 6193
Latitude                 897016
Longitude                896418
Location                 898293
dtype: int64

In [None]:
## pyspark
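# nunique: countDistinct ignores NULLs, like pandas' nunique()
import pyspark.sql.functions as sf
df_pyspark.select([sf.countDistinct(c).alias(c) for c in df_pyspark.columns]).show()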

In [None]:
## python
df_python['Location Description'].value_counts()

Location Description
STREET               2099971
RESIDENCE            1336471
APARTMENT             927141
SIDEWALK              741818
OTHER                 270005
                      ...   
POOLROOM                   1
ROOF                       1
TRUCKING TERMINAL          1
LAGOON                     1
POLICE FACILITY            1
Name: count, Length: 217, dtype: int64

In [None]:
## pyspark
# distinct() keeps NULL as its own value, so this is one more than the 217 categories pandas reports
df_pyspark.select('Location Description').distinct().count()

218

In [None]:
## pyspark

df_pyspark.groupBy('Location Description').count().orderBy('count', ascending=False).show()

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|2099971|
|           RESIDENCE|1336471|
|           APARTMENT| 927141|
|            SIDEWALK| 741818|
|               OTHER| 270005|
|PARKING LOT/GARAG...| 202963|
|               ALLEY| 178786|
|  SMALL RETAIL STORE| 154740|
|SCHOOL, PUBLIC, B...| 146373|
|    RESIDENCE-GARAGE| 135523|
|          RESTAURANT| 132660|
|VEHICLE NON-COMME...| 127612|
|RESIDENCE PORCH/H...| 124190|
|    DEPARTMENT STORE| 102920|
|  GROCERY FOOD STORE| 101114|
|         GAS STATION|  89481|
|RESIDENTIAL YARD ...|  75148|
|COMMERCIAL / BUSI...|  63945|
|       PARK PROPERTY|  60447|
|CHA PARKING LOT/G...|  56101|
+--------------------+-------+
only showing top 20 rows



# Data cleaning

*   Identify missing values
*   Correct datatypes
*   Extract month and year
*   Drop rows not needed



In [None]:
## python

df_python.isnull().sum()

ID                           0
Case Number                  0
Date                         0
Block                        0
IUCR                         0
Primary Type                 0
Description                  0
Location Description     12867
Arrest                       0
Domestic                     0
Beat                         0
District                    47
Ward                    614848
Community Area          613472
FBI Code                     0
X Coordinate             88498
Y Coordinate             88498
Year                         0
Updated On                   0
Latitude                 88498
Longitude                88498
Location                 88498
dtype: int64

In [None]:
import pyspark.sql.functions as sf

In [None]:
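# when() yields 1 for NULL rows (and NULL otherwise); count() only counts non-NULL values, i.e. the NULL rows
df_pyspark.select([sf.count(sf.when(sf.col(c).isNull(), 1)).alias(c) for c in df_pyspark.columns]).show()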

+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+
| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|  Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|  0|          0|   0|    0|   0|           0|          0|               12867|     0|       0|   0|      47|614848|        613472|       0|       88498|       88498|   0|         0|   88498|    88498|   88498|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+

# Data Analysis


*   Which location description has the most reports
*   Which month and year saw the most crimes
*   Bivariate: time of day vs. primary type


#### Location description with the most reports (highest row count)

In [None]:
## python
df_python['Location Description'].value_counts()

Location Description
STREET               2099971
RESIDENCE            1336471
APARTMENT             927141
SIDEWALK              741818
OTHER                 270005
                      ...   
POOLROOM                   1
ROOF                       1
TRUCKING TERMINAL          1
LAGOON                     1
POLICE FACILITY            1
Name: count, Length: 217, dtype: int64

In [None]:
## pyspark
from pyspark.sql.functions import desc
df_pyspark.groupBy('Location Description').count().orderBy(desc('count')).show()

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|2099971|
|           RESIDENCE|1336471|
|           APARTMENT| 927141|
|            SIDEWALK| 741818|
|               OTHER| 270005|
|PARKING LOT/GARAG...| 202963|
|               ALLEY| 178786|
|  SMALL RETAIL STORE| 154740|
|SCHOOL, PUBLIC, B...| 146373|
|    RESIDENCE-GARAGE| 135523|
|          RESTAURANT| 132660|
|VEHICLE NON-COMME...| 127612|
|RESIDENCE PORCH/H...| 124190|
|    DEPARTMENT STORE| 102920|
|  GROCERY FOOD STORE| 101114|
|         GAS STATION|  89481|
|RESIDENTIAL YARD ...|  75148|
|COMMERCIAL / BUSI...|  63945|
|       PARK PROPERTY|  60447|
|CHA PARKING LOT/G...|  56101|
+--------------------+-------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import countDistinct, desc
# ID is unique per report, so countDistinct('ID') gives the same numbers as the plain count() above
df_pyspark.groupBy('Location Description').agg(countDistinct('ID').alias('crime_count')).orderBy(desc('crime_count')).show()

+--------------------+-----------+
|Location Description|crime_count|
+--------------------+-----------+
|              STREET|    2099971|
|           RESIDENCE|    1336471|
|           APARTMENT|     927141|
|            SIDEWALK|     741818|
|               OTHER|     270005|
|PARKING LOT/GARAG...|     202963|
|               ALLEY|     178786|
|  SMALL RETAIL STORE|     154740|
|SCHOOL, PUBLIC, B...|     146373|
|    RESIDENCE-GARAGE|     135523|
|          RESTAURANT|     132660|
|VEHICLE NON-COMME...|     127612|
|RESIDENCE PORCH/H...|     124190|
|    DEPARTMENT STORE|     102920|
|  GROCERY FOOD STORE|     101114|
|         GAS STATION|      89481|
|RESIDENTIAL YARD ...|      75148|
|COMMERCIAL / BUSI...|      63945|
|       PARK PROPERTY|      60447|
|CHA PARKING LOT/G...|      56101|
+--------------------+-----------+
only showing top 20 rows



#### Date/time variable

In [None]:
## python
print(df_python['Date'].dtype)
df_python['Date'] = pd.to_datetime(df_python['Date'], format='%m/%d/%Y %I:%M:%S %p')  # explicit format, e.g. '03/18/2015 12:00:00 PM'
print(df_python['Date'].dtype)

object
datetime64[ns]


In [None]:
## python
df_python['hour'] = df_python['Date'].dt.hour
df_python['month'] = df_python['Date'].dt.month
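With `hour` and `month` available, the Data Analysis questions reduce to a group-by and a cross-tabulation. A minimal pandas sketch on invented rows (on the real data the frame would be `df_python`, which also has its own `Year` column); the format string matches the `Date` values shown in the `head()` output:

```python
import pandas as pd

# Invented sample in the same layout as the crimes file.
df = pd.DataFrame({
    "Date": ["03/18/2015 12:00:00 PM", "03/02/2015 01:30:00 AM",
             "12/20/2018 03:00:00 PM", "03/25/2015 12:45:00 PM"],
    "Primary Type": ["THEFT", "BATTERY", "THEFT", "BATTERY"],
})
df["Date"] = pd.to_datetime(df["Date"], format="%m/%d/%Y %I:%M:%S %p")
df["hour"] = df["Date"].dt.hour

# Which (year, month) had the most reports?
per_month = df.groupby([df["Date"].dt.year.rename("year"),
                        df["Date"].dt.month.rename("month")]).size()
busiest = per_month.idxmax()

# Bivariate view: rows = hour of day, columns = primary type, cells = report counts.
by_hour = pd.crosstab(df["hour"], df["Primary Type"])

print(busiest)
print(by_hour)
```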

In [None]:
## pyspark
df_pyspark.select('Date').dtypes

[('Date', 'string')]

In [None]:
## pyspark
from pyspark.sql.functions import to_date, to_timestamp
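# NOTE: this pattern does not match the format of the Date values, so every row parses to NULL (see output below)
df = df_pyspark.select(to_date(df_pyspark.Date, 'MM-dd-yyyy HH:mm:ss').alias('NewDate'))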

In [None]:
df.select('NewDate').show(5, truncate=False)

+-------+
|NewDate|
+-------+
|NULL   |
|NULL   |
|NULL   |
|NULL   |
|NULL   |
+-------+
only showing top 5 rows



# Build ML model