In [3]:
pip install pyspark

Collecting pyspark
  Using cached pyspark-3.5.5-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.5
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: C:\Record Realm\Empire Repo\crime_data_analysis_envi\crime_env\Scripts\python.exe -m pip install --upgrade pip


<span style="font-size:20px"><b>Initializing Spark Session and Loading Data from HDFS</b> <br>
<span style="font-size:17px">This cell creates a SparkSession configured to connect to a local Hadoop Distributed File System (HDFS) and reads the crime dataset (`Crime_Data_from_2020_to_Present.csv`) from the specified HDFS path. The data is loaded with headers and schema inference enabled. The first 10 rows are displayed to verify successful loading, and a row count is performed to confirm the dataset size.</span></span>

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session; fs.defaultFS is set to the HDFS
# namenode at hdfs://localhost:9000.
spark = (
    SparkSession.builder
    .appName("Crime Data Analysis")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .getOrCreate()
)

# Load the raw crime CSV from HDFS: the first line is treated as the header
# and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("hdfs://localhost:9000/crime_data/Crime_Data_from_2020_to_Present.csv")
)

# Preview ten untruncated rows; the row count is the cell's output value.
df.show(10, truncate=False)
df.count()

+---------+----------------------+----------------------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+--------+--------+----------------------------------------+------------+-------+---------+
|DR_NO    |Date Rptd             |DATE OCC              |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                                |Cross Street|LAT    |LON      |
+---------+----------------------+----------------------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+---

1005199

In [26]:
# Print the DataFrame schema as a tree (column name, type, nullability).
df.printSchema()
# Cell output: the column names as a plain Python list.
df.columns

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- L

['DR_NO',
 'Date Rptd',
 'DATE OCC',
 'TIME OCC',
 'AREA',
 'AREA NAME',
 'Rpt Dist No',
 'Part 1-2',
 'Crm Cd',
 'Crm Cd Desc',
 'Mocodes',
 'Vict Age',
 'Vict Sex',
 'Vict Descent',
 'Premis Cd',
 'Premis Desc',
 'Weapon Used Cd',
 'Weapon Desc',
 'Status',
 'Status Desc',
 'Crm Cd 1',
 'Crm Cd 2',
 'Crm Cd 3',
 'Crm Cd 4',
 'LOCATION',
 'Cross Street',
 'LAT',
 'LON']

In [20]:
from pyspark.sql.functions import col, when, lit
# Spark's aggregate sum is imported under an alias so that Python's builtin
# sum() is not shadowed for the rest of the kernel session.
from pyspark.sql.functions import sum as spark_sum

# For every column, count the rows whose value is null, an empty string, or a
# single space. Each result column is named "<original name>_missing".
df.select([
    spark_sum(
        when(col(c).isNull() | (col(c) == "") | (col(c) == " "), 1).otherwise(0)
    ).alias(c + "_missing")
    for c in df.columns
]).show()

+-------------+-----------------+----------------+----------------+------------+-----------------+-------------------+----------------+--------------+-------------------+---------------+----------------+----------------+--------------------+-----------------+-------------------+----------------------+-------------------+--------------+-------------------+----------------+----------------+----------------+----------------+----------------+--------------------+-----------+-----------+
|DR_NO_missing|Date Rptd_missing|DATE OCC_missing|TIME OCC_missing|AREA_missing|AREA NAME_missing|Rpt Dist No_missing|Part 1-2_missing|Crm Cd_missing|Crm Cd Desc_missing|Mocodes_missing|Vict Age_missing|Vict Sex_missing|Vict Descent_missing|Premis Cd_missing|Premis Desc_missing|Weapon Used Cd_missing|Weapon Desc_missing|Status_missing|Status Desc_missing|Crm Cd 1_missing|Crm Cd 2_missing|Crm Cd 3_missing|Crm Cd 4_missing|LOCATION_missing|Cross Street_missing|LAT_missing|LON_missing|
+-------------+---------

<span style="font-size:20px"><b>Converting String Columns to Date Format</b> <br>
<span style="font-size:15px">This cell converts the `Date Rptd` and `DATE OCC` columns from string format to proper Spark `DateType` using the `to_date()` function. This ensures that the date values can be used reliably for filtering, sorting, and time-based analysis throughout the project. After conversion, the schema and a sample of the DataFrame are displayed to confirm the changes.</span></span>

In [3]:
from pyspark.sql.functions import to_date, col

# Pattern used to parse the raw string values (12-hour clock with AM/PM marker).
date_pattern = "MM/dd/yyyy hh:mm:ss a"

# Convert each date column from string to DateType.
# Guard on the current dtype so the cell is safe to re-run: once a column is
# already a date, pushing it through to_date() with the string pattern again
# would no longer match the pattern and the parse would fail.
current_types = dict(df.dtypes)
for date_column in ("Date Rptd", "DATE OCC"):
    if current_types[date_column] == "string":
        df = df.withColumn(date_column, to_date(col(date_column), date_pattern))

# Confirm the conversion: sample rows first, then the updated schema.
df.show(10, truncate=False)
df.printSchema()

+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+--------+--------+----------------------------------------+------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                                |Cross Street|LAT    |LON      |
+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+----------------------------------

<span style="font-size:20px"><b>Standardizing 'AREA NAME' Column</b> <br>
<span style="font-size:15px">This cell cleans the `AREA NAME` column by trimming leading and trailing whitespace and converting all values to uppercase. This standardization ensures consistency when grouping or filtering by area names in later analysis.</span></span>

In [4]:
from pyspark.sql.functions import trim, upper

# AREA NAME: strip surrounding whitespace first, then upper-case the result.
area_name_clean = upper(trim(col("AREA NAME")))
df = df.withColumn("AREA NAME", area_name_clean)

# Preview the first ten rows without truncating cell contents.
df.show(10, truncate=False)

+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+--------+--------+----------------------------------------+------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION                                |Cross Street|LAT    |LON      |
+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+----------------------------------

<span style="font-size:20px"><b>Cleaning the 'LOCATION' Column</b> <br>
<span style="font-size:15px">This cell standardizes the `LOCATION` column by:<br>
- Trimming leading and trailing whitespaces<br>
- Replacing multiple consecutive spaces (e.g., between street names and abbreviations) with a single space<br>

This improves the consistency of address data and prepares it for reliable grouping or matching operations.</span></span>

In [5]:
# LOCATION: collapse every run of whitespace (e.g. the padding between the
# address and the street-type abbreviation) into a single space, then trim
# leading/trailing whitespace. Empty strings are NOT converted to null here.
from pyspark.sql.functions import trim, regexp_replace, col

df = df.withColumn("LOCATION", trim(regexp_replace(col("LOCATION"), r"\s+", " ")))
df.show(n=10, truncate=False)

+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+--------+--------+-------------------+------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION           |Cross Street|LAT    |LON      |
+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+----

<span style="font-size:20px"><b>Dropping Unnecessary Column: 'Cross Street'</b></span>

In [6]:
# Remove the "Cross Street" column, then preview the first ten rows.
columns_to_drop = ["Cross Street"]
df = df.drop(*columns_to_drop)
df.show(10, truncate=False)

+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+-

<span style="font-size:20px"><b>Dropping Unused Crime Code Columns</b></span>

In [7]:
# Remove the "Crm Cd 3" and "Crm Cd 4" columns in one call, then preview ten rows.
df = df.drop("Crm Cd 3", "Crm Cd 4")
df.show(10, truncate=False)

+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+

<span style="font-size:20px"><b>Cleaning 'Vict Age' Column</b> <br>
<span style="font-size:15px">This cell replaces invalid victim ages with `null`. Specifically, any age less than or equal to 0 or greater than 110 is considered unrealistic and is set to `null`. This ensures data quality and accuracy for age-related analyses.</span></span>

In [8]:
from pyspark.sql.functions import when, col

# Vict Age: keep only ages in the range (0, 110]; everything else becomes null.
victim_age = col("Vict Age")
age_is_plausible = (victim_age > 0) & (victim_age <= 110)

# A when() without otherwise() yields null for rows that do not match, so
# ages <= 0, ages > 110 and already-null ages all end up as null.
df = df.withColumn("Vict Age", when(age_is_plausible, victim_age))
df.show(20, truncate=False)

+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-----

<span style="font-size:20px"><b>Standardizing 'Vict Sex' Column</b> <br>
<span style="font-size:15px">This cell cleans and standardizes the `Vict Sex` column by:<br>
- Trimming leading and trailing whitespaces<br>
- Converting all values to uppercase<br>
- Replacing invalid or ambiguous entries (`null`, empty string, `-`, and `H`) with `"X"` to represent unknown or unspecified gender<br>

This ensures consistency and reliability in gender-related analyses.</span></span>

In [9]:
from pyspark.sql.functions import col, when, trim, upper

# Vict Sex: trim and upper-case every value; null, blank, "-" and the
# invalid code "H" are all mapped to "X".
sex_trimmed = trim(col("Vict Sex"))
sex_normalised = upper(sex_trimmed)

is_unknown_sex = (
    col("Vict Sex").isNull()
    | (sex_trimmed == "")
    | sex_normalised.isin("-", "H")
)

df = df.withColumn("Vict Sex", when(is_unknown_sex, "X").otherwise(sex_normalised))

# Preview twenty rows; the total row count is the cell's output value.
df.show(20, truncate=False)
df.count()

+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-----

1005199

In [68]:
from pyspark.sql.functions import col

# Count the rows whose Vict Sex is the "X" placeholder and report the total.
rows_marked_x = df.where(col("Vict Sex") == "X")
vict_sex_x_count = rows_marked_x.count()
print(f"Number of victims with Vict Sex = 'X': {vict_sex_x_count}")


Number of victims with Vict Sex = 'X': 242682


<span style="font-size:20px"><b>Standardizing 'Vict Descent' Column</b> <br>
<span style="font-size:15px">This cell cleans the `Vict Descent` column by:<br>
- Trimming whitespaces<br>
- Converting all values to uppercase for consistency<br>
- Replacing missing or invalid entries (`null`, empty strings, and `-`) with `"X"` to indicate unknown descent<br>

This ensures the descent data is standardized and ready for accurate grouping and analysis.</span></span>

In [10]:
from pyspark.sql.functions import col, when, trim, upper

# Vict Descent: null, blank and "-" become "X"; every other value is
# trimmed and upper-cased.
descent_trimmed = trim(col("Vict Descent"))
is_unknown_descent = (
    col("Vict Descent").isNull()
    | (descent_trimmed == "")
    | (descent_trimmed == "-")
)

df = df.withColumn(
    "Vict Descent",
    when(is_unknown_descent, "X").otherwise(upper(descent_trimmed)),
)

# Show the distinct descent codes that remain, then a twenty-row preview.
df.select("Vict Descent").distinct().show()
df.show(20, truncate=False)


+------------+
|Vict Descent|
+------------+
|           K|
|           F|
|           B|
|           L|
|           V|
|           U|
|           O|
|           D|
|           C|
|           J|
|           Z|
|           A|
|           X|
|           W|
|           S|
|           G|
|           I|
|           P|
|           H|
+------------+

+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION         

In [18]:
# List any Vict Descent values that are still null, blank, or "-".
descent = col("Vict Descent")
still_missing = descent.isNull() | trim(descent).isin("", "-")
df.where(still_missing).select("Vict Descent").show(truncate=False)

+------------+
|Vict Descent|
+------------+
+------------+



<span style="font-size:20px"><b>Handling Missing 'Premis Cd' Values</b> <br>
<span style="font-size:18px">This cell replaces missing or `null` values in the `Premis Cd` column with `0`. Using `0` as a placeholder allows the data to remain numeric while indicating the absence of a valid premise code.</span></span>

In [11]:
from pyspark.sql.functions import when, col

# Premis Cd: keep existing codes, substitute 0 where the code is null.
premis_cd = col("Premis Cd")
df = df.withColumn("Premis Cd", when(premis_cd.isNotNull(), premis_cd).otherwise(0))

# Cell output: number of rows whose Premis Cd is 0.
df.where(col("Premis Cd") == 0).select("Premis Cd").count()

16

<span style="font-size:20px"><b>Standardizing 'Premis Desc' Column</b> <br>
<span style="font-size:20px">This cell cleans the `Premis Desc` column by:<br>
- Trimming leading and trailing whitespaces<br>
- Converting all text to uppercase for consistency<br>
- Replacing missing or empty values with `"UNKNOWN PREMISE"` to clearly indicate unknown or unspecified locations<br>

This improves data quality and prepares the column for reliable analysis.</span></span>

In [12]:
from pyspark.sql.functions import trim, upper, when

# Premis Desc: null or blank descriptions become "UNKNOWN PREMISE";
# every other value is trimmed and upper-cased.
premis_desc_trimmed = trim(col("Premis Desc"))
premis_desc_missing = col("Premis Desc").isNull() | (premis_desc_trimmed == "")

df = df.withColumn(
    "Premis Desc",
    when(premis_desc_missing, "UNKNOWN PREMISE").otherwise(upper(premis_desc_trimmed)),
)

# Cell output: number of rows carrying the placeholder description.
df.where(col("Premis Desc") == "UNKNOWN PREMISE").select("Premis Desc").count()

588

In [22]:
# Preview the DataFrame (show() defaults to 20 rows) without truncating values.
df.show(n=20, truncate=False)

+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-----

<span style="font-size:20px"><b>Handling Missing 'Weapon Used Cd' Values</b> <br>
<span style="font-size:18px">This cell fills missing (`null`) values in the `Weapon Used Cd` column with `0`, indicating that no weapon code was provided. This helps maintain numerical consistency while clearly distinguishing unknown weapon entries.</span></span>

In [13]:
from pyspark.sql.functions import when, col

# Weapon Used Cd: keep existing codes, substitute 0 where the code is null.
weapon_cd = col("Weapon Used Cd")
df = df.withColumn(
    "Weapon Used Cd",
    when(weapon_cd.isNotNull(), weapon_cd).otherwise(0),
)

# Cell output: number of rows whose Weapon Used Cd is still null.
df.where(col("Weapon Used Cd").isNull()).count()


0

<span style="font-size:20px"><b>Standardizing 'Weapon Desc' Column</b> <br>
<span style="font-size:20px">This cell cleans the `Weapon Desc` column by:<br>
- Replacing `null` or blank values with `"NO WEAPON"` to clearly indicate the absence of a weapon<br>
- Trimming whitespace and converting valid entries to uppercase for consistency<br>

This ensures uniform formatting and improves the accuracy of weapon-related analysis.</span></span>

In [14]:
from pyspark.sql.functions import when, col, trim, upper

# Weapon Desc: null or blank descriptions become "NO WEAPON"; every other
# value is trimmed and upper-cased.
df = df.withColumn(
    "Weapon Desc",
    when(
        (col("Weapon Desc").isNull()) | (trim(col("Weapon Desc")) == ""),
        "NO WEAPON"
    ).otherwise(
        upper(trim(col("Weapon Desc")))
    )
)

# Report how many rows carry the placeholder. The count is printed because a
# bare expression that is not the last line of a cell produces no output.
no_weapon_count = df.select("Weapon Desc").filter(col("Weapon Desc") == "NO WEAPON").count()
print(f"Rows with Weapon Desc = 'NO WEAPON': {no_weapon_count}")

df.show(truncate=False)

+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-------+---------+
|DR_NO    |Date Rptd |DATE OCC  |TIME OCC|AREA|AREA NAME  |Rpt Dist No|Part 1-2|Crm Cd|Crm Cd Desc                             |Mocodes            |Vict Age|Vict Sex|Vict Descent|Premis Cd|Premis Desc                                 |Weapon Used Cd|Weapon Desc|Status|Status Desc |Crm Cd 1|Crm Cd 2|LOCATION           |LAT    |LON      |
+---------+----------+----------+--------+----+-----------+-----------+--------+------+----------------------------------------+-------------------+--------+--------+------------+---------+--------------------------------------------+--------------+-----------+------+------------+--------+--------+-------------------+-----

In [21]:
# Save the cleaned DataFrame to HDFS as CSV with a header row.
# - coalesce(1) reduces the data to one partition before writing.
# - mode("overwrite") replaces whatever already exists at the target path.
# The namenode is spelled out (hdfs://localhost:9000), matching the URI used
# to read the raw file, so the write does not depend on fs.defaultFS being
# picked up by the session.
df.coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .csv("hdfs://localhost:9000/crime_data/cleaned_crime_data")