In [1]:
import findspark
findspark.init()
import pyspark
findspark.find()

'C:\\Spark\\spark-3.2.3-bin-hadoop2.7'

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, count, when, year, month, dayofmonth, to_timestamp, hour
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [3]:
# Read in dependencies
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.sql.types import * 

from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
App_maker = SparkSession.builder.appName("Crime Analysis").getOrCreate()


In [5]:
cores = App_maker._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
App_maker

You are working with 1 core(s)


In [6]:
Crime_DB = App_maker.read.csv('crime_in_la.csv',inferSchema=True,header=True)
type(Crime_DB)

pyspark.sql.dataframe.DataFrame

In [7]:
Crime_DB.limit(6).show()

+---------+----------+--------+----+-----------+-----------+------+--------------------+-------------------+--------+--------+------------+--------------------+--------------+--------------------+------+-----------+--------------------+-------+---------+
|    DR_NO|  DATE OCC|TIME OCC|AREA|  AREA NAME|Rpt Dist No|Crm Cd|         Crm Cd Desc|            Mocodes|Vict Age|Vict Sex|Vict Descent|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status|Status Desc|            LOCATION|    LAT|      LON|
+---------+----------+--------+----+-----------+-----------+------+--------------------+-------------------+--------+--------+------------+--------------------+--------------+--------------------+------+-----------+--------------------+-------+---------+
| 10304468|08/01/2020|   22:30|   3|  Southwest|        377|   624|BATTERY - SIMPLE ...|          0444 0913|      36|       F|           B|SINGLE FAMILY DWE...|         400.0|STRONG-ARM (HANDS...|    AO|Adult Other|1100 W  39TH     ...

In [8]:
Crime_DB.limit(6).toPandas() # since representation in jupiter is looking shabby, i decided to use the pandas library within

Unnamed: 0,DR_NO,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Crm Cd,Crm Cd Desc,Mocodes,Vict Age,Vict Sex,Vict Descent,Premis Desc,Weapon Used Cd,Weapon Desc,Status,Status Desc,LOCATION,LAT,LON
0,10304468,08/01/2020,22:30,3,Southwest,377,624,BATTERY - SIMPLE ASSAULT,0444 0913,36,F,B,SINGLE FAMILY DWELLING,400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AO,Adult Other,1100 W 39TH PL,34.0141,-118.2978
1,190101086,01/01/2020,03:30,1,Central,163,624,BATTERY - SIMPLE ASSAULT,0416 1822 1414,25,M,H,SIDEWALK,500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,700 S HILL ST,34.0459,-118.2545
2,191501505,01/01/2020,17:30,15,N Hollywood,1543,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),0329 1402,76,F,W,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,5400 CORTEEN PL,34.1685,-118.4019
3,191921269,01/01/2020,04:15,19,Mission,1998,740,"VANDALISM - FELONY ($400 & OVER, ALL CHURCH VA...",0329,31,X,X,BEAUTY SUPPLY STORE,500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,14400 TITUS ST,34.2198,-118.4468
4,200100501,01/01/2020,00:30,1,Central,163,121,"RAPE, FORCIBLE",0413 1822 1262 1415,25,F,H,NIGHT CLUB (OPEN EVENINGS ONLY),500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,700 S BROADWAY,34.0452,-118.2534
5,200100502,02/01/2020,13:15,1,Central,161,442,SHOPLIFTING - PETTY THEFT ($950 & UNDER),1402 2004 0344 0387,23,M,H,DEPARTMENT STORE,500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,700 S FIGUEROA ST,34.0483,-118.2631


In [9]:
# Number of rows
num_rows = Crime_DB.count()

# Number of columns
num_columns = len(Crime_DB.columns)

# Print the shape as a tuple (rows, columns)
print((num_rows, num_columns))

(495410, 20)


In [10]:
Crime_DB.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: string (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)



In [11]:
Crime_DB.describe().toPandas()

Unnamed: 0,summary,DR_NO,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Crm Cd,Crm Cd Desc,Mocodes,...,Vict Sex,Vict Descent,Premis Desc,Weapon Used Cd,Weapon Desc,Status,Status Desc,LOCATION,LAT,LON
0,count,495410.0,495410,495410,495410.0,495410,495410.0,495410.0,495410,495410,...,495410,495410,495197,495410.0,495410,495410,495410,495410,495410.0,495410.0
1,mean,209146527.51808804,,,10.768563412123292,,1123.1374296037625,504.6749661896207,,336.21128368453964,...,,,,450.56462525988576,,,,10.0,33.917113663227525,-117.81127728023286
2,stddev,7395200.17626745,,,6.068705496455603,,606.946832394658,209.1590619674228,,553.2213722458159,...,,,,99.54303567290512,,,,,2.305901851064038,8.000843601228736
3,min,817.0,01/01/2020,00:01,1.0,77th Street,101.0,110.0,ARSON,0,...,F,A,7TH AND METRO CENTER (NOT LINE SPECIFIC),101.0,AIR PISTOL/REVOLVER/RIFLE/BB GUN,AA,Adult Arrest,00 17TH AV,0.0,-118.6676
4,max,229910450.0,31/12/2021,23:59,21.0,Wilshire,2199.0,956.0,WEAPONS POSSESSION/BOMBING,9999 2004,...,X,Z,YARD (RESIDENTIAL/BUSINESS),516.0,VERBAL THREAT,JO,Juv Other,ZOO DR,34.3343,0.0


### Data Preprocessing

In [12]:
# check for duplicates using the add and subtract method

# Original row count
original_row_count = Crime_DB.count()

# Drop duplicates
data_no_duplicates = Crime_DB.dropDuplicates()

# Row count after dropping duplicates
new_row_count = data_no_duplicates.count()

# Calculate the number of duplicate rows
duplicate_rows = original_row_count - new_row_count

print(f"Original row count: {original_row_count}")
print(f"New row count: {new_row_count}")
print(f"Duplicate rows: {duplicate_rows}")


Original row count: 495410
New row count: 495410
Duplicate rows: 0


In [13]:
# Calculate the number of missing values for each column
missing_values = Crime_DB.select([count(when(col(c).isNull(), c)).alias(c) for c in Crime_DB.columns])

# Show the results
missing_values.toPandas()


Unnamed: 0,DR_NO,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Crm Cd,Crm Cd Desc,Mocodes,Vict Age,Vict Sex,Vict Descent,Premis Desc,Weapon Used Cd,Weapon Desc,Status,Status Desc,LOCATION,LAT,LON
0,0,0,0,0,0,0,0,0,0,0,0,0,213,0,0,0,0,0,0,0


In [14]:
# Replace 'column_name' with the actual name of the string column to be transformed in the code
column_name = "Premis Desc"

New_CDB = Crime_DB.withColumn(column_name, when(col(column_name).isNull(), "Unknown").otherwise(col(column_name)))

In [15]:
# check if application worked
missing_values = New_CDB.select([count(when(col(c).isNull(), c)).alias(c) for c in New_CDB.columns])
missing_values_dict = missing_values.collect()[0].asDict()

columns_with_missing_values = {k: v for k, v in missing_values_dict.items() if v > 0}
print(columns_with_missing_values)

{}


In [16]:
# date and time columns are represented as strings, thus, we should extract and transform into date-time stamps
from pyspark.sql.functions import unix_timestamp

CDBase1 = New_CDB.withColumn("datetime_occ_str", concat_ws(" ", "DATE OCC", "TIME OCC"))
CDBase2 = CDBase1.withColumn("datetime_occ", to_timestamp("datetime_occ_str", "dd/MM/yyyy HH:mm"))

CDBase = CDBase2.drop("DATE OCC", "TIME OCC")

In [29]:
CDBase2.limit(5).toPandas()

Unnamed: 0,DR_NO,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Crm Cd,Crm Cd Desc,Mocodes,Vict Age,...,Premis Desc,Weapon Used Cd,Weapon Desc,Status,Status Desc,LOCATION,LAT,LON,datetime_occ_str,datetime_occ
0,10304468,08/01/2020,22:30,3,Southwest,377,624,BATTERY - SIMPLE ASSAULT,0444 0913,36,...,SINGLE FAMILY DWELLING,400.0,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",AO,Adult Other,1100 W 39TH PL,34.0141,-118.2978,08/01/2020 22:30,2020-01-08 22:30:00
1,190101086,01/01/2020,03:30,1,Central,163,624,BATTERY - SIMPLE ASSAULT,0416 1822 1414,25,...,SIDEWALK,500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,700 S HILL ST,34.0459,-118.2545,01/01/2020 03:30,2020-01-01 03:30:00
2,191501505,01/01/2020,17:30,15,N Hollywood,1543,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),0329 1402,76,...,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,5400 CORTEEN PL,34.1685,-118.4019,01/01/2020 17:30,2020-01-01 17:30:00
3,191921269,01/01/2020,04:15,19,Mission,1998,740,"VANDALISM - FELONY ($400 & OVER, ALL CHURCH VA...",0329,31,...,BEAUTY SUPPLY STORE,500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,14400 TITUS ST,34.2198,-118.4468,01/01/2020 04:15,2020-01-01 04:15:00
4,200100501,01/01/2020,00:30,1,Central,163,121,"RAPE, FORCIBLE",0413 1822 1262 1415,25,...,NIGHT CLUB (OPEN EVENINGS ONLY),500.0,UNKNOWN WEAPON/OTHER WEAPON,IC,Invest Cont,700 S BROADWAY,34.0452,-118.2534,01/01/2020 00:30,2020-01-01 00:30:00


In [17]:
# check for null values within the datetime column
problematic_rows = CDBase2.filter(CDBase2["datetime_occ"].isNull())
problematic_rows.select("datetime_occ_str").distinct().show(truncate=False)

+----------------+
|datetime_occ_str|
+----------------+
+----------------+



In [18]:
# now apply for the extraction of the unique date and time stamp
CDBase = CDBase.withColumn("year", year("datetime_occ"))
CDBase = CDBase.withColumn("month", month("datetime_occ"))
CDBase = CDBase.withColumn("day", dayofmonth("datetime_occ"))
CDBase = CDBase.withColumn("hour", hour("datetime_occ"))

In [21]:
# CDBase = CDBase.dropna()
CDBase.limit(5).toPandas()

Unnamed: 0,DR_NO,AREA,AREA NAME,Rpt Dist No,Crm Cd,Crm Cd Desc,Mocodes,Vict Age,Vict Sex,Vict Descent,...,Status Desc,LOCATION,LAT,LON,datetime_occ_str,datetime_occ,year,month,day,hour
0,10304468,3,Southwest,377,624,BATTERY - SIMPLE ASSAULT,0444 0913,36,F,B,...,Adult Other,1100 W 39TH PL,34.0141,-118.2978,08/01/2020 22:30,2020-01-08 22:30:00,2020,1,8,22
1,190101086,1,Central,163,624,BATTERY - SIMPLE ASSAULT,0416 1822 1414,25,M,H,...,Invest Cont,700 S HILL ST,34.0459,-118.2545,01/01/2020 03:30,2020-01-01 03:30:00,2020,1,1,3
2,191501505,15,N Hollywood,1543,745,VANDALISM - MISDEAMEANOR ($399 OR UNDER),0329 1402,76,F,W,...,Invest Cont,5400 CORTEEN PL,34.1685,-118.4019,01/01/2020 17:30,2020-01-01 17:30:00,2020,1,1,17
3,191921269,19,Mission,1998,740,"VANDALISM - FELONY ($400 & OVER, ALL CHURCH VA...",0329,31,X,X,...,Invest Cont,14400 TITUS ST,34.2198,-118.4468,01/01/2020 04:15,2020-01-01 04:15:00,2020,1,1,4
4,200100501,1,Central,163,121,"RAPE, FORCIBLE",0413 1822 1262 1415,25,F,H,...,Invest Cont,700 S BROADWAY,34.0452,-118.2534,01/01/2020 00:30,2020-01-01 00:30:00,2020,1,1,0


In [22]:
# check for validation of application
CDBase.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- datetime_occ_str: string (nullable = false)
 |-- datetime_occ: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



In [23]:
CDBase.groupBy("Crm Cd", "Crm Cd Desc").count().show(100)

+------+--------------------+-----+
|Crm Cd|         Crm Cd Desc|count|
+------+--------------------+-----+
|   626|INTIMATE PARTNER ...|25788|
|   230|ASSAULT WITH DEAD...|29314|
|   654|CREDIT CARDS, FRA...|   42|
|   821|SODOMY/SEXUAL CON...|  280|
|   943|  CRUELTY TO ANIMALS|  132|
|   237|CHILD NEGLECT (SE...|  624|
|   649|DOCUMENT FORGERY ...| 1603|
|   810|SEX,UNLAWFUL(INC ...|  550|
|   473|THEFT, COIN MACHI...|    5|
|   822|HUMAN TRAFFICKING...|  255|
|   888|         TRESPASSING| 6725|
|   434|  FALSE IMPRISONMENT|  170|
|   420|THEFT FROM MOTOR ...|21146|
|   235|CHILD ABUSE (PHYS...|  338|
|   320| BURGLARY, ATTEMPTED| 2096|
|   670|EMBEZZLEMENT, PET...|   49|
|   956|LETTERS, LEWD  - ...| 4138|
|   652|DOCUMENT WORTHLES...|   19|
|   933|             PROWLER|  103|
|   850|   INDECENT EXPOSURE|  634|
|   331|THEFT FROM MOTOR ...|15613|
|   310|            BURGLARY|30507|
|   814|   CHILD PORNOGRAPHY|  137|
|   485|BIKE - ATTEMPTED ...|    6|
|   666|      BUNCO, ATTEMPT

In [24]:
from pyspark.sql import functions as F

# Group by 'Crm Cd' column, count the rows, and sort the result in ascending order
result = CDBase.groupBy("Crm Cd").count().orderBy(F.col("count").asc())

# Show the result
result.show()


+------+-----+
|Crm Cd|count|
+------+-----+
|   349|    1|
|   432|    1|
|   924|    2|
|   884|    2|
|   906|    3|
|   452|    3|
|   904|    3|
|   830|    3|
|   347|    4|
|   840|    4|
|   948|    4|
|   473|    5|
|   113|    5|
|   942|    5|
|   475|    5|
|   470|    6|
|   485|    6|
|   880|    7|
|   451|    7|
|   446|    7|
+------+-----+
only showing top 20 rows



In [25]:
# Group by 'Crm Cd' column, count the rows, and sort the result in ascending order
result = CDBase.groupBy("Crm Cd").count().orderBy(F.col("count").desc())

# Show the result
result.show()


+------+-----+
|Crm Cd|count|
+------+-----+
|   510|54601|
|   624|39612|
|   330|32071|
|   740|31969|
|   310|30507|
|   230|29314|
|   440|26244|
|   626|25788|
|   354|25295|
|   420|21146|
|   210|17605|
|   331|15613|
|   745|15455|
|   341|15053|
|   930|10354|
|   442| 8090|
|   761| 7831|
|   236| 7073|
|   888| 6725|
|   901| 6481|
+------+-----+
only showing top 20 rows



## Feature Engineering

In [26]:
# Calculate the counts for each 'Crm Cd'
counts = CDBase.groupBy("Crm Cd").count()

# Define the ranges and create a new column 'group'
counts = counts.withColumn(
    "Crm_type_group",
    F.when((F.col("count") >= 15000), "1")
    .when((F.col("count") >= 6000) & (F.col("count") < 15000), "2")
    .when((F.col("count") >= 1000) & (F.col("count") < 6000), "3")
    .otherwise("4"),
)

# Show the result
counts.show()


+------+-----+--------------+
|Crm Cd|count|Crm_type_group|
+------+-----+--------------+
|   471|   13|             4|
|   623| 1474|             3|
|   251|  998|             4|
|   756|   21|             4|
|   210|17605|             1|
|   762|  371|             4|
|   626|25788|             1|
|   806|   62|             4|
|   236| 7073|             2|
|   350| 2266|             3|
|   860| 2215|             3|
|   822|  255|             4|
|   330|32071|             1|
|   230|29314|             1|
|   625| 2434|             3|
|   122|  168|             4|
|   654|   42|             4|
|   660|   78|             4|
|   473|    5|             4|
|   435|   11|             4|
+------+-----+--------------+
only showing top 20 rows



In [27]:
# Join the CDBase DataFrame with the counts DataFrame on the 'Crm Cd' column
CDBasex = CDBase.join(counts, on="Crm Cd", how="inner")

# Drop the 'count' column as it's not needed anymore
CDBase = CDBasex.drop("count")

# Show the result
CDBase.show(5)


+------+---------+----+---------+-----------+--------------------+--------------------+--------+--------+------------+--------------------+--------------+--------------------+------+-----------+--------------------+-------+---------+----------------+-------------------+----+-----+---+----+--------------+
|Crm Cd|    DR_NO|AREA|AREA NAME|Rpt Dist No|         Crm Cd Desc|             Mocodes|Vict Age|Vict Sex|Vict Descent|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status|Status Desc|            LOCATION|    LAT|      LON|datetime_occ_str|       datetime_occ|year|month|day|hour|Crm_type_group|
+------+---------+----+---------+-----------+--------------------+--------------------+--------+--------+------------+--------------------+--------------+--------------------+------+-----------+--------------------+-------+---------+----------------+-------------------+----+-----+---+----+--------------+
|   471|200507137|   5|   Harbor|        505|TILL TAP - PETTY ...|0342 0380 0104 1

In [28]:
CDBase.limit(5).toPandas()

Unnamed: 0,Crm Cd,DR_NO,AREA,AREA NAME,Rpt Dist No,Crm Cd Desc,Mocodes,Vict Age,Vict Sex,Vict Descent,...,LOCATION,LAT,LON,datetime_occ_str,datetime_occ,year,month,day,hour,Crm_type_group
0,471,200507137,5,Harbor,505,TILL TAP - PETTY ($950 & UNDER),0342 0380 0104 1027 0324 0344 1236 1309,0,X,X,...,900 W 190TH ST,33.8592,-118.2899,13/03/2020 17:10,2020-03-13 17:10:00,2020,3,13,17,4
1,623,200104047,1,Central,153,BATTERY POLICE (SIMPLE),2004 1212 0917 0910 1822 0417 2048,0,X,X,...,500 S HILL ST,34.0488,-118.2518,01/01/2020 09:05,2020-01-01 09:05:00,2020,1,1,9,3
2,623,200104048,1,Central,153,BATTERY POLICE (SIMPLE),2004 1212 0917 0910 1822 0417 2048,0,X,X,...,500 S HILL ST,34.0488,-118.2518,01/01/2020 09:05,2020-01-01 09:05:00,2020,1,1,9,3
3,623,200104319,1,Central,127,BATTERY POLICE (SIMPLE),1212 0356,0,X,X,...,100 N LOS ANGELES ST,34.0515,-118.2424,04/01/2020 23:45,2020-01-04 23:45:00,2020,1,4,23,3
4,623,200105481,1,Central,163,BATTERY POLICE (SIMPLE),0417 1822 1212,48,M,H,...,700 S BROADWAY,34.0452,-118.2534,22/01/2020 17:55,2020-01-22 17:55:00,2020,1,22,17,3


In [26]:
CDBase.printSchema()

root
 |-- Crm Cd: integer (nullable = true)
 |-- DR_NO: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- datetime_occ_str: string (nullable = false)
 |-- datetime_occ: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- Crm_type_group: s

In [27]:
CDBase_new = CDBase.withColumn('Crm_type_group', col('Crm_type_group').cast('integer'))

In [28]:
CDBase_new.printSchema()

root
 |-- Crm Cd: integer (nullable = true)
 |-- DR_NO: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- datetime_occ_str: string (nullable = false)
 |-- datetime_occ: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- Crm_type_group: i

In [29]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

categorical_columns = ["Vict Sex", "Vict Descent", "Premis Desc"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in categorical_columns]

# set the stages from the list of indexers above 
pipeline = Pipeline(stages=indexers)


In [30]:
model = pipeline.fit(CDBase_new)
# here we fit the 

In [31]:
# then transform the dataset using the fitted pipeline
CDBase_indexed = model.transform(CDBase_new)


In [32]:
CDBase_indexed.printSchema()

root
 |-- Crm Cd: integer (nullable = true)
 |-- DR_NO: integer (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- datetime_occ_str: string (nullable = false)
 |-- datetime_occ: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- Crm_type_group: i

In [33]:
CDBase_new = CDBase_indexed[["Vict Sex_index", "Vict Descent_index", "Premis Desc_index", "AREA", "Crm Cd", "Vict Age", 
    "Weapon Used Cd", "LAT", "LON", "year", "month", "day", "hour", "Crm_type_group"]]

In [34]:
CDBase_new.show(10)

+--------------+------------------+-----------------+----+------+--------+--------------+-------+---------+----+-----+---+----+--------------+
|Vict Sex_index|Vict Descent_index|Premis Desc_index|AREA|Crm Cd|Vict Age|Weapon Used Cd|    LAT|      LON|year|month|day|hour|Crm_type_group|
+--------------+------------------+-----------------+----+------+--------+--------------+-------+---------+----+-----+---+----+--------------+
|           2.0|               1.0|              4.0|   5|   471|       0|         500.0|33.8592|-118.2899|2020|    3| 13|  17|             4|
|           2.0|               1.0|            114.0|   1|   623|       0|         400.0|34.0488|-118.2518|2020|    1|  1|   9|             3|
|           2.0|               1.0|            114.0|   1|   623|       0|         400.0|34.0488|-118.2518|2020|    1|  1|   9|             3|
|           2.0|               1.0|            109.0|   1|   623|       0|         400.0|34.0515|-118.2424|2020|    1|  4|  23|             3|

In [35]:
input_columns = CDBase_new.columns
input_columns

['Vict Sex_index',
 'Vict Descent_index',
 'Premis Desc_index',
 'AREA',
 'Crm Cd',
 'Vict Age',
 'Weapon Used Cd',
 'LAT',
 'LON',
 'year',
 'month',
 'day',
 'hour',
 'Crm_type_group']

In [36]:
input_columns = input_columns[0:-1]
input_columns

['Vict Sex_index',
 'Vict Descent_index',
 'Premis Desc_index',
 'AREA',
 'Crm Cd',
 'Vict Age',
 'Weapon Used Cd',
 'LAT',
 'LON',
 'year',
 'month',
 'day',
 'hour']

In [37]:
target_variable = "Crm_type_group"

In [38]:
rename = CDBase_new.withColumn("Crm_type_group", CDBase_new[target_variable]) 
indexer = StringIndexer(inputCol="Crm_type_group", outputCol="label") #Pyspark is expecting the this naming convention 
CDBase_indexed = indexer.fit(rename).transform(rename)

In [39]:
# select the columns to be used for the vector assembly
input_cols = ["Vict Sex_index", "Vict Descent_index", "Premis Desc_index", "AREA", "Crm Cd", "Vict Age", 
    "Weapon Used Cd", "LAT", "LON", "year", "month", "day", "hour"]

# create a VectorAssembler object with the input columns
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# transform the DataFrame using the VectorAssembler
output_CDB = assembler.transform(CDBase_indexed).select(["features", "Crm_type_group"])

# display the transformed DataFrame
output_CDB.show()

+--------------------+--------------+
|            features|Crm_type_group|
+--------------------+--------------+
|[2.0,1.0,4.0,5.0,...|             4|
|[2.0,1.0,114.0,1....|             3|
|[2.0,1.0,114.0,1....|             3|
|[2.0,1.0,109.0,1....|             3|
|[0.0,0.0,5.0,1.0,...|             3|
|[0.0,0.0,5.0,1.0,...|             3|
|[0.0,2.0,0.0,1.0,...|             3|
|[2.0,1.0,0.0,2.0,...|             3|
|[1.0,0.0,51.0,1.0...|             3|
|[0.0,0.0,1.0,2.0,...|             3|
|[0.0,0.0,1.0,2.0,...|             3|
|[2.0,1.0,0.0,2.0,...|             3|
|[2.0,1.0,5.0,3.0,...|             3|
|[0.0,3.0,39.0,3.0...|             3|
|[2.0,1.0,0.0,3.0,...|             3|
|[2.0,1.0,0.0,3.0,...|             3|
|[2.0,1.0,39.0,3.0...|             3|
|[0.0,0.0,1.0,4.0,...|             3|
|[2.0,1.0,2.0,4.0,...|             3|
|[0.0,0.0,2.0,4.0,...|             3|
+--------------------+--------------+
only showing top 20 rows



In [40]:
CDB_count = output_CDB.groupBy('Crm_type_group').count()
CDB_count.show()

+--------------+------+
|Crm_type_group| count|
+--------------+------+
|             1|380273|
|             3| 48506|
|             4| 20077|
|             2| 46554|
+--------------+------+



In [41]:
from pyspark.sql.functions import col

def undersample(df, column, majority_class, minority_class, ratio=1.0):
    # Calculate the number of rows to be taken from the majority class
    majority_class_count = df.filter(col(column) == majority_class).count()
    minority_class_count = df.filter(col(column) == minority_class).count()
    majority_sample_count = int(minority_class_count * ratio)

    # Sample the majority class
    majority_sampled_df = df.filter(col(column) == majority_class).sample(
        withReplacement=False, fraction=majority_sample_count / majority_class_count, seed=42
    )

    # Combine the majority sampled data with the minority class data
    minority_df = df.filter(col(column) == minority_class)
    balanced_df = majority_sampled_df.union(minority_df)
    
    return balanced_df


In [42]:
balanced_df = undersample(output_CDB, "Crm_type_group", 1, 2, ratio=1.0)
balanced_df.show()


+--------------------+--------------+
|            features|Crm_type_group|
+--------------------+--------------+
|[0.0,0.0,0.0,1.0,...|             1|
|[0.0,3.0,5.0,1.0,...|             1|
|[0.0,0.0,0.0,1.0,...|             1|
|[1.0,0.0,2.0,2.0,...|             1|
|[0.0,3.0,2.0,2.0,...|             1|
|[1.0,0.0,122.0,2....|             1|
|[0.0,2.0,3.0,2.0,...|             1|
|[0.0,3.0,93.0,2.0...|             1|
|[0.0,0.0,0.0,2.0,...|             1|
|[1.0,3.0,0.0,3.0,...|             1|
|[0.0,4.0,13.0,3.0...|             1|
|[0.0,3.0,0.0,3.0,...|             1|
|[0.0,0.0,5.0,3.0,...|             1|
|[2.0,1.0,20.0,3.0...|             1|
|[0.0,0.0,0.0,3.0,...|             1|
|[1.0,3.0,136.0,3....|             1|
|[0.0,0.0,16.0,2.0...|             1|
|[1.0,5.0,5.0,1.0,...|             1|
|[1.0,0.0,5.0,4.0,...|             1|
|[0.0,0.0,0.0,2.0,...|             1|
+--------------------+--------------+
only showing top 20 rows



In [43]:
# You can adjust the ratio to control the balance between the classes
# Group by column and count occurrences
balanced_df = balanced_df.groupBy('Crm_type_group').agg(count('Crm_type_group').alias("count"))

# Show the resulting balanced DataFrame
balanced_df.show()

+--------------+-----+
|Crm_type_group|count|
+--------------+-----+
|             1|46601|
|             2|46554|
+--------------+-----+



In [44]:
balanced_df2 = undersample(output_CDB, "Crm_type_group", 3, 4, ratio=1.0)
balanced_df2.show()


+--------------------+--------------+
|            features|Crm_type_group|
+--------------------+--------------+
|[0.0,0.0,5.0,1.0,...|             3|
|[1.0,0.0,51.0,1.0...|             3|
|[0.0,3.0,39.0,3.0...|             3|
|[0.0,0.0,1.0,4.0,...|             3|
|[0.0,0.0,2.0,4.0,...|             3|
|[1.0,2.0,39.0,2.0...|             3|
|[0.0,0.0,51.0,1.0...|             3|
|[2.0,1.0,39.0,2.0...|             3|
|[2.0,1.0,0.0,5.0,...|             3|
|[2.0,1.0,0.0,1.0,...|             3|
|[0.0,2.0,0.0,1.0,...|             3|
|[2.0,1.0,16.0,2.0...|             3|
|[1.0,0.0,109.0,1....|             3|
|[2.0,1.0,177.0,3....|             3|
|[0.0,0.0,109.0,1....|             3|
|[0.0,2.0,0.0,1.0,...|             3|
|[2.0,1.0,39.0,3.0...|             3|
|[0.0,2.0,92.0,1.0...|             3|
|[2.0,1.0,0.0,3.0,...|             3|
|[0.0,0.0,92.0,1.0...|             3|
+--------------------+--------------+
only showing top 20 rows



In [45]:
balanced_df2 = balanced_df2.groupBy('Crm_type_group').agg(count('Crm_type_group').alias("count"))

# Show the resulting balanced DataFrame
balanced_df2.show()

+--------------+-----+
|Crm_type_group|count|
+--------------+-----+
|             3|20125|
|             4|20077|
+--------------+-----+



In [46]:
balanced_df_final = balanced_df.union(balanced_df2)
balanced_df_final.show()


+--------------+-----+
|Crm_type_group|count|
+--------------+-----+
|             1|46601|
|             2|46554|
|             3|20125|
|             4|20077|
+--------------+-----+



In [47]:
output_CDB.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Crm_type_group: integer (nullable = true)



In [48]:
# Now create your final features list
features_list = input_cols
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(CDBase_indexed).select('features','label')

In [49]:
output.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[2.0,1.0,4.0,5.0,...|  3.0|
|[2.0,1.0,114.0,1....|  1.0|
|[2.0,1.0,114.0,1....|  1.0|
|[2.0,1.0,109.0,1....|  1.0|
|[0.0,0.0,5.0,1.0,...|  1.0|
|[0.0,0.0,5.0,1.0,...|  1.0|
|[0.0,2.0,0.0,1.0,...|  1.0|
|[2.0,1.0,0.0,2.0,...|  1.0|
|[1.0,0.0,51.0,1.0...|  1.0|
|[0.0,0.0,1.0,2.0,...|  1.0|
|[0.0,0.0,1.0,2.0,...|  1.0|
|[2.0,1.0,0.0,2.0,...|  1.0|
|[2.0,1.0,5.0,3.0,...|  1.0|
|[0.0,3.0,39.0,3.0...|  1.0|
|[2.0,1.0,0.0,3.0,...|  1.0|
|[2.0,1.0,0.0,3.0,...|  1.0|
|[2.0,1.0,39.0,3.0...|  1.0|
|[0.0,0.0,1.0,4.0,...|  1.0|
|[2.0,1.0,2.0,4.0,...|  1.0|
|[0.0,0.0,2.0,4.0,...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [50]:
# Replace "input_features" with a list of your feature column names.
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="unscaled_features")
# Before we correct for negative values that may have been found above, 
# We need to vectorize our df
# because the function that we use to make that correction requires a vector. 
# Now create your final features list
features_list = input_cols
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(CDBase_indexed).select('features','label')


In [51]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
print("scale range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

scale range: [0.000000, 1.000000]


In [52]:
scaleMod = scaler.fit(output, params=None)
# we now rescale to range[min, max]
ScaleCCDB = scaleMod.transform(output)
ScaleCCDB.show()

+--------------------+-----+--------------------+
|            features|label|     scaled_features|
+--------------------+-----+--------------------+
|[2.0,1.0,4.0,5.0,...|  3.0|[0.66666666666666...|
|[2.0,1.0,114.0,1....|  1.0|[0.66666666666666...|
|[2.0,1.0,114.0,1....|  1.0|[0.66666666666666...|
|[2.0,1.0,109.0,1....|  1.0|[0.66666666666666...|
|[0.0,0.0,5.0,1.0,...|  1.0|[0.0,0.0,0.016339...|
|[0.0,0.0,5.0,1.0,...|  1.0|[0.0,0.0,0.016339...|
|[0.0,2.0,0.0,1.0,...|  1.0|[0.0,0.1111111111...|
|[2.0,1.0,0.0,2.0,...|  1.0|[0.66666666666666...|
|[1.0,0.0,51.0,1.0...|  1.0|[0.33333333333333...|
|[0.0,0.0,1.0,2.0,...|  1.0|[0.0,0.0,0.003267...|
|[0.0,0.0,1.0,2.0,...|  1.0|[0.0,0.0,0.003267...|
|[2.0,1.0,0.0,2.0,...|  1.0|[0.66666666666666...|
|[2.0,1.0,5.0,3.0,...|  1.0|[0.66666666666666...|
|[0.0,3.0,39.0,3.0...|  1.0|[0.0,0.1666666666...|
|[2.0,1.0,0.0,3.0,...|  1.0|[0.66666666666666...|
|[2.0,1.0,0.0,3.0,...|  1.0|[0.66666666666666...|
|[2.0,1.0,39.0,3.0...|  1.0|[0.66666666666666...|


In [53]:
OUTPUT = ScaleCCDB.select('label', 'scaled_features')
OUTPUT = OUTPUT.withColumnRenamed("scaled_features", "features")
OUTPUT.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  3.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.0,0.0,0.016339...|
|  1.0|[0.0,0.0,0.016339...|
|  1.0|[0.0,0.1111111111...|
|  1.0|[0.66666666666666...|
|  1.0|[0.33333333333333...|
|  1.0|[0.0,0.0,0.003267...|
|  1.0|[0.0,0.0,0.003267...|
|  1.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.0,0.1666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.66666666666666...|
|  1.0|[0.0,0.0,0.003267...|
|  1.0|[0.66666666666666...|
|  1.0|[0.0,0.0,0.006535...|
+-----+--------------------+
only showing top 20 rows



In [54]:
# scaled train-test
train_data, test_data = OUTPUT.randomSplit([0.7, 0.3], seed=42)
train_data.describe().show()
test_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|             347476|
|   mean|0.40769434435759594|
| stddev| 0.8202073906978963|
|    min|                0.0|
|    max|                3.0|
+-------+-------------------+

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|             147934|
|   mean|0.40680979355658603|
| stddev| 0.8197799904364722|
|    min|                0.0|
|    max|                3.0|
+-------+-------------------+



our class labels have been identified as multiclass therefore we are restricted to a certain algorithms to create our models. also, hence the inclusion of the minmax scaler preprocessing technique applied.

## TRAINING THE MODEL

#### Logistic Regression

In [55]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import FloatType



In [56]:
LR = LogisticRegression()
LR_model = LR.fit(train_data)

predictionsLR = LR_model.transform(test_data)

MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") 
accuracy = (MC_evaluator.evaluate(predictionsLR))*100

MC_evaluator.setMetricName("weightedPrecision")
precision = MC_evaluator.evaluate(predictionsLR)

MC_evaluator.setMetricName("weightedRecall")
recall = MC_evaluator.evaluate(predictionsLR)

MC_evaluator.setMetricName("f1")
f1 = MC_evaluator.evaluate(predictionsLR)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)

Accuracy: 78.37211188773372
Precision: 0.7152571107427438
Recall: 0.7837211188773372
F1-score: 0.7040274413105562


In [60]:
# Convert DataFrame columns to float
predictionsLR = predictionsLR.withColumn("label", F.col("label").cast(FloatType()))
predictionsLR = predictionsLR.withColumn("prediction", F.col("prediction").cast(FloatType()))

# Compute the confusion matrix
metrics = MulticlassMetrics(predictionsLR.select("prediction", "label").rdd.map(tuple))
confusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusion_matrix)


Confusion Matrix:
 [[1.13426e+05 2.00000e+00 1.89000e+02 0.00000e+00]
 [1.26850e+04 1.90000e+01 1.74400e+03 0.00000e+00]
 [1.13700e+04 1.00000e+01 2.49400e+03 0.00000e+00]
 [5.58400e+03 1.00000e+00 4.10000e+02 0.00000e+00]]


In [61]:
import numpy as np

num_classes = len(confusion_matrix)
for i in range(num_classes):
    tp = confusion_matrix[i][i]
    fn = np.sum(confusion_matrix[i, :]) - tp
    fp = np.sum(confusion_matrix[:, i]) - tp
    tn = np.sum(confusion_matrix) - tp - fn - fp

    precision = tp / (tp + fp)*100
    recall = tp / (tp + fn)*100
    f1_score = 2 * (precision * recall) / (precision + recall)

    print(f"Class {i} - Precision: {precision}, Recall: {recall}, F1-score: {f1_score}")


Class 0 - Precision: 79.28284346276168, Recall: 99.83189135428677, F1-score: 88.37861634240033
Class 1 - Precision: 59.375, Recall: 0.13150609080841638, F1-score: 0.26243093922651933
Class 2 - Precision: 51.56088484597892, Recall: 17.976070347412428, F1-score: 26.658115547004435
Class 3 - Precision: nan, Recall: 0.0, F1-score: nan


  precision = tp / (tp + fp)*100


### Decision Tree Classifier

In [62]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Create the model
dt = DecisionTreeClassifier(maxBins=35000)
# Train the model
dt_model = dt.fit(train_data)

predictionsDT = dt_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionsDT)

evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictionsDT)

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictionsDT)

evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictionsDT)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)



Accuracy: 0.911886381764841
Precision: 0.9137783623715694
Recall: 0.911886381764841
F1-score: 0.9011481728743134


In [63]:
# Convert DataFrame columns to float
predictionsDT = predictionsDT.withColumn("label", F.col("label").cast(FloatType()))
predictionsDT = predictionsDT.withColumn("prediction", F.col("prediction").cast(FloatType()))

# Compute the confusion matrix
metrics = MulticlassMetrics(predictionsDT.select("prediction", "label").rdd.map(tuple))
DTconfusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusion_matrix)


Confusion Matrix:
 [[1.13426e+05 2.00000e+00 1.89000e+02 0.00000e+00]
 [1.26850e+04 1.90000e+01 1.74400e+03 0.00000e+00]
 [1.13700e+04 1.00000e+01 2.49400e+03 0.00000e+00]
 [5.58400e+03 1.00000e+00 4.10000e+02 0.00000e+00]]


In [64]:
import numpy as np

num_classes = len(DTconfusion_matrix)
for i in range(num_classes):
    tp = DTconfusion_matrix[i][i]
    fn = np.sum(DTconfusion_matrix[i, :]) - tp
    fp = np.sum(DTconfusion_matrix[:, i]) - tp
    tn = np.sum(DTconfusion_matrix) - tp - fn - fp

    precision = tp / (tp + fp)*100
    recall = tp / (tp + fn)*100
    f1_score = 2 * (precision * recall) / (precision + recall)

    print(f"Class {i} - Precision: {precision}, Recall: {recall}, F1-score: {f1_score}")

Class 0 - Precision: 92.47981376570945, Recall: 100.0, F1-score: 96.09300004651695
Class 1 - Precision: 91.29789864029667, Recall: 51.12126245847176, F1-score: 65.54263909841158
Class 2 - Precision: 78.71109887083449, Recall: 82.39873144010379, F1-score: 80.51271216282838
Class 3 - Precision: 100.0, Recall: 41.10091743119266, F1-score: 58.25747724317296


## Naive Bayes

In [65]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()
nb_model = nb.fit(train_data)


In [66]:
predictionsNB = nb_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionsNB)

evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictionsNB)

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictionsNB)

evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictionsNB)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)

Accuracy: 0.7680249300363675
Precision: 0.5898622931573673
Recall: 0.7680249300363675
F1-score: 0.667255628737355


In [67]:
# Convert DataFrame columns to float
predictionsNB = predictionsNB.withColumn("label", F.col("label").cast(FloatType()))
predictionsNB = predictionsNB.withColumn("prediction", F.col("prediction").cast(FloatType()))

# Compute the confusion matrix
metrics = MulticlassMetrics(predictionsNB.select("prediction", "label").rdd.map(tuple))
NBconfusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusion_matrix)


Confusion Matrix:
 [[1.13426e+05 2.00000e+00 1.89000e+02 0.00000e+00]
 [1.26850e+04 1.90000e+01 1.74400e+03 0.00000e+00]
 [1.13700e+04 1.00000e+01 2.49400e+03 0.00000e+00]
 [5.58400e+03 1.00000e+00 4.10000e+02 0.00000e+00]]


In [68]:
import numpy as np

num_classes = len(NBconfusion_matrix)
for i in range(num_classes):
    tp = NBconfusion_matrix[i][i]
    fn = np.sum(NBconfusion_matrix[i, :]) - tp
    fp = np.sum(NBconfusion_matrix[:, i]) - tp
    tn = np.sum(NBconfusion_matrix) - tp - fn - fp

    precision = tp / (tp + fp)*100
    recall = tp / (tp + fn)*100
    f1_score = 2 * (precision * recall) / (precision + recall)

    print(f"Class {i} - Precision: {precision}, Recall: {recall}, F1-score: {f1_score}")

Class 0 - Precision: 76.80249300363676, Recall: 100.0, F1-score: 86.87942313353801
Class 1 - Precision: nan, Recall: 0.0, F1-score: nan
Class 2 - Precision: nan, Recall: 0.0, F1-score: nan
Class 3 - Precision: nan, Recall: 0.0, F1-score: nan


  precision = tp / (tp + fp)*100


### Random Forest Classifier

In [72]:
from pyspark.ml.classification import RandomForestClassifier
# Create the model
rf = RandomForestClassifier()
RF_model = rf.fit(train_data)

predictionsRF = RF_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionsRF)

evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictionsRF)

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictionsRF)

evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictionsRF)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-score:", f1)

Accuracy: 0.8558816769640515
Precision: 0.8511710919474837
Recall: 0.8558816769640515
F1-score: 0.821697289010639


In [70]:
# Convert DataFrame columns to float
predictionsRF = predictionsRF.withColumn("label", F.col("label").cast(FloatType()))
predictionsRF = predictionsRF.withColumn("prediction", F.col("prediction").cast(FloatType()))

# Compute the confusion matrix
metrics = MulticlassMetrics(predictionsRF.select("prediction", "label").rdd.map(tuple))
RFconfusion_matrix = metrics.confusionMatrix().toArray()
print("Confusion Matrix:\n", confusion_matrix)

Confusion Matrix:
 [[1.13426e+05 2.00000e+00 1.89000e+02 0.00000e+00]
 [1.26850e+04 1.90000e+01 1.74400e+03 0.00000e+00]
 [1.13700e+04 1.00000e+01 2.49400e+03 0.00000e+00]
 [5.58400e+03 1.00000e+00 4.10000e+02 0.00000e+00]]


In [71]:
num_classes = len(RFconfusion_matrix)
for i in range(num_classes):
    tp = RFconfusion_matrix[i][i]
    fn = np.sum(RFconfusion_matrix[i, :]) - tp
    fp = np.sum(RFconfusion_matrix[:, i]) - tp
    tn = np.sum(RFconfusion_matrix) - tp - fn - fp

    precision = tp / (tp + fp)*100
    recall = tp / (tp + fn)*100
    f1_score = 2 * (precision * recall) / (precision + recall)

    print(f"Class {i} - Precision: {precision}, Recall: {recall}, F1-score: {f1_score}")

Class 0 - Precision: 87.45083550773161, Recall: 100.0, F1-score: 93.30535686422652
Class 1 - Precision: 81.58038147138964, Recall: 20.72259136212625, F1-score: 33.050005519373
Class 2 - Precision: 68.44855905629387, Recall: 66.08043822978232, F1-score: 67.24365556696493
Class 3 - Precision: 87.98735511064278, Recall: 13.928273561301086, F1-score: 24.049539170506918


save in csv file format to visuale results on tableau and make analysis

In [None]:
import csv

header = ['Model', 'Accuracy', 'Weighted Precision', 'Weighted Recall', 'F1 Score']
results = [
    ['Logistic Regression', 0.7837211188773372, 0.7152571107427438, 0.7837211188773372, 0.7040274413105562],
    ['Decision Tree', 0.911886381764841, 0.9137783623715694, 0.911886381764841, 0.9011481728743134],
    ['Naive Bayes', 0.7680249300363675, 0.5898622931573673, 0.7680249300363675, 0.667255628737355],
    ['Random Forest Classifier', 0.8521367636919167, 0.8429959372749076, 0.8521367636919166, 0.8151786907872403]
]

with open('Models_Comparison.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(header)
    writer.writerows(results)

# this is for only the results of models that hyperparameter tuning were applied to