APAN5400_GroupProject: Chicago Crime Analysis

Group: Dollar is Coming

Part 1: Cleaning a large dataset stored in Google Drive with Spark on Colab

# Import Packages

In [7]:
# !pip install -U "psycopg[binary]"
# !pip install -U pyspark
# !pip install psycopg2
# !pip install geopandas matplotlib
# !pip install flask pandas

In [1]:
import json
import os
import sys

import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
import plotly
import plotly.express as px
import psycopg
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date
from sqlalchemy import create_engine

In [1]:
# Make sure the Spark driver and workers use the same Python interpreter
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
spark = SparkSession \
    .builder \
    .appName("Intro to Apache Spark") \
    .config("spark.cores.max", "4") \
    .config('spark.executor.memory', '8G') \
    .config('spark.driver.maxResultSize', '8g') \
    .config('spark.kryoserializer.buffer.max', '512m') \
    .config("spark.driver.cores", "4") \
    .getOrCreate()
sc = spark.sparkContext

print("Using Apache Spark Version", spark.version)

Using Apache Spark Version 3.5.1


# Read Data

The raw CSV file is stored in Google Drive, so we mount the drive in Colab and read the file from there.

In [5]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Define the filename
filename = '/content/drive/My Drive/Colab Notebooks/Crimes_-_2001_to_Present.csv'

# Load data into Spark DataFrame
crime_df = spark.read.option("header", "true") \
                   .option("delimiter", ",") \
                   .option("inferSchema", "true") \
                   .csv(filename)

crime_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [None]:
# Get the size of the CSV file
file_size_gb = os.path.getsize(filename) / (1024 * 1024 * 1024)  # 1 GB = 1024 * 1024 * 1024 bytes

print(f"Size of the CSV file:{file_size_gb}GB")

Size of the CSV file:1.7671623276546597GB
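Dividing by 1024 * 1024 * 1024 yields binary gigabytes (GiB), which are slightly larger than decimal gigabytes (GB). The helper below is a minimal sketch of reporting both units with rounding; `describe_size` and the sample byte count are hypothetical and not part of the original notebook.

```python
def describe_size(num_bytes):
    """Format a byte count in binary (GiB) and decimal (GB) gigabytes."""
    gib = num_bytes / 1024**3   # binary gigabytes, as computed in the cell above
    gb = num_bytes / 1000**3    # decimal gigabytes
    return f"{gib:.2f} GiB ({gb:.2f} GB)"

# Hypothetical example: exactly 3 GiB
print(describe_size(3 * 1024**3))  # 3.00 GiB (3.22 GB)
```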


In [None]:
print(f"There are {crime_df.count()} observations of Chicago crime and {len(crime_df.columns)} features: {crime_df.columns}.")

There are 8034263 observations of Chicago crime and 22 features: ['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location'].


# Data Cleaning

In [None]:
# Rename columns by replacing spaces with underscores
# (the loop variable is called `name` so it is not confused with the imported pyspark `col`)
new_column_names = [name.replace(' ', '_') for name in crime_df.columns]
crime_df_cleaning = crime_df.toDF(*new_column_names)

**Column Explanations:**

ID: Unique identifier for each record.

Case_Number: Unique identifier for each reported case.

Date: Date of the reported incident.

Block: Block where the incident occurred.

IUCR: Illinois Uniform Crime Reporting code, a standardized code for criminal offenses.

Primary_Type: Primary description of the crime.

Description: Detailed description of the crime.

Location_Description: Description of the location where the incident occurred.

Arrest: Indicates whether an arrest was made for the reported crime.

Domestic: Indicates whether the incident involved domestic violence.

Beat: Police beat where the incident occurred.

District: Police district where the incident occurred.

Ward: Ward where the incident occurred.

Community_Area: Community area where the incident occurred.

FBI_Code: FBI crime classification code.

X_Coordinate: X-coordinate of the location where the incident occurred.

Y_Coordinate: Y-coordinate of the location where the incident occurred.

Year: Year of the reported incident.

Updated_On: Date when the record was last updated.

Latitude: Latitude of the location where the incident occurred.

Longitude: Longitude of the location where the incident occurred.

Location: Combined latitude and longitude.

Several columns carry overlapping information (e.g. Beat, District, Ward, Community_Area; X_Coordinate, Y_Coordinate, Latitude, Longitude, Location), so we keep only one representation from each group. A few case numbers appear more than once, so we extract every duplicated row and decide how to clean them. Date and Updated_On also include a time of day that we do not need, so we later keep only the date, in yyyy-MM-dd format.

### Drop Similar Columns

In [None]:
columns_to_drop = ['Beat', 'District', 'Ward', 'FBI_Code', 'X_Coordinate', 'Y_Coordinate', 'Location']
crime_df_cleaning = crime_df_cleaning.drop(*columns_to_drop)

### Check Duplicate Case Number Observations

In [None]:
# Group by Case_Number and count occurrences
duplicate_case_numbers = crime_df_cleaning.groupBy("Case_Number").agg(F.count("ID").alias("count"))

# Keep only the case numbers that occur more than once
duplicate_case_numbers = duplicate_case_numbers.filter("count > 1")

# Count the duplicated case numbers
duplicate_case_numbers.count()

470

In [None]:
# Join duplicate_case_numbers with crime_df_cleaning to get all columns for duplicate case numbers
duplicate_rows_with_details = duplicate_case_numbers.join(crime_df_cleaning, "Case_Number", "inner")

# Show the DataFrame with details of duplicate case numbers
duplicate_rows_with_details.show(5)

+-----------+-----+-----+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+--------------------+------------+-------------+
|Case_Number|count|   ID|                Date|               Block|IUCR|Primary_Type|        Description|Location_Description|Arrest|Domestic|Community_Area|Year|          Updated_On|    Latitude|    Longitude|
+-----------+-----+-----+--------------------+--------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+--------------------+------------+-------------+
|   HH494479|    2| 1598|07/08/2002 01:45:...| 080XX S STEWART AVE|0110|    HOMICIDE|FIRST DEGREE MURDER|                AUTO|  true|   false|            44|2002|09/19/2022 03:41:...| 41.74828207|  -87.6342366|
|   HH494479|    2| 1597|07/08/2002 01:45:...| 080XX S STEWART AVE|0110|    HOMICIDE|FIRST DEGREE MURDER|                AUTO|  true|   false|            44

The duplicated case numbers shown here are serious offenses (IUCR 0110, 'FIRST DEGREE MURDER'). This suggests that serious cases may be recorded more than once, for example when several people report the same incident. We therefore want to keep only the most recently updated record for each duplicated case number and drop the rest.

In [None]:
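# Drop duplicate rows based on Case_Number, intending to keep the most recently updated one.
# Caveat: dropDuplicates does not guarantee which row of each group survives, even after orderBy.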
crime_df_cleaning = crime_df_cleaning.orderBy("Updated_On", ascending=False).dropDuplicates(["Case_Number"])
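If Updated_On is still an MM/dd/yyyy string when it is sorted, the order is alphabetical rather than chronological, so "newest" can be misjudged. The plain-Python sketch below illustrates the intended rule (keep the row with the latest parsed timestamp per case number) on hypothetical rows; `keep_newest` and the sample data are assumptions for illustration only. In Spark, one option with the same intent would be `row_number()` over a `Window` partitioned by Case_Number and ordered by a parsed timestamp.

```python
from datetime import datetime

# Python equivalent of the timestamp layout seen in the raw data (12-hour clock with AM/PM)
FMT = "%m/%d/%Y %I:%M:%S %p"

# Hypothetical rows: (ID, Case_Number, Updated_On)
rows = [
    (1, "X1", "09/19/2022 03:41:05 PM"),
    (2, "X1", "02/10/2023 09:15:00 AM"),
    (3, "Y2", "12/31/2019 11:59:59 PM"),
]

def keep_newest(rows):
    """Keep, for each case number, the row with the latest parsed Updated_On."""
    newest = {}
    for row in rows:
        case_number = row[1]
        updated = datetime.strptime(row[2], FMT)
        if case_number not in newest or updated > newest[case_number][0]:
            newest[case_number] = (updated, row)
    return [row for _, row in newest.values()]

deduped = keep_newest(rows)
print(sorted(r[0] for r in deduped))  # [2, 3]

# Comparing the raw strings instead would pick ID 1 for case X1, because "09/..." > "02/..."
print(max(rows[:2], key=lambda r: r[2])[0])  # 1
```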

In [None]:
crime_df_cleaning.count()

8033700
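The row count drops from 8034263 to 8033700, so 563 rows were removed, which is more than the 470 duplicated case numbers found earlier. That gap would be expected if some case numbers appear more than twice. The sketch below shows the difference between the two quantities on hypothetical case numbers.

```python
from collections import Counter

# Hypothetical case numbers: "A1" appears three times, "B2" twice, "C3" once
case_numbers = ["A1", "A1", "A1", "B2", "B2", "C3"]
counts = Counter(case_numbers)

# Number of distinct case numbers that occur more than once
duplicated_keys = sum(1 for n in counts.values() if n > 1)

# Number of rows that deduplication removes (all but one row per case number)
surplus_rows = sum(n - 1 for n in counts.values() if n > 1)

print(duplicated_keys, surplus_rows)  # 2 3

# The same arithmetic on the counts reported in this notebook
rows_removed = 8034263 - 8033700
print(rows_removed)  # 563
```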

In [None]:
# Look up one previously duplicated case number to verify that the deduplication worked
crime_df_cleaning.filter(crime_df_cleaning["Case_Number"] == 'HH494479').show()

+----+-----------+----------+-------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+----------+-----------+-----------+
|  ID|Case_Number|      Date|              Block|IUCR|Primary_Type|        Description|Location_Description|Arrest|Domestic|Community_Area|Year|Updated_On|   Latitude|  Longitude|
+----+-----------+----------+-------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+----------+-----------+-----------+
|1598|   HH494479|2002-07-08|080XX S STEWART AVE|0110|    HOMICIDE|FIRST DEGREE MURDER|                AUTO|  true|   false|            44|2002|2022-09-19|41.74828207|-87.6342366|
+----+-----------+----------+-------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+----------+-----------+-----------+



### Convert Date Format

In [None]:
# Convert 'Date' and 'Updated_On' columns to date format
crime_df_cleaned = crime_df_cleaning.withColumn("Date", to_date("Date", "MM/dd/yyyy hh:mm:ss a")) \
                                 .withColumn("Updated_On", to_date("Updated_On", "MM/dd/yyyy hh:mm:ss a"))
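The pattern "MM/dd/yyyy hh:mm:ss a" describes a 12-hour clock with an AM/PM marker. As a rough sanity check of the pattern, the sketch below parses a few hypothetical strings with the equivalent Python format and counts how many fail; `parse_date_or_none` and the sample values are assumptions for illustration. On the Spark side, an analogous check could count nulls in the converted columns, where unparseable values would typically surface.

```python
from datetime import datetime

# Python equivalent of the Spark pattern "MM/dd/yyyy hh:mm:ss a"
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"

def parse_date_or_none(text):
    """Return the calendar date for a timestamp string, or None if it does not match."""
    try:
        return datetime.strptime(text, PY_FORMAT).date()
    except (TypeError, ValueError):
        return None

# Hypothetical sample values; the last two are deliberately malformed
samples = ["07/08/2002 01:45:00 AM", "09/19/2022 03:41:05 PM", "2002-07-08", None]
parsed = [parse_date_or_none(s) for s in samples]
unparsed = sum(p is None for p in parsed)

print(parsed[0], unparsed)  # 2002-07-08 2
```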

In [None]:
crime_df_cleaned.show(5)

+-------+-----------+----------+--------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+----------+------------+-------------+
|     ID|Case_Number|      Date|               Block|IUCR|Primary_Type|        Description|Location_Description|Arrest|Domestic|Community_Area|Year|Updated_On|    Latitude|    Longitude|
+-------+-----------+----------+--------------------+----+------------+-------------------+--------------------+------+--------+--------------+----+----------+------------+-------------+
|  21408|    A417216|2010-09-06|039XX S VINCENNES...|0110|    HOMICIDE|FIRST DEGREE MURDER|                AUTO|  true|   false|            38|2010|2022-09-19|41.823529117|-87.612707977|
|   1900|    C153276|2002-12-05|     055XX W RICE ST|0110|    HOMICIDE|FIRST DEGREE MURDER|              STREET|  true|   false|            25|2002|2022-09-19|41.895773675|-87.764072031|
|   1956|    D212672|2002-12-31|  061XX S DREXEL AVE|0110|    HOM

In [None]:
# Define the start date of COVID-19 pandemic (change if needed)
covid_start_date = '2020-01-01'

# Filter the DataFrame to only include dates from the start of COVID-19 until now
crime_df_during_covid = crime_df_cleaned.filter(col('Date') >= covid_start_date)

In [None]:
crime_df_during_covid.count()

980337

After cleaning the data and keeping only records dated on or after the COVID-19 start date, 980337 observations remain.

### Save data in drive

##### Complete Cleaned Crime Data

In [None]:
crime_df_cleaned.coalesce(1).write.format("csv").option("header", "true").save('/content/drive/My Drive/Colab Notebooks/crimes_cleaned.csv')

In [5]:
# Define the file path
file_path = '/content/drive/My Drive/Colab Notebooks/crimes_cleaned.csv/part-00000-5fb4f1db-d2d4-4392-ad16-034ead198222-c000.csv'

# Read the CSV file into a Pandas DataFrame
crimes_df = pd.read_csv(file_path)
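Spark writes the CSV as a directory containing a part file whose name includes a generated identifier, so the hard-coded path above changes on every run. A minimal sketch of locating the part file by pattern instead is shown below; `find_part_file` is a hypothetical helper, and it assumes the directory holds exactly one part file, as is the case after `coalesce(1)`.

```python
import glob
import os

def find_part_file(output_dir):
    """Return the single part-*.csv file inside a Spark CSV output directory."""
    matches = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(matches) != 1:
        raise FileNotFoundError(
            f"expected exactly one part file in {output_dir}, found {len(matches)}"
        )
    return matches[0]

# Possible usage with the output directory from the cell above:
# file_path = find_part_file('/content/drive/My Drive/Colab Notebooks/crimes_cleaned.csv')
# crimes_df = pd.read_csv(file_path)
```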

In [None]:
# Get the size of the CSV file
file_size_GB = os.path.getsize(file_path) / (1024 * 1024 * 1024)  # 1 GB = 1024 * 1024 * 1024 bytes

print(f"Size of the CSV file:{file_size_GB}GB")

Size of the CSV file:1.122369566000998GB


##### Crime Data Since COVID-19

In [None]:
crime_df_during_covid.coalesce(1).write.format("csv").option("header", "true").save('/content/drive/My Drive/Colab Notebooks/crimes_covid.csv')