In [1]:
import pandas as pd
import numpy as nm
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, column
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql import Row
import csv
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, IndexToString, StandardScaler, PCA
from pyspark.ml.classification import DecisionTreeClassifier

---

## Initialize a SparkSession, define the csv Schema

In [2]:
ss = SparkSession.builder.appName("Project").getOrCreate()

In [3]:
schema = StructType([ StructField("ID", IntegerType(), False ), \
                        StructField("Case Number", StringType(), False), \
                        StructField("Date", StringType(), False ), \
                        StructField("Block", StringType(), False ), \
                        StructField("IUCR", StringType(), False), \
                        StructField("Primary Type", StringType(), False), \
                        StructField("Description", StringType(), False),\
                        StructField("Location Description", StringType(), False), \
                        StructField("Arrest", StringType(), False), \
                        StructField("Domestic", StringType(), False), \
                        StructField("District", StringType(), False) ,\
                        StructField("Ward", StringType(), False ), \
                        StructField("Community Area", StringType(), False ), \
                        StructField("FBI Code", StringType(), False), \
                        StructField("Year", StringType(), False), \
                        StructField("Latitude", StringType(), False),\
                        StructField("Longitude", StringType(), False)
                           ])

In [4]:
# new_file3.csv is the file generated by using Suvarna's code
data = ss.read.csv("./new_file3.csv",schema=schema,header=True, inferSchema=False)

In [5]:
data = data.drop('Case Number').drop('ID')

In [6]:
# Convert 'Date' to hours
data = data.withColumn('Hour', pyspark.sql.functions.hour(pyspark.sql.functions.to_timestamp('Date', 'MM/dd/yyyy hh:mm:ss a')))

---

## Apply PCA Reduction (for model training only, not for visualization)
#### Reason: I tried directly using Lab8's code before. However, I continously got an error saying the data's cardinality is too high and aborted the task, which means the dataset's size is too large. To reduce the dataset's size, I need to use PCA reduction first to reduce the size and then use the reduced data to train the model.

`Step 1: Transforms a column of string to a new column of index (type double)`

In [7]:
# This block will create a new column for each indexed column, for example, will create a 'i_date' for indexed 'Date' column
columns_to_index = ["Date", "Primary Type", "Description", "IUCR","Year","Block",
                    "Location Description", "Arrest", "Domestic", "District","Ward","FBI Code","Community Area", 
                    "Latitude", "Longitude", "Hour"]
label_indexers = {col: StringIndexer(inputCol=col, outputCol=f"i_{col.replace(' ', '_').lower()}").fit(data) for col in columns_to_index}

In [8]:
# This block will transform data
transformed_data = data
for indexer in label_indexers.values():
    transformed_data = indexer.transform(transformed_data)

In [9]:
# data2 will contain the indexed values only, which means they are all integer -> ready for training
data2 = transformed_data.select("i_date","i_iucr","i_primary_type","i_description","i_location_description",\
                    "i_arrest","i_year","i_block","i_domestic","i_district","i_ward","i_fbi_code","i_community_area","i_latitude","i_longitude","i_hour")

In [10]:
input_features = ["i_date","i_iucr","i_primary_type","i_description","i_location_description","i_year","i_block",\
                    "i_domestic","i_district","i_ward","i_fbi_code","i_community_area","i_latitude","i_longitude","i_arrest"]

`Step 2: Create a vector-assembler to combine individual features into a vector in a new column, which called 'features_for_arrest'.`

In [11]:
assembler = VectorAssembler(inputCols=input_features, outputCol="features_for_arrest")

In [12]:
assembled_data = assembler.transform(data2)

`Step 3: Use a StandardScaler to center the data - saved the centered data into a new column called "scaled_features"`

In [13]:
scaler = StandardScaler(inputCol="features_for_arrest", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)

In [14]:
print(scaled_data)

DataFrame[i_date: double, i_iucr: double, i_primary_type: double, i_description: double, i_location_description: double, i_arrest: double, i_year: double, i_block: double, i_domestic: double, i_district: double, i_ward: double, i_fbi_code: double, i_community_area: double, i_latitude: double, i_longitude: double, i_hour: double, features_for_arrest: vector, scaled_features: vector]


---

## Train the model

####  Train the model with the reduced data. I used RandomForestClassifier instead of PySpark DecisionTree.

In [15]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

In [16]:
# Convert PySpark DataFrame to Pandas DataFrame
# "pcaFeatures" is the hyperparameter that predicts arrest or not
# "i_arrest" is our target variable
pandas_df = scaled_data.select("scaled_features", "i_hour").toPandas()

In [17]:
# Extract PCA features and target variable
X = pandas_df["scaled_features"].tolist()
y = pandas_df["i_hour"].tolist()

In [18]:
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

In [19]:
# Standardize the data again just in case
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [20]:
# Train the RandomForestClassifier
model = RandomForestClassifier()
model.fit(X_train_scaled, y_train)

`Above is the model that predicts the value of Arrest`

In [21]:
# Make predictions on the test set
predictions = model.predict(X_test_scaled)

---

## Evaluate the model

In [22]:
# Use test_data to evaluate our model 
accuracy = accuracy_score(y_test, predictions)
print("Accuracy: {:.2f}%".format(accuracy * 100))

Accuracy: 8.59%
