<a href="https://colab.research.google.com/github/Nas25s/Nas25s/blob/main/ChicagoCrime.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Library installation
%%capture
!pip install pyspark==3.5.0
!pip install gdown

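## Get the data

Download the zipped crime CSV (`Crimes_2001_to_Present.csv.zip`) into Google Drive and extract it.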


In [None]:
# Google Drive download URL of the zipped crimes CSV fetched later with gdown.
data_link = "https://drive.google.com/u/0/uc?id=1wRGDX25AyYWw2eA771CCbvugxYrRQOtw"

In [None]:
# Mount Google Drive inside the Colab VM; every path used below lives under
# /content/drive/MyDrive.
from google.colab import drive

drive.mount('/content/drive', force_remount=False)

Mounted at /content/drive


In [None]:
compressed_file_path = '/content/drive/MyDrive/CrimeInsightsRawData/Crimes_2001_to_Present.csv.zip'
!mkdir -p /content/drive/MyDrive/CrimeInsightsRawData

In [None]:
%%capture
import gdown
gdown.download(data_link,compressed_file_path)

In [None]:
!unzip -o /content/drive/MyDrive/CrimeInsightsRawData/Crimes_2001_to_Present.csv.zip -d /content/drive/MyDrive/CrimeInsightsRawData/

Archive:  /content/drive/MyDrive/CrimeInsightsRawData/Crimes_2001_to_Present.csv.zip
  inflating: /content/drive/MyDrive/CrimeInsightsRawData/Crimes_2001_to_Present.csv  
  inflating: /content/drive/MyDrive/CrimeInsightsRawData/__MACOSX/._Crimes_2001_to_Present.csv  


In [None]:
# Extracted CSV that Spark reads below.
file_path = "/content/drive/MyDrive/CrimeInsightsRawData/Crimes_2001_to_Present.csv"

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Create (or reuse) the notebook's Spark session, asking for 10g of driver memory.
spark = (
    SparkSession.builder
    .appName("CrimeInsights")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)

In [None]:
# Explicit schema for the crimes CSV, so Spark does not have to infer types.
# Each entry is (column name, Spark type) in CSV column order; all columns are nullable.
# 'Date' is read as a string and converted to a timestamp in a later cell.
# NOTE(review): 'Updated On' is declared as DateType. If the file stores it in the
# same 'MM/dd/yyyy hh:mm:ss a' layout as 'Date', it will presumably not parse with the
# reader's default date format. The column is dropped later, so confirm before using it.
_crime_columns = [
    ("ID", IntegerType()),
    ("Case Number", StringType()),
    ("Date", StringType()),
    ("Block", StringType()),
    ("IUCR", StringType()),
    ("Primary Type", StringType()),
    ("Description", StringType()),
    ("Location Description", StringType()),
    ("Arrest", BooleanType()),
    ("Domestic", BooleanType()),
    ("Beat", StringType()),
    ("District", StringType()),
    ("Ward", IntegerType()),
    ("Community Area", IntegerType()),
    ("FBI Code", StringType()),
    ("X Coordinate", DoubleType()),
    ("Y Coordinate", DoubleType()),
    ("Year", IntegerType()),
    ("Updated On", DateType()),
    ("Latitude", DoubleType()),
    ("Longitude", DoubleType()),
    ("Location", StringType()),
]

schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _crime_columns]
)

In [None]:
# Read the CSV with the explicit schema; the first line of the file is the header row.
df = spark.read.schema(schema).option("header", True).csv(file_path)
df

DataFrame[ID: int, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: string, District: string, Ward: int, Community Area: int, FBI Code: string, X Coordinate: double, Y Coordinate: double, Year: int, Updated On: date, Latitude: double, Longitude: double, Location: string]

In [None]:
# Columns that are not used anywhere downstream in this notebook.
drop_columns = [
    'Case Number', 'FBI Code', 'Updated On', 'IUCR', 'X Coordinate', 'Y Coordinate',
    'Location', 'Domestic', 'Beat', 'Description', 'District', 'Community Area', 'Block',
]
trimmed_df = df.drop(*drop_columns)

# Convert the 12-hour-clock 'Date' strings into timestamps. The column is read
# under its canonical name 'Date' (the original read 'date'), so the cell no
# longer depends on Spark's case-insensitive column resolution.
casted_df = trimmed_df.withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
casted_df.show()

+--------+-------------------+--------------------+--------------------+------+----+----+------------+-------------+
|      ID|               Date|        Primary Type|Location Description|Arrest|Ward|Year|    Latitude|    Longitude|
+--------+-------------------+--------------------+--------------------+------+----+----+------------+-------------+
|11646166|2018-09-01 00:01:00|               THEFT|           RESIDENCE| false|   8|2018|        NULL|         NULL|
|11645836|2016-05-01 00:25:00|  DECEPTIVE PRACTICE|                NULL| false|  15|2016|        NULL|         NULL|
|11449702|2018-07-31 13:30:00|           NARCOTICS|              STREET|  true|   5|2018|        NULL|         NULL|
|11643334|2018-12-19 16:30:00|     CRIMINAL DAMAGE|              STREET| false|  31|2018|        NULL|         NULL|
|11645527|2015-02-02 10:00:00|  DECEPTIVE PRACTICE|               OTHER| false|  23|2015|        NULL|         NULL|
|11034701|2001-01-01 11:00:00|  DECEPTIVE PRACTICE|           RE

In [None]:
# Preview the first rows of the trimmed frame (same preview as the previous cell).
casted_df.show(n=20, truncate=True)

In [None]:
# Drive folder that receives the trimmed data set in parquet form.
parquet_path = "/content/drive/MyDrive/CrimeInsightParquet/"

In [None]:
# Persist the trimmed frame as parquet, replacing the output of any earlier run.
casted_df.write.mode('overwrite').parquet(parquet_path)

In [None]:
# Continue from the parquet copy instead of the CSV, and report the row count.
parqued_df = spark.read.format("parquet").load(parquet_path)
parqued_df.count()

7884044

In [None]:
# Drop every row that has a null in any of the remaining columns, then count what is left.
non_na_df = parqued_df.dropna()
non_na_df.count()

7181628

In [None]:
# Crime types that are removed from the data set before modelling.
filter_values = {
    'STALKING',
    'OBSCENITY',
    'NON-CRIMINAL (SUBJECT SPECIFIED)',
    'GAMBLING',
    'NON - CRIMINAL',
    'LIQUOR LAW VIOLATION',
    'PUBLIC INDECENCY',
    'HUMAN TRAFFICKING',
    'INTIMIDATION',
    'CONCEALED CARRY LICENSE VIOLATION',
    'NON-CRIMINAL',
    'OTHER NARCOTIC VIOLATION',
}

# Rename to a space-free column name, then keep only the rows whose type is not excluded.
non_na_df = non_na_df.withColumnRenamed('Primary Type', 'Primary_Type')
is_excluded_type = non_na_df.Primary_Type.isin(filter_values)
filtered_df = non_na_df.where(~is_excluded_type)
filtered_df.count()

7143859

In [None]:

# Merge related crime types into broader classes:
#   sexual assault (both spellings) and prostitution -> SEX OFFENSE
#   weapons violation / interference with officer    -> PUBLIC PEACE VIOLATION
#   arson                                            -> CRIMINAL TRESPASS
# 'CRIMINAL SEXUAL ASSAULT' is listed next to 'CRIM SEXUAL ASSAULT' because the data
# contains both spellings; with only the short one mapped, rows labelled
# 'CRIMINAL SEXUAL ASSAULT' survived the consolidation as a separate class.
filtered_df = filtered_df.withColumn(
    "Primary_Type",
    when(
        col("Primary_Type").isin('CRIM SEXUAL ASSAULT', 'CRIMINAL SEXUAL ASSAULT', 'PROSTITUTION'),
        'SEX OFFENSE',
    )
    .when(
        col("Primary_Type").isin('WEAPONS VIOLATION', 'INTERFERENCE WITH PUBLIC OFFICER'),
        'PUBLIC PEACE VIOLATION',
    )
    .when(col("Primary_Type") == 'ARSON', 'CRIMINAL TRESPASS')
    .otherwise(col("Primary_Type")),
)

filtered_df.show()

+--------+-------------------+--------------------+--------------------+------+----+----+------------+-------------+
|      ID|               Date|        Primary_Type|Location Description|Arrest|Ward|Year|    Latitude|    Longitude|
+--------+-------------------+--------------------+--------------------+------+----+----+------------+-------------+
|13127758|2023-07-03 03:16:00|PUBLIC PEACE VIOL...|              STREET| false|  27|2023|41.893709466| -87.66233707|
|13127761|2023-07-02 12:00:00|               THEFT|               ALLEY| false|  46|2023|41.955426474|-87.656160751|
|13127767|2023-06-20 11:34:00|CRIMINAL SEXUAL A...|           RESIDENCE| false|  37|2023| 41.90148192|-87.725043718|
|13127770|2023-07-03 03:30:00|     CRIMINAL DAMAGE|              STREET| false|  27|2023|41.893709466| -87.66233707|
|13127771|2023-07-03 10:30:00|             BATTERY|CHA PARKING LOT /...| false|  28|2023|41.865223036|-87.657458571|
|13127772|2023-07-03 08:33:00|  DECEPTIVE PRACTICE|       HOTEL 

In [None]:
# Drive folder that receives the filtered / relabelled data set in parquet form.
filtered_parquet_path = "/content/drive/MyDrive/CrimeInsightParquetFiltered/"

In [None]:
# Persist the filtered frame as parquet, replacing the output of any earlier run.
filtered_df.write.format("parquet").mode("overwrite").save(filtered_parquet_path)

In [None]:
# Reload the filtered data from parquet; the modelling cells start from this frame.
transformed_df = spark.read.format("parquet").load(filtered_parquet_path)

In [None]:
# Show the column names and types of the reloaded frame as a sanity check.
transformed_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Primary_Type: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



## Machine learning: predicting `Primary_Type` with a random forest


In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.tree import RandomForest
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
# Build the modelling frame from the persisted parquet instead of mutating
# `transformed_df` in place. The original cell dropped the very column it reads,
# so running it a second time failed; starting from the parquet keeps it re-runnable.
#   * Week: week-of-year taken from the 'Date' timestamp
#   * KIDNAPPING and HOMICIDE are folded into OTHER OFFENSE
#   * the raw 'Date' column is dropped once Week has been extracted
transformed_df = (
    spark.read.parquet(filtered_parquet_path)
    .withColumn("Week", weekofyear(col("Date")))
    .withColumn(
        "Primary_Type",
        when(col("Primary_Type").isin('KIDNAPPING', 'HOMICIDE'), 'OTHER OFFENSE')
        .otherwise(col("Primary_Type")),
    )
    .drop("Date")
)
transformed_df.show()

+-------+-----------------+--------------------+------+----+----+------------+-------------+----+
|     ID|     Primary_Type|Location Description|Arrest|Ward|Year|    Latitude|    Longitude|Week|
+-------+-----------------+--------------------+------+----+----+------------+-------------+----+
|2627839|          BATTERY|SCHOOL, PUBLIC, B...| false|   4|2003|41.805078225|-87.590555884|  11|
|2627840|  CRIMINAL DAMAGE|           APARTMENT|  true|  29|2003|41.894733978|-87.773638244|  11|
|2627841|         BURGLARY|               OTHER| false|  11|2003|  41.8304572| -87.64609749|  11|
|2627842|          BATTERY|            SIDEWALK| false|   7|2003|41.754968838|-87.559974619|  11|
|2627844|            THEFT|    DEPARTMENT STORE|  true|  42|2003|41.883500187|-87.627876698|  10|
|2627845|        NARCOTICS|CHA PARKING LOT/G...|  true|  28|2003|41.862457487|-87.692520826|  10|
|2627847|          BATTERY|           APARTMENT|  true|   5|2003|41.770030877|-87.587998374|  11|
|2627849|  CRIMINAL 

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, IndexToString


In [None]:
# First experiment: the feature vector contains the week of the year only.
vector_assembler = VectorAssembler(
    outputCol="features",
    inputCols=["Week"],
)

In [None]:
# Append the assembled "features" vector column to the modelling frame.
vector_assembled_df = vector_assembler.transform(dataset=transformed_df)

In [None]:
# Fit the feature indexer on the full assembled frame (i.e. before the train/test split).
feature_indexer_stage = VectorIndexer(outputCol="indexedFeatures", inputCol="features")
featureIndexer = feature_indexer_stage.fit(vector_assembled_df)

In [None]:
# Map crime-type strings to numeric label indices (fitted on the full frame) ...
label_indexer_stage = StringIndexer(outputCol="indexedLabel", inputCol="Primary_Type")
labelIndexer = label_indexer_stage.fit(vector_assembled_df)

# ... and translate numeric predictions back to the original strings afterwards.
labelConverter = IndexToString(
    labels=labelIndexer.labels,
    inputCol="prediction",
    outputCol="predictedLabel",
)

In [None]:
# 70/30 train/test split. The split and the forest are both seeded so that the
# reported metrics can be reproduced on a re-run (they were unseeded before).
(trainingData, testData) = vector_assembled_df.randomSplit([0.7, 0.3], seed=42)

# Random forest over the indexed features / labels produced by the indexers above.
rf = RandomForestClassifier(labelCol="indexedLabel",
                            featuresCol="indexedFeatures",
                            numTrees=100, impurity='gini', maxBins=128,
                            seed=42)


In [None]:
# Stage order: index the label, index the features, train the forest,
# then convert the numeric prediction back to a label string.
pipeline = Pipeline().setStages([labelIndexer, featureIndexer, rf, labelConverter])


In [None]:
# Train the pipeline on the training split ...
model = pipeline.fit(dataset=trainingData)

# ... and score the held-out split.
predictions = model.transform(dataset=testData)


In [None]:
# Side-by-side look at predicted vs. actual crime type for a few test rows.
predictions.select("predictedLabel", "Primary_Type").show()

# Two evaluators over the same columns; they differ only in the metric they compute.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="indexedLabel", predictionCol="prediction")
evaluator2 = MulticlassClassificationEvaluator(
    metricName="f1", labelCol="indexedLabel", predictionCol="prediction")

accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)

print(f"accuracy = {accuracy:g}")
print(f"f1 score = {f1:g}")

# How many test rows were assigned to each predicted class.
predictions.groupBy("predictedLabel").count().show()

+--------------+-------------+
|predictedLabel| Primary_Type|
+--------------+-------------+
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
+--------------+-------------+
only showing top 20 rows

accuracy = 0.213314
f1 score = 0.075006
+--------------+-------+
|predictedLabel|  count|
+--------------+-------+
|         THEFT|2144335|
+--------------+-------+



In [None]:

from pyspark.ml.feature import OneHotEncoder

# Step 1: turn the location-description strings into numeric indices.
# Despite its name, `locationIndexer` holds the transformed DataFrame, not the fitted indexer.
location_indexer_stage = StringIndexer(outputCol="indexedLocation", inputCol="Location Description")
locationIndexer = location_indexer_stage.fit(transformed_df).transform(transformed_df)

# Step 2: one-hot encode those indices into the "encodedLocation" vector column.
location_encoder_stage = OneHotEncoder(outputCols=["encodedLocation"], inputCols=["indexedLocation"])
encoded_data = location_encoder_stage.fit(locationIndexer).transform(locationIndexer)

In [None]:
# Second experiment: predict the crime type from coordinates, arrest flag and
# the one-hot encoded location description.
vector_assembler = VectorAssembler(
    inputCols=["Latitude", "Longitude", "Arrest", "encodedLocation"],
    outputCol="features")
df_temp = vector_assembler.transform(encoded_data)

# Both indexers are fitted on the full frame, before the train/test split.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(df_temp)

labelIndexer = StringIndexer(inputCol="Primary_Type", outputCol="indexedLabel").fit(df_temp)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# The split and the forest are seeded so the metrics printed below are reproducible
# (both were unseeded before, so every run produced different numbers).
(trainingData, testData) = df_temp.randomSplit([0.7, 0.3], seed=42)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, impurity='gini', maxBins=32, seed=42)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

model = pipeline.fit(trainingData)

predictions = model.transform(testData)
predictions.select("predictedLabel", "Primary_Type").show()

# Accuracy and F1 are computed over the same indexed label / prediction columns.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)

print("accuracy = %g" % (accuracy))
print("f1 score = %g" % (f1))

# How many test rows were assigned to each predicted class.
predictions.groupBy("predictedLabel").count().show()

+--------------+-------------+
|predictedLabel| Primary_Type|
+--------------+-------------+
|         THEFT|OTHER OFFENSE|
|     NARCOTICS|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|     NARCOTICS|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|       BATTERY|OTHER OFFENSE|
|     NARCOTICS|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|     NARCOTICS|OTHER OFFENSE|
|     NARCOTICS|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|       BATTERY|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
|       BATTERY|OTHER OFFENSE|
|       BATTERY|OTHER OFFENSE|
|         THEFT|OTHER OFFENSE|
+--------------+-------------+
only showing top 20 rows

accuracy = 0.325511
f1 score = 0.216875
+--------------+-------+
|predictedLabel|  count|
+--------------+-------+
|         THEFT|1331379|
|       BATTERY| 608987|
|     NARCOTICS| 204640|
+--------------+-------+



In [None]:
from pyspark.sql.types import IntegerType

# Make sure Ward is an integer column before it is used as a model feature.
ward_as_int = transformed_df["Ward"].cast(IntegerType())
transformed_df = transformed_df.withColumn("Ward", ward_as_int)


In [None]:
from pyspark.ml.feature import OneHotEncoder

# Redo the location encoding on the current `transformed_df`.
# `locationIndexer` holds the indexed DataFrame (not the fitted indexer model).
location_indexer_model = StringIndexer(
    inputCol="Location Description", outputCol="indexedLocation"
).fit(transformed_df)
locationIndexer = location_indexer_model.transform(transformed_df)

# One-hot encode the location indices into the "encodedLocation" vector column.
location_encoder_model = OneHotEncoder(
    inputCols=["indexedLocation"], outputCols=["encodedLocation"]
).fit(locationIndexer)
encoded_data = location_encoder_model.transform(locationIndexer)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Third experiment: all available features, with hyper-parameters chosen by
# 4-fold cross-validation on the training split.
vector_assembler = VectorAssembler(
    inputCols=["Latitude", "Longitude", "Arrest", "Week", "Ward", "encodedLocation"],
    outputCol="features")

indexed_data = vector_assembler.transform(encoded_data)

# Both indexers are fitted on the full frame, before the train/test split.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(indexed_data)

labelIndexer = StringIndexer(inputCol="Primary_Type", outputCol="indexedLabel").fit(indexed_data)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# The split, the forest and the fold assignment are all seeded so that the selected
# parameters and the reported metrics are reproducible (none were seeded before).
(trainingData, testData) = indexed_data.randomSplit([0.7, 0.3], seed=42)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=42)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# 2 x 2 x 2 x 2 = 16 parameter combinations, each fitted once per fold.
paramGrid = ParamGridBuilder()\
            .addGrid(rf.numTrees, [3, 10])\
            .addGrid(rf.maxBins, [32, 64])\
            .addGrid(rf.maxDepth, [5, 10])\
            .addGrid(rf.impurity, ['gini', 'entropy'])\
            .build()

# Accuracy is the metric used to pick the best parameter combination.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=4,
                          seed=42)

model = crossval.fit(trainingData)

# Score the held-out split with the best model found by cross-validation.
predictions = model.transform(testData)
predictions.select("predictedLabel", "Primary_Type").show()

evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)
print("accuracy = %g" % (accuracy))
print("f1 score = %g" % (f1))

# How many test rows were assigned to each predicted class.
predictions.groupBy("predictedLabel").count().show()

# Stage 2 of the pipeline is the random forest (stages: label indexer, feature
# indexer, forest, label converter). Print the parameters of the winning forest.
bestPipeline = model.bestModel
bestRFModel = bestPipeline.stages[2]
bestLRModel = bestRFModel  # legacy alias: the old name suggested logistic regression
bestParams = bestRFModel.extractParamMap()
print("\n".join("{}\t{}".format(k, v) for k, v in bestParams.items()))