!pip install pyspark

In [1]:
import os
import shutil
from pathlib import Path
from pyspark.sql import functions
from pyspark.sql.window import Window
from pyspark.sql.session import SparkSession
from concurrent.futures import ThreadPoolExecutor

In [2]:
DATA_DIR =Path(__name__).resolve().parent /'data'
SPARK_SESSION =SparkSession.builder.appName("KCCD Analysis").getOrCreate()

**1. Read Dataset**

In [3]:
def read_csv(path:str)->tuple:
    df =SPARK_SESSION.read.csv(path, inferSchema =True, header =True)
    return path.split('_')[3], df

In [4]:
def read_data(DIR =DATA_DIR):
    dfs, paths ={}, []
    for root, _, files in os.walk(DIR):
        for file in files:
            path =os.path.join(root, file)
            if os.path.exists(path): paths.append(path)
    with ThreadPoolExecutor() as executor:
        jobs = [executor.submit(read_csv, path) for path in paths]
        for job in jobs:
            year, df =job.result()
            dfs[year] =df
    return dfs

In [5]:
datasets =read_data()

**2. Data Exploration and Summary**

In [6]:
def explore_dfs(dfs:dict):
    NULL_FIELDS, dfc =[], []
    title ="Year\t\tColumns\t\tRows\t\tNull Columns\t\tNull Fields"
    print(title)
    print("=="*len(title))
    for year, df in dfs.items():
        cols =df.columns
        dfc =dict(df.dtypes)
        col_nulls_ct =df.select([functions.sum(functions.col(col).isNull().cast("int")).alias(col) for col in df.columns])
        cols_with_nulls = col_nulls_ct.columns
        cols_with_nulls =[]
        for col in cols_with_nulls:
            if col_nulls_ct.select(functions.col(col)).head()[0] >0: cols_with_nulls.append(col)
        cols_with_nulls= col_nulls_ct.select(cols_with_nulls)
        null_counts_dict = cols_with_nulls.first().asDict()
        total_cols =len(cols)
        null_fields =sum([val for val in null_counts_dict.values()])
        total_rows =df.count()
        total_fields =total_rows *total_cols
        percantage_nulls =(null_fields/total_fields) *100
        print(f"{year}\t\t{total_cols}\t\t{total_rows}\t\t{len(null_counts_dict)}\t\t\t{null_fields}/{total_fields} - ({percantage_nulls:.2f}%)")
    
    print("\nColumns")
    print("==="*len(title))
    print(dfc)

In [7]:
explore_dfs(datasets)

Year		Columns		Rows		Null Columns		Null Fields
2009		23		394079		0			0/9063817 - (0.00%)
2010		23		403470		0			0/9279810 - (0.00%)
2011		23		379397		0			0/8726131 - (0.00%)
2012		23		386723		0			0/8894629 - (0.00%)
2013		23		375453		0			0/8635419 - (0.00%)
2014		23		365153		0			0/8398519 - (0.00%)
2015		23		360611		0			0/8294053 - (0.00%)
2016		23		378577		0			0/8707271 - (0.00%)
2017		23		357847		0			0/8230481 - (0.00%)
2018		23		350897		0			0/8070631 - (0.00%)
2019		23		261352		0			0/6011096 - (0.00%)
2020		23		254079		0			0/5843817 - (0.00%)
2021		23		92127		0			0/2118921 - (0.00%)
2022		23		101848		0			0/2342504 - (0.00%)
2023		23		88625		0			0/2038375 - (0.00%)

Columns
{'Report_No': 'string', 'Reported_Date': 'string', 'Reported_Time': 'timestamp', 'From_Date': 'string', 'From_Time': 'timestamp', 'To_Date': 'string', 'To_Time': 'timestamp', 'Offense': 'string', 'IBRS': 'string', 'Description': 'string', 'Beat': 'int', 'Address': 'string', 'City': 'string', 'Zip_Code': 'int', 'Rep

**3. Handling missing Values**

In [8]:
df_clean ={}

In [9]:
def clean_dataset(dfs:dict):
    fill_with_mean =['Age']
    fill_with_u =['Sex']
    fill_with_unknown =['Description', 'Area', 'Rep_Dist']
    for year, df in dfs.items():
        df_clean[year] =df.select([*fill_with_unknown, *fill_with_mean, *fill_with_u,'Zip_Code'])
        df_clean[year] =df_clean[year].na.fill('Uknown', subset =fill_with_unknown)
        df_clean[year] =df_clean[year].na.fill('U', subset =fill_with_u)
        for col in fill_with_mean: 
            df_clean[year] =df_clean[year].withColumn(col, functions.when(df_clean[year][col].isNull(), df_clean[year].select(functions.mean(col)).collect()[0][0]).otherwise(df_clean[year][col]))
        top_n =50
        zip_code_counts =df_clean[year].groupBy("Zip_Code").count()
        sorted_zip_codes =zip_code_counts.orderBy(functions.col("count").desc()).limit(top_n)
        df_clean[year] =df_clean[year].filter(functions.col("Zip_Code").isin([row.Zip_Code for row in sorted_zip_codes.collect()]))
        df_clean[year] =df_clean[year].na.drop(how ='any')
        df_clean[year] =df_clean[year].filter(functions.col('Age') >=5 & functions.col('Age') <=100)


In [10]:
clean_dataset(datasets)

**4. Data Transformation**

In [11]:
def transform_dataset(dfs:dict):
    for year, df in dfs.items():
        df_clean[year] =df_clean[year].withColumn("Age", functions.col("Age").cast("int"))
        df_clean[year] =df_clean[year].withColumn("Zip_Code", functions.col("Zip_Code").cast("int"))

In [12]:
transform_dataset(df_clean)

In [13]:
explore_dfs(df_clean)

Year		Columns		Rows		Null Columns		Null Fields
2009		6		126941		0			0/761646 - (0.00%)
2010		6		131914		0			0/791484 - (0.00%)
2011		6		124601		0			0/747606 - (0.00%)
2012		6		126463		0			0/758778 - (0.00%)
2013		6		121359		0			0/728154 - (0.00%)
2014		6		120140		0			0/720840 - (0.00%)
2015		6		121236		0			0/727416 - (0.00%)
2016		6		127212		0			0/763272 - (0.00%)
2017		6		131436		0			0/788616 - (0.00%)
2018		6		128256		0			0/769536 - (0.00%)
2019		6		88463		0			0/530778 - (0.00%)
2020		6		84128		0			0/504768 - (0.00%)
2021		6		86679		0			0/520074 - (0.00%)
2022		6		95829		0			0/574974 - (0.00%)
2023		6		83301		0			0/499806 - (0.00%)

Columns
{'Description': 'string', 'Area': 'string', 'Rep_Dist': 'string', 'Age': 'int', 'Sex': 'string', 'Zip_Code': 'int'}


In [14]:
df_aggregated, yearly_stats ={}, []

In [15]:
def aggregate_datasets(dfs:dict):
    for year, df in dfs.items():
        df_aggregated[year] ={}
        for col in df.columns:
            df_aggregated[year][col]= df.groupBy(col).agg(functions.count(col).alias("Crimes"))
        yearly_stats.append([str(year), df.count()])

In [16]:
aggregate_datasets(df_clean)

In [17]:
import pandas as pd
yearly_crimes_df = pd.DataFrame(yearly_stats, columns=["Year", "Crimes"])

In [20]:
def write_csv(dfs:dict):
    try:
        if os.path.exists('out'): shutil.rmtree('out')
        else: os.mkdir('out')
        yearly_crimes_df.to_csv('out/Yearly.csv', index=False)
        for year, df in dfs.items(): 
            print(f"Saving {year} dataset")
            for key, rec in df.items(): rec.write.options(header='True', delimiter=',').csv(f"out/{year}/{key}")
    except Exception as e: print(e)

In [21]:
write_csv(df_aggregated)

Saving 2009 dataset
Saving 2010 dataset
Saving 2011 dataset
Saving 2012 dataset
Saving 2013 dataset
Saving 2014 dataset
Saving 2015 dataset
Saving 2016 dataset
Saving 2017 dataset
Saving 2018 dataset
Saving 2019 dataset
Saving 2020 dataset
Saving 2021 dataset
Saving 2022 dataset
Saving 2023 dataset
