<a href="https://colab.research.google.com/github/freddykrugs/crime-arrest-prediction-bigdata/blob/main/BIG_DATA_ASSGT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [74]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CrimeArrestPrediction") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

In [75]:
from pyspark.sql.types import *

schema = StructType([
    StructField("ID", LongType(), True),
    StructField("Case Number", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("Block", StringType(), True),
    StructField("IUCR", StringType(), True),
    StructField("Primary Type", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Location Description", StringType(), True),
    StructField("Arrest", BooleanType(), True),
    StructField("Domestic", BooleanType(), True),
    StructField("Beat", IntegerType(), True),
    StructField("District", IntegerType(), True),
    StructField("Ward", IntegerType(), True),
    StructField("Community Area", IntegerType(), True),
    StructField("FBI Code", StringType(), True),
    StructField("X Coordinate", DoubleType(), True),
    StructField("Y Coordinate", DoubleType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Updated On", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("Location", StringType(), True)
])


In [6]:
df = spark.read.csv(
    file_path,
    header=True,
    schema=schema
)

In [7]:
df.printSchema()
df.count()
df.describe().show()

root
 |-- ID: long (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: double (nullable = true)
 |-- Y Coordinate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)

+-------+-----------------+------------------+--------------------+--------------+------------------+

In [8]:
from pyspark.sql.functions import col, sum

In [9]:
df.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).show()

+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+
| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|  Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|  0|          0|   0|    0|   0|           0|          0|               15559|     0|       0|   0|      47|614819|        613686|       0|       94605|       94605|   0|         0|   94605|    94605|   94605|
+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+------+--------------+--------+------------+---

In [10]:
parquet_path = "/content/drive/MyDrive/crimes_parquet"


In [11]:
df.write \
  .mode("overwrite") \
  .partitionBy("Year") \
  .parquet(parquet_path)

In [12]:
df = spark.read.parquet(parquet_path)
df.count()

8493086

In [13]:
columns_to_drop = [
    "Case Number",
    "Updated On",
    "Location",
    "X Coordinate",
    "Y Coordinate"
]

df = df.drop(*columns_to_drop)


In [14]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn(
    "Date",
    to_timestamp("Date", "MM/dd/yyyy hh:mm:ss a")
)

In [15]:
df.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Year: integer (nullable = true)



In [16]:
from pyspark.sql.functions import hour, dayofweek, month, when, col

df = df.withColumn("Hour", hour("Date")) \
       .withColumn("DayOfWeek", dayofweek("Date")) \
       .withColumn("Month", month("Date")) \
       .withColumn(
           "IsWeekend",
           when(col("DayOfWeek").isin([1,7]), 1).otherwise(0)
       )

In [17]:
df.select("Date", "Hour", "DayOfWeek", "Month", "IsWeekend").show(5)


+-------------------+----+---------+-----+---------+
|               Date|Hour|DayOfWeek|Month|IsWeekend|
+-------------------+----+---------+-----+---------+
|2002-01-03 06:00:00|   6|        5|    1|        0|
|2002-02-05 22:30:00|  22|        3|    2|        0|
|2002-01-18 16:45:00|  16|        6|    1|        0|
|2002-02-07 07:45:00|   7|        5|    2|        0|
|2002-01-31 23:55:00|  23|        5|    1|        0|
+-------------------+----+---------+-----+---------+
only showing top 5 rows


In [18]:
# Drop rows missing coordinates
df = df.dropna(subset=["Latitude", "Longitude"])

# Fill missing Ward & Community Area
df = df.fillna({
    "Ward": -1,
    "Community Area": -1
})


In [19]:
df.count()

8398481

In [20]:
from pyspark.sql.functions import col

df = df.withColumn("label", col("Arrest").cast("integer"))


In [21]:
df.select("Arrest", "label").show(5)


+------+-----+
|Arrest|label|
+------+-----+
| false|    0|
| false|    0|
|  true|    1|
| false|    0|
|  true|    1|
+------+-----+
only showing top 5 rows


In [22]:
df = df.withColumn("label", col("Arrest").cast("integer"))


In [23]:
columns_to_drop = [
    "ID",
    "Block",
    "Description",
    "IUCR",
    "FBI Code",
    "Arrest"
]

df = df.drop(*columns_to_drop)


In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder


In [25]:
categorical_cols = [
    "Primary Type",
    "Location Description"
]


In [26]:
indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=col + "_index",
        handleInvalid="keep"
    )
    for col in categorical_cols
]


In [27]:
encoder = OneHotEncoder(
    inputCols=[col + "_index" for col in categorical_cols],
    outputCols=[col + "_vec" for col in categorical_cols]
)


In [28]:
from pyspark.ml.feature import VectorAssembler


In [29]:
numeric_cols = [
    "District",
    "Community Area",
    "Beat",
    "Domestic",
    "Hour",
    "DayOfWeek",
    "Month",
    "IsWeekend",
    "Latitude",
    "Longitude"
]


In [30]:
assembler = VectorAssembler(
    inputCols=[col + "_vec" for col in categorical_cols] + numeric_cols,
    outputCol="features"
)


In [31]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("Training count:", train_df.count())
print("Test count:", test_df.count())


Training count: 6718113
Test count: 1680368


In [32]:
train_df.groupBy("label").count().show()


+-----+-------+
|label|  count|
+-----+-------+
|    1|1691883|
|    0|5026230|
+-----+-------+



In [33]:
test_df.groupBy("label").count().show()


+-----+-------+
|label|  count|
+-----+-------+
|    1| 423638|
|    0|1256730|
+-----+-------+



In [35]:
from pyspark.sql.functions import sum, col

# Fill missing Location Description
df = df.fillna({
    "Location Description": "Unknown"
})


In [37]:
df = df.dropna(subset=["District"])


In [38]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("Training count:", train_df.count())
print("Test count:", test_df.count())


Training count: 6718078
Test count: 1680356


In [40]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Define categorical columns
categorical_cols = [
    "Primary Type",
    "Location Description"
]

# Create indexers
indexers = [
    StringIndexer(
        inputCol=col_name,
        outputCol=col_name + "_index",
        handleInvalid="keep"
    )
    for col_name in categorical_cols
]

# Create encoder
encoder = OneHotEncoder(
    inputCols=[col_name + "_index" for col_name in categorical_cols],
    outputCols=[col_name + "_vec" for col_name in categorical_cols]
)

# Define numeric columns
numeric_cols = [
    "District",
    "Community Area",
    "Beat",
    "Domestic",
    "Hour",
    "DayOfWeek",
    "Month",
    "IsWeekend",
    "Latitude",
    "Longitude"
]

# Create assembler
assembler = VectorAssembler(
    inputCols=[col_name + "_vec" for col_name in categorical_cols] + numeric_cols,
    outputCol="features",
    handleInvalid="keep"
)

# Logistic Regression model
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=20
)

# Build pipeline
pipeline = Pipeline(
    stages=indexers + [encoder, assembler, lr]
)

# Train model
lr_model = pipeline.fit(train_df)



In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = lr_model.transform(test_df)

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

roc_auc = evaluator.evaluate(predictions)

print("ROC-AUC:", roc_auc)


ROC-AUC: 0.8706727366334376


In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracy_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

precision_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision"
)

recall_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall"
)

f1_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

accuracy = accuracy_eval.evaluate(predictions)
precision = precision_eval.evaluate(predictions)
recall = recall_eval.evaluate(predictions)
f1 = f1_eval.evaluate(predictions)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)


Accuracy: 0.8657332136761495
Precision: 0.8669323668040327
Recall: 0.8657332136761495
F1-score: 0.8540834306178485


In [43]:
predictions.groupBy("label", "prediction").count().show()


+-----+----------+-------+
|label|prediction|  count|
+-----+----------+-------+
|    1|       0.0| 193049|
|    0|       0.0|1223987|
|    1|       1.0| 230753|
|    0|       1.0|  32567|
+-----+----------+-------+



In [44]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=10,
    seed=42
)

rf_pipeline = Pipeline(
    stages=indexers + [encoder, assembler, rf]
)

rf_model = rf_pipeline.fit(train_df)


In [45]:
rf_predictions = rf_model.transform(test_df)

roc_auc_rf = evaluator.evaluate(rf_predictions)

print("Random Forest ROC-AUC:", roc_auc_rf)


Random Forest ROC-AUC: 0.8666502071486141


In [46]:
accuracy_rf = accuracy_eval.evaluate(rf_predictions)
precision_rf = precision_eval.evaluate(rf_predictions)
recall_rf = recall_eval.evaluate(rf_predictions)
f1_rf = f1_eval.evaluate(rf_predictions)

print("RF Accuracy:", accuracy_rf)
print("RF Precision:", precision_rf)
print("RF Recall:", recall_rf)
print("RF F1-score:", f1_rf)


RF Accuracy: 0.8453482476332396
RF Precision: 0.8696445802195975
RF Recall: 0.8453482476332397
RF F1-score: 0.8190155817534345


In [47]:
rf_predictions.groupBy("label", "prediction").count().show()


+-----+----------+-------+
|label|prediction|  count|
+-----+----------+-------+
|    1|       0.0| 257999|
|    0|       0.0|1254683|
|    1|       1.0| 165803|
|    0|       1.0|   1871|
+-----+----------+-------+



In [48]:
sample_train = train_df.sample(fraction=0.3, seed=42)

print("Sample training count:", sample_train.count())


Sample training count: 2017017


In [49]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
    maxDepth=5,
    seed=42
)

gbt_pipeline = Pipeline(
    stages=indexers + [encoder, assembler, gbt]
)

gbt_model = gbt_pipeline.fit(sample_train)


In [50]:
gbt_predictions = gbt_model.transform(test_df)

roc_auc_gbt = evaluator.evaluate(gbt_predictions)

print("GBT ROC-AUC:", roc_auc_gbt)


GBT ROC-AUC: 0.8711270960288389


In [51]:
accuracy_gbt = accuracy_eval.evaluate(gbt_predictions)
precision_gbt = precision_eval.evaluate(gbt_predictions)
recall_gbt = recall_eval.evaluate(gbt_predictions)
f1_gbt = f1_eval.evaluate(gbt_predictions)

print("GBT Accuracy:", accuracy_gbt)
print("GBT Precision:", precision_gbt)
print("GBT Recall:", recall_gbt)
print("GBT F1-score:", f1_gbt)


GBT Accuracy: 0.8712225266550659
GBT Precision: 0.8789138729298276
GBT Recall: 0.8712225266550659
GBT F1-score: 0.8578116756891243


In [52]:
gbt_predictions.groupBy("label", "prediction").count().show()


+-----+----------+-------+
|label|prediction|  count|
+-----+----------+-------+
|    1|       0.0| 199951|
|    0|       0.0|1240113|
|    1|       1.0| 223851|
|    0|       1.0|  16441|
+-----+----------+-------+



In [53]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [54]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0, 0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())


In [55]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)


In [56]:
tune_train = train_df.sample(fraction=0.3, seed=42)

print("Tuning dataset size:", tune_train.count())


Tuning dataset size: 2017017


In [None]:
cv_model = crossval.fit(tune_train)


In [61]:
best_lr_model = cv_model.bestModel


In [62]:
best_lr = best_lr_model.stages[-1]

print("Best regParam:", best_lr.getRegParam())
print("Best elasticNetParam:", best_lr.getElasticNetParam())


Best regParam: 0.0
Best elasticNetParam: 0.0


In [63]:
tuned_predictions = best_lr_model.transform(test_df)

roc_auc_tuned = evaluator.evaluate(tuned_predictions)

accuracy_tuned = accuracy_eval.evaluate(tuned_predictions)
precision_tuned = precision_eval.evaluate(tuned_predictions)
recall_tuned = recall_eval.evaluate(tuned_predictions)
f1_tuned = f1_eval.evaluate(tuned_predictions)

print("Tuned ROC-AUC:", roc_auc_tuned)
print("Tuned Accuracy:", accuracy_tuned)
print("Tuned Precision:", precision_tuned)
print("Tuned Recall:", recall_tuned)
print("Tuned F1:", f1_tuned)


Tuned ROC-AUC: 0.8705789938390706
Tuned Accuracy: 0.8657558279317001
Tuned Precision: 0.8669528420028717
Tuned Recall: 0.8657558279317003
Tuned F1: 0.854112464140273


In [64]:
def evaluate_model(seed_value):

    train_temp, test_temp = df.randomSplit([0.8, 0.2], seed=seed_value)

    model = pipeline.fit(train_temp)
    predictions_temp = model.transform(test_temp)

    roc = evaluator.evaluate(predictions_temp)

    return roc


In [65]:
roc_1 = evaluate_model(10)
roc_2 = evaluate_model(20)
roc_3 = evaluate_model(30)

print("ROC 1:", roc_1)
print("ROC 2:", roc_2)
print("ROC 3:", roc_3)

mean_roc = (roc_1 + roc_2 + roc_3) / 3

import math
std_roc = math.sqrt(((roc_1 - mean_roc)**2 +
                     (roc_2 - mean_roc)**2 +
                     (roc_3 - mean_roc)**2) / 3)

print("Mean ROC:", mean_roc)
print("Std Dev ROC:", std_roc)


ROC 1: 0.871468789856865
ROC 2: 0.8715095320213736
ROC 3: 0.8711949287745907
Mean ROC: 0.8713910835509431
Std Dev ROC: 0.00013969610638002572


In [66]:
import time

def timed_training(fraction):

    subset = train_df.sample(fraction=fraction, seed=42)

    start_time = time.time()
    model = pipeline.fit(subset)
    end_time = time.time()

    duration = end_time - start_time

    predictions = model.transform(test_df)
    roc = evaluator.evaluate(predictions)

    return duration, roc


In [67]:
time_25, roc_25 = timed_training(0.25)
time_50, roc_50 = timed_training(0.5)
time_100, roc_100 = timed_training(1.0)

print("25% - Time:", time_25, "ROC:", roc_25)
print("50% - Time:", time_50, "ROC:", roc_50)
print("100% - Time:", time_100, "ROC:", roc_100)


25% - Time: 174.8285310268402 ROC: 0.8705699122049707
50% - Time: 186.88108086585999 ROC: 0.8705883607838043
100% - Time: 238.69779586791992 ROC: 0.8706729695179269


In [68]:
best_stage = lr_model.stages[-1]

coefficients = best_stage.coefficients
intercept = best_stage.intercept

print("Number of features:", len(coefficients))
print("Intercept:", intercept)


Number of features: 258
Intercept: -32.77642481573792


In [69]:
feature_names = assembler.getInputCols()

for name, coef in zip(feature_names, coefficients):
    print(name, coef)


Primary Type_vec -1.495255690435285
Location Description_vec -0.17885887053423954
District -1.448728398721341
Community Area 6.2078117821602286
Beat -0.29602744295999744
Domestic -0.21218264857574773
Hour -1.4064708253833655
DayOfWeek -1.2676389582907774
Month -0.9923014607416903
IsWeekend -1.3461975243253037
Latitude 1.7638522754587238
Longitude 2.232930967326232


In [70]:
numeric_feature_names = numeric_cols

numeric_coefs = coefficients[-len(numeric_cols):]

for name, coef in zip(numeric_feature_names, numeric_coefs):
    print(name, coef)


District -0.01371088178643883
Community Area -0.0014558517786160369
Beat 0.00012246195900275124
Domestic 0.33331409160203607
Hour 0.0030291751020072805
DayOfWeek -0.003703488115111709
Month -0.0072528574861321984
IsWeekend 0.015809468683874
Latitude -0.05377717390768997
Longitude -0.3868178983467636


In [71]:
lr_model.write().overwrite().save("/content/drive/MyDrive/final_lr_model")


In [72]:
import os
os.listdir("/content/drive/MyDrive/")


['Getting started.pdf',
 'fluid_test[1].pdf',
 'workshop_test[1].pdf',
 '18eng06019 maths exam.pdf',
 '18eng06019 EIS EXAM.pdf',
 '18ENG0619 DAVIES FREDRICK.pdf',
 'MECHANICAL ENGINEERING 18ENG06019 MECHANICS.pdf',
 'ENG DRAWING EXAM 18ENG06019.pdf',
 'mechanical engineering 18eng06019 elect.pdf',
 'FLUID COMPLAINT.pdf',
 'structured programming complaint .pdf',
 '18ENG06019 (3).pdf',
 '18ENG06019 (2).pdf',
 '18ENG06019 (1).pdf',
 '18ENG06019.pdf',
 'ENGMATHS 18ENG06019.xlsx',
 '18 ENG06 019.docx',
 'Adobe Scan Feb 03, 2023.pdf',
 'jamb admission letter .pdf',
 'laptop receipt.pdf',
 'zikora NIN',
 'dumebi NIN',
 'Fred NIN.pdf',
 'Kosi NIN.pdf',
 'Copy of Fred NIN.pdf',
 'Photo from Fred',
 'tech proof.jpeg',
 'Untitled form (File responses)',
 'Untitled form (1).gform',
 'Untitled form.gform',
 'Portfolio.pdf',
 'vtu',
 'introduction-to-graphics.pdf',
 'Adobe Scan 16 Jan 2025.pdf',
 'THE-ULTIMATE-SNIPE-GUIDE.pdf',
 'Saved from Chrome',
 'Transcript.pdf',
 'reference letter .pdf',
 'PE