In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz 
!pip install -q findspark

In [0]:
import os
# Point findspark/pyspark at the apt-installed JDK and the extracted Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.4-bin-hadoop2.7",
})

In [0]:
import findspark
# With no arguments, findspark.init() locates Spark via the SPARK_HOME environment variable.
findspark.init()
from pyspark.sql import SparkSession

# This is the first SparkSession in the notebook, so it starts the driver JVM.
# spark.driver.memory only takes effect if it is set here, before the JVM starts.
# Later SparkSession.builder...getOrCreate() calls reuse this session and cannot change it.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("Project - Chicago crime")
         .config("spark.driver.memory", "20g")
         .getOrCreate())

In [0]:
import os
#spark imports
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

from pyspark.sql.types import *
from pyspark.sql.functions import format_number, when, col, array, udf, lit

import csv
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pyspark.sql.functions as F

from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.tree import RandomForest
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# A SparkSession was already created in an earlier cell, so getOrCreate() returns that
# existing session. Static settings such as spark.driver.memory are fixed once the
# driver JVM has started and would have no effect here; pass them when the session
# is first built instead.
spark = SparkSession.builder.appName("Project - Chicago crime").getOrCreate()

In [0]:
# Explicit schema for the crimes CSV. Every column is nullable.
# NOTE(review): with an explicit schema Spark maps CSV columns by position, not by header
# name, so the file's column order must match this list — confirm against my_csv2.csv.
crimes_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Date", StringType()),
        ("Primary Type", StringType()),
        ("Location Description", StringType()),
        ("Arrest", BooleanType()),
        ("Ward", StringType()),
        ("Year", IntegerType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
    )
])

In [0]:
# Load the crimes extract from the mounted Google Drive, skipping the header row and
# applying crimes_schema.
data = (spark.read
        .schema(crimes_schema)
        .option("header", True)
        .csv('/content/drive/My Drive/Colab Notebooks/my_csv2.csv'))

In [11]:
# Derive Month and Week-of-year from the raw "Date" string, then drop it.
# The text before the first space is parsed with the pattern MM/dd/yyyy.
parsed_day = F.to_date(F.split(F.col("Date"), " ")[0], "MM/dd/yyyy")
dataset = (data
           .withColumn("Month", F.month(parsed_day))
           .withColumn("Week", F.weekofyear(parsed_day))
           .drop("Date"))
dataset.take(5)

[Row(Primary Type='PUBLIC PEACE VIOLATION', Location Description='AIRCRAFT', Arrest=False, Ward='41', Year=2019, Latitude=42.002816387, Longitude=-87.90609433, Month=1, Week=1),
 Row(Primary Type='BATTERY', Location Description='STREET', Arrest=False, Ward='42', Year=2019, Latitude=41.88336939, Longitude=-87.633860272, Month=3, Week=11),
 Row(Primary Type='THEFT', Location Description='RESIDENTIAL YARD (FRONT/BACK)', Arrest=False, Ward='4', Year=2019, Latitude=41.825346902, Longitude=-87.606780575, Month=3, Week=11),
 Row(Primary Type='BATTERY', Location Description='RESIDENCE', Arrest=False, Ward='49', Year=2019, Latitude=42.016541612, Longitude=-87.672499325, Month=3, Week=11),
 Row(Primary Type='OTHER OFFENSE', Location Description='STREET', Arrest=False, Ward='4', Year=2019, Latitude=41.825298645, Longitude=-87.6069609, Month=3, Week=11)]

In [0]:
# Fold KIDNAPPING and HOMICIDE into OTHER OFFENSE; all other labels (including nulls) are kept.
# NOTE(review): presumably because these classes are rare — confirm the motivation.
targetDf = dataset.withColumn(
    "Primary Type",
    F.when(F.col("Primary Type").isin("KIDNAPPING", "HOMICIDE"), "OTHER OFFENSE")
     .otherwise(F.col("Primary Type")),
)

In [13]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, IndexToString, OneHotEncoderEstimator
# Confirm the column types after deriving Month/Week and dropping the raw Date column.
targetDf.printSchema()

root
 |-- Primary Type: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Week: integer (nullable = true)



**Random forest using "Week" as the only feature**


In [14]:
# Baseline random forest: predict "Primary Type" from the week of the year alone.
SPLIT_SEED = 42  # fixed seed so the train/test split (and the scores below) are reproducible

vector_assembler = VectorAssembler(inputCols=["Week"], outputCol="features")
df_temp = vector_assembler.transform(targetDf)

# Fit on the full dataset (before the split) so values that only occur in the test split
# still receive an index; the fitted models are reused as pipeline stages below.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(df_temp)
labelIndexer = StringIndexer(inputCol="Primary Type", outputCol="indexedLabel").fit(df_temp)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

(trainingData, testData) = df_temp.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=100, impurity='gini', maxBins=128)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)

# The scored test set feeds show(), two evaluations and a groupBy. Cache it so the
# 100-tree forest is not re-applied for each action, and release it at the end of the cell.
predictions = model.transform(testData).cache()
predictions.select("predictedLabel", "Primary Type").show()

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
# Spark's "f1" metric is the class-frequency-weighted F1 score.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)
print("accuracy = %g" % (accuracy))
print("f1 score = %g" % (f1))

predictions.groupBy("predictedLabel").count().show()
predictions.unpersist();

+--------------+------------+
|predictedLabel|Primary Type|
+--------------+------------+
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
+--------------+------------+
only showing top 20 rows

accuracy = 0.226515
f1 score = 0.0836666
+--------------+------+
|predictedLabel| count|
+--------------+------+
|         THEFT|879124|
+--------------+------+



**Random forest with more features (latitude, longitude, arrest flag, location)**

In [0]:
from pyspark.ml.feature import OneHotEncoderEstimator
# Map each "Location Description" string to a numeric index, then one-hot encode that
# index into the sparse vector column "encodedLocation".
# NOTE: despite its name, `locationIndexer` holds the indexed DataFrame, not an indexer.
location_indexer_model = StringIndexer(inputCol="Location Description",
                                       outputCol="indexedLocation").fit(targetDf)
locationIndexer = location_indexer_model.transform(targetDf)
location_encoder_model = OneHotEncoderEstimator(inputCols=["indexedLocation"],
                                                outputCols=["encodedLocation"]).fit(locationIndexer)
encoded_data = location_encoder_model.transform(locationIndexer)

In [17]:
# Random forest on latitude/longitude, the arrest flag and the one-hot encoded location.
SPLIT_SEED = 42  # fixed seed so the train/test split (and the scores below) are reproducible

vector_assembler = VectorAssembler(inputCols=["Latitude", "Longitude", "Arrest", "encodedLocation"],
                                   outputCol="features")
df_temp = vector_assembler.transform(encoded_data)

# Fit on the full dataset (before the split) so values that only occur in the test split
# still receive an index; the fitted models are reused as pipeline stages below.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(df_temp)
labelIndexer = StringIndexer(inputCol="Primary Type", outputCol="indexedLabel").fit(df_temp)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

(trainingData, testData) = df_temp.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, impurity='gini', maxBins=32)

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)

# The scored test set feeds show(), two evaluations and a groupBy. Cache it so the forest
# is not re-applied for each action, and release it at the end of the cell.
predictions = model.transform(testData).cache()
predictions.select("predictedLabel", "Primary Type").show()

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
# Spark's "f1" metric is the class-frequency-weighted F1 score.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)
print("accuracy = %g" % (accuracy))
print("f1 score = %g" % (f1))

predictions.groupBy("predictedLabel").count().show()
predictions.unpersist();

+--------------+------------+
|predictedLabel|Primary Type|
+--------------+------------+
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
+--------------+------------+
only showing top 20 rows

accuracy = 0.291977
f1 score = 0.177746
+------------------+------+
|    predictedLabel| count|
+------------------+------+
|             THEFT|649816|
|           BATTERY|219221|
|DECEPTIVE PRACTICE|  1592|
|         NARCOTICS|  8195|
+-----------------

**Random forest tuned with a grid search and 4-fold cross-validation**

In [0]:
from pyspark.sql.types import IntegerType
# Ward is read as a string; cast it to an integer so it can be assembled as a numeric feature.
targetDf = targetDf.withColumn("Ward", targetDf.Ward.cast("int"))

In [0]:
from pyspark.ml.feature import OneHotEncoderEstimator
# Rebuild the location encoding from the current targetDf (now with an integer Ward column):
# index "Location Description", then one-hot encode the index into "encodedLocation".
# NOTE: despite its name, `locationIndexer` holds the indexed DataFrame, not an indexer.
location_indexer_model = StringIndexer(inputCol="Location Description",
                                       outputCol="indexedLocation").fit(targetDf)
locationIndexer = location_indexer_model.transform(targetDf)
location_encoder_model = OneHotEncoderEstimator(inputCols=["indexedLocation"],
                                                outputCols=["encodedLocation"]).fit(locationIndexer)
encoded_data = location_encoder_model.transform(locationIndexer)

In [22]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Random forest tuned with a grid search and 4-fold cross-validation on the training split;
# the chosen model is then scored on the held-out test split.
SPLIT_SEED = 42  # fixed seed for the train/test split and the CV fold assignment

vector_assembler = VectorAssembler(
    inputCols=["Latitude", "Longitude", "Arrest", "Week", "Ward", "encodedLocation"],
    outputCol="features")
indexed_data = vector_assembler.transform(encoded_data)

# Fit on the full dataset (before the split) so values that only occur in the test split
# still receive an index; the fitted models are reused as pipeline stages below.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(indexed_data)
labelIndexer = StringIndexer(inputCol="Primary Type", outputCol="indexedLabel").fit(indexed_data)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

(trainingData, testData) = indexed_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# 2 x 2 x 2 x 2 = 16 candidate settings, each fitted once per fold (4 folds).
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [3, 10])
             .addGrid(rf.maxBins, [32, 64])
             .addGrid(rf.maxDepth, [5, 10])
             .addGrid(rf.impurity, ['gini', 'entropy'])
             .build())

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=4,
                          seed=SPLIT_SEED)

model = crossval.fit(trainingData)

# The scored test set feeds show(), two evaluations and a groupBy; cache it and
# release it once the metrics have been computed.
predictions = model.transform(testData).cache()
predictions.select("predictedLabel", "Primary Type").show()

# Spark's "f1" metric is the class-frequency-weighted F1 score.
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
accuracy = evaluator.evaluate(predictions)
f1 = evaluator2.evaluate(predictions)
print("accuracy = %g" % (accuracy))
print("f1 score = %g" % (f1))

predictions.groupBy("predictedLabel").count().show()
predictions.unpersist()

bestPipeline = model.bestModel
bestRFModel = bestPipeline.stages[2]  # the fitted RandomForestClassificationModel
bestLRModel = bestRFModel  # old (misleading) name, kept in case later cells use it
# In PySpark 2.x a fitted model does not carry the estimator's Param values (SPARK-10931),
# so bestRFModel.extractParamMap() would not list numTrees/maxBins/maxDepth/impurity.
# avgMetrics follows the order of paramGrid, and accuracy is larger-is-better,
# so the winning grid point is at argmax.
bestParams = paramGrid[int(np.argmax(model.avgMetrics))]
print("\n".join("{}\t{}".format(param.name, value) for param, value in bestParams.items()))

+--------------+------------+
|predictedLabel|Primary Type|
+--------------+------------+
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|     NARCOTICS|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|     NARCOTICS|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
|         THEFT|     ASSAULT|
+--------------+------------+
only showing top 20 rows

accuracy = 0.346422
f1 score = 0.241841
+------------------+------+
|    predictedLabel| count|
+------------------+------+
| CRIMINAL TRESPASS|  1251|
|             THEFT|517190|
|           BATTERY|171714|
|DECEPTIVE PRACTICE|  5971|
|         NARCOTIC