# Chicago Crime Analysis and Pipeline with PySpark

#### Goals


Some goals for this project:
- Do some simple EDA on *Chicago Crime* from [Kaggle](https://www.kaggle.com/)

All this will be done using **[PySpark](https://spark.apache.org/docs/latest/api/python/)**

<hr>

#### Import Libraries

In [1]:
try:
    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import DataFrame
    import pyspark.sql.functions as F
    import pandas as pd
    import numpy as np
    import glob
    from functools import reduce
    
    print('[SUCCESS]')
except ImportError as ie:
    raise ImportError(f'[Error importing]: {ie}')

[SUCCESS]


**INITIALIZE SESSION**

In [2]:
sc = SparkContext('local')
spark = SparkSession(sc)

<hr>

#### Read in our DATA

In [3]:
PATH = 'DATA' # FOLDER CONTAINING FILES
csv_files = glob.glob(PATH + '/*.csv') # GET ALL CSV FILES


# CREATE FUNCTION TO READ IN THE DATA AND MERGE FILES
def merge_csv(files):
    df = spark.read.options(header = True).csv(files)
    
    return df

df = merge_csv(csv_files)

In [4]:
df.show(1)

+---+-----------+--------+--------------------+-----------------+------------+-----------+--------------------+---------+--------+-----+--------+----+--------------+--------+------------+------------+----+----------+--------------------+---------+--------+
| ID|Case Number|    Date|               Block|             IUCR|Primary Type|Description|Location Description|   Arrest|Domestic| Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|            Latitude|Longitude|Location|
+---+-----------+--------+--------------------+-----------------+------------+-----------+--------------------+---------+--------+-----+--------+----+--------------+--------+------------+------------+----+----------+--------------------+---------+--------+
|879|    4786321|HM399414|01/01/2004 12:01:...|082XX S COLES AVE|        0840|      THEFT|FINANCIAL ID THEF...|RESIDENCE|   False|False|     424| 4.0|           7.0|    46.0|          06|        null|null|      2004|08/17/2015 03

<hr>

#### Look at our Schema

Looking at schema allows us to see how our column types are set up.

In [6]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



Because our schema has the appropriate data types for each column, we don't have to create a custom schema. 

#### Exploratory Data Analysis

Goals:
- Dataset overview
    - NA values
    - Dimensions of the data
- Description on columns (Important ones chosen by me)
    - Description
    - Arrest
    - Year
    - 


In [7]:
'''
FUNCTION TO FIND NULL/NA VALUES IN DATAFRAME
'''
def null_values(df):
    return df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])

df_null = null_values(df)

In [8]:
df_null.show()

+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+------+----------+--------+---------+--------+
| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|  Year|Updated On|Latitude|Longitude|Location|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+------+----------+--------+---------+--------+
|  0|          1|   7|    0|   0|           0|          0|                   0|  1990|       0|   0|       0|  91|        700224|  702091|           0|      105573|105573|         0|       0|   105573|  105574|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+-----

Below is the output of the number of NA values in our dateframe per column
```python
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+
| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+
|  0|          1|   7|    0|   0|           0|          0|                   0|  1990|       0|   0|       0|  91|        700224|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+
+--------+------------+------------+------+----------+--------+---------+--------+
|FBI Code|X Coordinate|Y Coordinate|  Year|Updated On|Latitude|Longitude|Location|
+--------+------------+------------+------+----------+--------+---------+--------+
|  702091|           0|      105573|105573|         0|       0|   105573|  105574|
+--------+------------+------------+------+----------+--------+---------+--------+
```

In [9]:
'''
FUNCTION TO GET DIMENSIONS OF DATAFRAME
'''
def GET_DIMENSIONS(df):
    return (df.count(), len(df.columns))

# -----
print(f'Number of rows: {GET_DIMENSIONS(df)[0]} \nNumber of columns: {GET_DIMENSIONS(df)[1]}')

Number of rows: 7941286 
Number of columns: 22


#### Group By Functions


In [24]:
new_df = df
new_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



In [25]:
new_df.show(5)

+----+-----------+--------+--------------------+-------------------+------------+--------------------+--------------------+---------+--------+-----+--------+----+--------------+--------+------------+------------+---------+----------+--------------------+------------+-------------+
|  ID|Case Number|    Date|               Block|               IUCR|Primary Type|         Description|Location Description|   Arrest|Domestic| Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|     Year|Updated On|            Latitude|   Longitude|     Location|
+----+-----------+--------+--------------------+-------------------+------------+--------------------+--------------------+---------+--------+-----+--------+----+--------------+--------+------------+------------+---------+----------+--------------------+------------+-------------+
| 879|    4786321|HM399414|01/01/2004 12:01:...|  082XX S COLES AVE|        0840|               THEFT|FINANCIAL ID THEF...|RESIDENCE|   False|False|     4

In [38]:
desc_df = new_df.groupby('Description').agg(F.count('ID').alias('Count'))


'''
SHOW FULL CONTENT
'''
desc_df.show(desc_df.count(), False)

+---------------------------------+-------+
|Description                      |Count  |
+---------------------------------+-------+
|OFFENSE INVOLVING CHILDREN       |51441  |
|STALKING                         |3734   |
|PUBLIC PEACE VIOLATION           |58548  |
|OBSCENITY                        |496    |
|ARSON                            |13097  |
|DOMESTIC VIOLENCE                |2      |
|GAMBLING                         |18806  |
|CRIMINAL TRESPASS                |229367 |
|ASSAULT                          |481661 |
|LIQUOR LAW VIOLATION             |17513  |
|MOTOR VEHICLE THEFT              |370548 |
|THEFT                            |1640506|
|BATTERY                          |1442717|
|ROBBERY                          |300453 |
|HOMICIDE                         |9051   |
|RITUALISM                        |31     |
|PUBLIC INDECENCY                 |163    |
|CRIM SEXUAL ASSAULT              |29868  |
|INTIMIDATION                     |4636   |
|PROSTITUTION                   

In [39]:
sc.stop()