In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler

In [2]:
# Explicit schema for the crime CSV, written as (column name, Spark type)
# pairs in declaration order. Every field is declared nullable.
crime_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("IncidentName", IntegerType()),
        ("Category", StringType()),
        ("Descript", StringType()),
        ("DayofWeek", StringType()),
        ("Date", StringType()),
        ("Time", StringType()),
        ("PdDistrict", StringType()),
        ("Resolution", StringType()),
        ("Address", StringType()),
        ("X", DoubleType()),
        ("Y", DoubleType()),
        ("Location", StringType()),
        ("PdID", StringType()),
    ]
])

In [3]:
# Read the sample CSV into a DataFrame, applying the explicit schema and
# treating the first line of the file as a header row.
crimeDF = (
    spark.read
    .schema(crime_schema)
    .option("header", True)
    .csv('s3a://crimedatafyp/crimeData/Sample9.csv')
)
# Last expression of the cell: displays the number of rows that were read.
crimeDF.count()

249999

# Data Cleaning

# Data Transformation

In [4]:
# Print the column names, types and nullability of the DataFrame as loaded.
crimeDF.printSchema()

root
 |-- IncidentName: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayofWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- PdID: string (nullable = true)



In [5]:
# Columns removed from the DataFrame before the date/time parts are derived.
dropList = ["IncidentName","Descript","Resolution","Address","Location","PdID","X","Y"]
# Drop them in a single call; the remaining columns keep their original order.
crimeDF = crimeDF.drop(*dropList)
# Confirm which columns are left.
crimeDF.printSchema()

root
 |-- Category: string (nullable = true)
 |-- DayofWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)



In [6]:
# NOTE(review): the star import is kept because other cells may rely on the
# names it brings in, but it also shadows Python builtins such as sum/min/max.
# Consider importing only the functions that are actually used.
from pyspark.sql.functions import *

# Parse the 'MM/dd/yyyy' date string into a timestamp once and reuse the
# expression for each calendar part derived from it.
incident_ts = unix_timestamp('Date', 'MM/dd/yyyy').cast("timestamp")

# Keep every existing column and append Year, Month, Day and Hour (in that
# order). Hour is taken from the separate 'Time' column.
crimeNewDF = crimeDF.select(
    '*',
    year(incident_ts).alias('Year'),
    month(incident_ts).alias('Month'),
    dayofmonth(incident_ts).alias('Day'),
    hour('Time').alias('Hour')
)
crimeNewDF.show()

+--------------+---------+----------+-----+----------+----+-----+---+----+
|      Category|DayofWeek|      Date| Time|PdDistrict|Year|Month|Day|Hour|
+--------------+---------+----------+-----+----------+----+-----+---+----+
|     VANDALISM|   Friday|08/04/2006|01:00|   TARAVAL|2006|    8|  4|   1|
|       ASSAULT|  Tuesday|05/25/2004|16:30|      PARK|2004|    5| 25|  16|
|OTHER OFFENSES|Wednesday|09/10/2003|18:20|   BAYVIEW|2003|    9| 10|  18|
| LARCENY/THEFT|   Monday|11/23/2009|04:00|   TARAVAL|2009|   11| 23|   4|
| LARCENY/THEFT|  Tuesday|06/28/2011|04:47|  NORTHERN|2011|    6| 28|   4|
| LARCENY/THEFT|Wednesday|11/03/2004|13:45|  SOUTHERN|2004|   11|  3|  13|
| VEHICLE THEFT|Wednesday|06/09/2010|12:55|   MISSION|2010|    6|  9|  12|
| DRUG/NARCOTIC| Thursday|04/14/2011|08:15| INGLESIDE|2011|    4| 14|   8|
|  NON-CRIMINAL| Saturday|12/04/2004|10:00|   CENTRAL|2004|   12|  4|  10|
| LARCENY/THEFT| Saturday|01/22/2005|17:00|  SOUTHERN|2005|    1| 22|  17|
|       RUNAWAY|   Monday

In [7]:
# The raw Date and Time strings are no longer needed once Year/Month/Day/Hour
# exist. 'X' and 'Y' are listed as well, although dropList already names them.
crimeNewDF = crimeNewDF.drop('Date', 'Time', 'X', 'Y')
crimeNewDF.show()

+--------------+---------+----------+----+-----+---+----+
|      Category|DayofWeek|PdDistrict|Year|Month|Day|Hour|
+--------------+---------+----------+----+-----+---+----+
|     VANDALISM|   Friday|   TARAVAL|2006|    8|  4|   1|
|       ASSAULT|  Tuesday|      PARK|2004|    5| 25|  16|
|OTHER OFFENSES|Wednesday|   BAYVIEW|2003|    9| 10|  18|
| LARCENY/THEFT|   Monday|   TARAVAL|2009|   11| 23|   4|
| LARCENY/THEFT|  Tuesday|  NORTHERN|2011|    6| 28|   4|
| LARCENY/THEFT|Wednesday|  SOUTHERN|2004|   11|  3|  13|
| VEHICLE THEFT|Wednesday|   MISSION|2010|    6|  9|  12|
| DRUG/NARCOTIC| Thursday| INGLESIDE|2011|    4| 14|   8|
|  NON-CRIMINAL| Saturday|   CENTRAL|2004|   12|  4|  10|
| LARCENY/THEFT| Saturday|  SOUTHERN|2005|    1| 22|  17|
|       RUNAWAY|   Monday|   TARAVAL|2013|    4|  1|  11|
|OTHER OFFENSES|   Sunday|  RICHMOND|2006|    7| 16|  12|
|       ASSAULT|   Monday|  RICHMOND|2013|    6| 10|  21|
| DRUG/NARCOTIC|  Tuesday|TENDERLOIN|2006|    9|  5|  23|
|     VANDALIS

In [8]:
# Split the columns by Spark dtype name. crimeNewDF.dtypes yields
# (column name, dtype string) pairs.
cat_cols = [name for name, dtype in crimeNewDF.dtypes if dtype.startswith('string')]
print(str(len(cat_cols)) + '  categorical features')

# str.startswith accepts a tuple of prefixes; this replaces the bitwise `|`
# that was previously used to combine two boolean tests.
numeric_columns = [name for name, dtype in crimeNewDF.dtypes if dtype.startswith(('int', 'double'))]
# The first numeric column is skipped, exactly as before. With the columns
# shown in the previous cell that column is 'Year', so num_cols ends up as
# Month, Day and Hour.
# NOTE(review): the slice is positional and depends on column order - confirm
# that excluding 'Year' from the model features is intended.
num_cols = numeric_columns[1:]
print(str(len(num_cols)) + '  numerical features')

3  categorical features
3  numerical features


In [9]:
# Build and fit the classification pipeline: index and one-hot encode the
# categorical features, index the crime category as the label, assemble a
# single feature vector and train a multinomial Naive Bayes model.
from pyspark.ml.feature import StringIndexer,IndexToString
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Columns that are NOT one-hot encoded: the label and the integer date parts.
non_categorical = ['Year', 'Month', 'Day', 'Hour', 'Category']
# Computed once, in DataFrame column order, so that the indexers, encoders
# and assembler inputs always agree and the stage/feature order is the same
# on every run. A set difference has no guaranteed iteration order.
categorical_features = [column for column in crimeNewDF.columns if column not in non_categorical]

# 70/30 train/test split with a fixed seed.
(trainingData, testData) = crimeNewDF.randomSplit([0.7, 0.3], seed=100)

# Label indexer: crime category -> numeric "label". handleInvalid='keep'
# gives categories unseen at fit time their own index instead of failing.
catIndexer = StringIndexer(inputCol="Category", outputCol="label", handleInvalid='keep')

# One indexer per categorical feature, writing to "<column>_index".
# NOTE(review): unlike the label indexer these use the default handleInvalid
# setting - confirm that is intended for values absent from the training split.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in categorical_features
]

# One one-hot encoder per indexed column, writing to "<column>_index_encoded".
encoders = [
    OneHotEncoderEstimator(
        inputCols=[indexer.getOutputCol()],
        outputCols=[indexer.getOutputCol() + "_encoded"])
    for indexer in indexers
]

# Feature vector = encoded categorical columns followed by the numeric
# columns selected earlier (num_cols). The encoded names are derived from the
# indexers so the naming rule lives in one place.
assemblerInputs = [indexer.getOutputCol() + "_encoded" for indexer in indexers] + num_cols

assembler = VectorAssembler(
            inputCols=assemblerInputs,
            outputCol="features")

#create the trainer
nb = NaiveBayes(smoothing=3.0, modelType="multinomial")

# Stage order: feature indexers, encoders, label indexer, assembler, model.
pipeline = Pipeline(stages=indexers + encoders + [catIndexer, assembler, nb])
# modelDF is the fitted pipeline model (despite the name, not a DataFrame).
modelDF = pipeline.fit(trainingData)
# pr holds the test rows plus the columns added by every pipeline stage.
pr = modelDF.transform(testData)

In [10]:
# Score the test-set predictions. labelCol and metricName are spelled out
# with the evaluator's documented default values, so it is explicit that the
# number displayed below is an F1 score and not plain accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)
evaluator.evaluate(pr)

0.168234749295272

In [11]:
# Print the schema of the transformed test set: the input columns plus the
# columns added by the pipeline stages.
pr.printSchema()

root
 |-- Category: string (nullable = true)
 |-- DayofWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- DayofWeek_index: double (nullable = false)
 |-- PdDistrict_index: double (nullable = false)
 |-- DayofWeek_index_encoded: vector (nullable = true)
 |-- PdDistrict_index_encoded: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [15]:
# Show the predicted class index and class probabilities next to the indexed
# label and the original category name.
prediction_columns = ["prediction", "probability", "label", "Category"]
pr.select(*prediction_columns).show()

+----------+--------------------+-----+--------+
|prediction|         probability|label|Category|
+----------+--------------------+-----+--------+
|       0.0|[0.53448437557947...| 24.0|   ARSON|
|       0.0|[0.24542309309522...| 24.0|   ARSON|
|       0.0|[0.51895192873252...| 24.0|   ARSON|
|       0.0|[0.42951776593071...| 24.0|   ARSON|
|       0.0|[0.16712635744569...| 24.0|   ARSON|
|       0.0|[0.24984065042701...| 24.0|   ARSON|
|       0.0|[0.23482257786858...| 24.0|   ARSON|
|       1.0|[0.14408813907401...| 24.0|   ARSON|
|       0.0|[0.25634529121662...| 24.0|   ARSON|
|       0.0|[0.39984210470407...| 24.0|   ARSON|
|       0.0|[0.15056380399601...| 24.0|   ARSON|
|       1.0|[0.07546281810829...| 24.0|   ARSON|
|       0.0|[0.31182654883825...| 24.0|   ARSON|
|       0.0|[0.19398871961872...|  3.0| ASSAULT|
|       0.0|[0.31101393615506...|  3.0| ASSAULT|
|       1.0|[0.14587969938541...|  3.0| ASSAULT|
|       0.0|[0.23007237095400...|  3.0| ASSAULT|
|       1.0|[0.14081