In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *


spark = SparkSession.builder.appName('DATA228_Project_1').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/17 17:42:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/17 17:42:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Importing all necessary libraries

In [2]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer,Tokenizer, StringIndexer, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes,LogisticRegression 
from pyspark.sql.functions import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.mllib.evaluation import MulticlassMetrics


## Loading data into pyspark data frame

In [3]:


df = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('SF_data.csv')


                                                                                

In [4]:
df.count()

2129525

In [4]:
df1=df.select(str.strip('PdId'),'Incident Code','Category','Descript','DayOfWeek','Date','Time','PdDistrict','X','Y')
#display(df1)

### Data Cleaning

In [5]:
df22=df1.dropna()

In [6]:
df2=df22.filter(col('Category')!=col('Descript'))

In [7]:
dft=df2.withColumn('Date', to_date(col('Date'), "M/d/y"))\
.withColumn('Month', month(col('Date'))).withColumn('Year', year(col('Date')))

dfh = dft.withColumn('Hour', hour(dft.Time))



In [8]:
d1=dfh.filter("PdDistrict !='NA'").select(col('Category'),col('Descript'))

df_data=d1.withColumn('Category_1',lower(col('Category'))).withColumn('Description',lower(col('Descript'))).drop('Category','Descript').withColumnRenamed('Category_1', 'Category')

df_data=df_data.select(str.strip('Description'),str.strip('Category'))
                 

In [9]:
df_data.show()

+--------------------+--------------+
|         Description|      Category|
+--------------------+--------------+
|robbery, bodily f...|       robbery|
|   stolen automobile| vehicle theft|
|   stolen automobile| vehicle theft|
|             battery|       assault|
|             battery|       assault|
|             battery|       assault|
|stolen and recove...| vehicle theft|
|             battery|       assault|
|         trespassing|      trespass|
|burglary of resid...|      burglary|
|grand theft from ...| larceny/theft|
|enroute to depart...|      warrants|
|drivers license, ...|other offenses|
|drivers license, ...|other offenses|
|         trespassing|      trespass|
|petty theft shopl...| larceny/theft|
|robbery of a comm...|       robbery|
|possession of heroin| drug/narcotic|
|grand theft of pr...| larceny/theft|
|suspicious occurr...|suspicious occ|
+--------------------+--------------+
only showing top 20 rows



## String Indexer

In [10]:
data_str_indx = StringIndexer(inputCol="Category", outputCol="label")

In [11]:
data_ind=data_str_indx.fit(df_data).transform(df_data).sort('label')
(data_ind.show())

[Stage 6:>                                                          (0 + 8) / 8]

+--------------------+-------------+-----+
|         Description|     Category|label|
+--------------------+-------------+-----+
|petty theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft pickp...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|petty theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|petty theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|petty theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|lost property, gr...|larceny/theft|  0.0|
|petty theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
|grand theft from ...|larceny/theft|  0.0|
+----------

                                                                                

## Label Dictionary

In [12]:
label_dict=data_ind.select(col('label'),col('Category')).distinct().sort('label')

In [13]:
label_dict.show(truncate=False)

[Stage 7:>                                                          (0 + 8) / 8]

+-----+----------------------+
|label|Category              |
+-----+----------------------+
|0.0  |larceny/theft         |
|1.0  |other offenses        |
|2.0  |non-criminal          |
|3.0  |assault               |
|4.0  |vehicle theft         |
|5.0  |drug/narcotic         |
|6.0  |vandalism             |
|7.0  |warrants              |
|8.0  |burglary              |
|9.0  |suspicious occ        |
|10.0 |robbery               |
|11.0 |missing person        |
|12.0 |fraud                 |
|13.0 |forgery/counterfeiting|
|14.0 |secondary codes       |
|15.0 |weapon laws           |
|16.0 |trespass              |
|17.0 |prostitution          |
|18.0 |stolen property       |
|19.0 |disorderly conduct    |
+-----+----------------------+
only showing top 20 rows



                                                                                

## Tokenizer

In [14]:
tokenizer = Tokenizer(outputCol="words",inputCol=("Description"))

In [15]:
data_tok=tokenizer.transform(data_ind)
(data_tok.show())

[Stage 10:>                                                         (0 + 8) / 8]

+--------------------+-------------+-----+--------------------+
|         Description|     Category|label|               words|
+--------------------+-------------+-----+--------------------+
|petty theft from ...|larceny/theft|  0.0|[petty, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|grand theft pickp...|larceny/theft|  0.0|[grand, theft, pi...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|petty theft from ...|larceny/theft|  0.0|[petty, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|petty theft from ...|larceny/theft|  0.0|[petty, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|petty theft from ...|larceny/theft|  0.0|[petty, theft, fr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...|
|grand theft from ...|larceny/theft|  0.



## Stop words removal

In [16]:
remover = StopWordsRemover().setInputCol("words").setOutputCol("words_after_stopwords")
#stopwords = remover.getStopWords()


In [17]:
data_rem=remover.transform(data_tok)
(data_rem.show())

                                                                                

+--------------------+-------------+-----+--------------------+---------------------+
|         Description|     Category|label|               words|words_after_stopwords|
+--------------------+-------------+-----+--------------------+---------------------+
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...| [grand, theft, lo...|
|petty theft shopl...|larceny/theft|  0.0|[petty, theft, sh...| [petty, theft, sh...|
|grand theft of pr...|larceny/theft|  0.0|[grand, theft, of...| [grand, theft, pr...|
|grand theft pickp...|larceny/theft|  0.0|[grand, theft, pi...| [grand, theft, pi...|
|petty theft of pr...|larceny/theft|  0.0|[petty, theft, of...| [petty, theft, pr...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...| [grand, theft, pe...|
|petty theft from ...|larceny/theft|  0.0|[petty, theft, fr...| [petty, theft, bu...|
|petty theft of pr...|larceny/theft|  0.0|[petty, theft, of...| [petty, theft, pr...|
|grand theft from ...|larceny/theft|  0.0|[grand, thef

### Count vectorizer

In [18]:
cv = CountVectorizer(minDF=2).setInputCol("words_after_stopwords").setOutputCol("features")


In [19]:
model = cv.fit(data_rem)

data_cv=model.transform(data_rem)
(data_cv.show())## sparse

[Stage 24:>                                                         (0 + 8) / 8]

+--------------------+-------------+-----+--------------------+---------------------+--------------------+
|         Description|     Category|label|               words|words_after_stopwords|            features|
+--------------------+-------------+-----+--------------------+---------------------+--------------------+
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...| [grand, theft, lo...|(997,[0,1,2,3],[1...|
|petty theft shopl...|larceny/theft|  0.0|[petty, theft, sh...| [petty, theft, sh...|(997,[0,5,51],[1....|
|grand theft of pr...|larceny/theft|  0.0|[grand, theft, of...| [grand, theft, pr...|(997,[0,1,4],[1.0...|
|grand theft pickp...|larceny/theft|  0.0|[grand, theft, pi...| [grand, theft, pi...|(997,[0,1,97],[1....|
|petty theft of pr...|larceny/theft|  0.0|[petty, theft, of...| [petty, theft, pr...|(997,[0,4,5],[1.0...|
|grand theft from ...|larceny/theft|  0.0|[grand, theft, fr...| [grand, theft, pe...|(997,[0,1,31],[1....|
|petty theft from ...|larceny/theft| 



## TF-IDF

In [20]:
hashed_df=HashingTF(inputCol="words_after_stopwords", outputCol="hash_features",numFeatures=9000)
idf = IDF(inputCol="hash_features", outputCol="features")

In [21]:
hash_data=hashed_df.transform(data_rem)

In [22]:
idf_data = idf.fit(hash_data).transform(hash_data)

                                                                                

In [43]:
(idf_data.select("label", "features").show()) ## sparse

[Stage 51:>                                                         (0 + 8) / 8]

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(9000,[2434,2785,...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,5182,6...|
|  0.0|(9000,[590,5182,5...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[2434,2785,...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[2434,2785,...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[2434,2785,...|
|  0.0|(9000,[590,5182,6...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,1233,3...|
|  0.0|(9000,[2434,2785,...|
|  0.0|(9000,[590,2434,2...|
|  0.0|(9000,[590,2434,2...|
+-----+--------------------+
only showing top 20 rows



                                                                                

### Train and test data

In [23]:
training, test = data_ind.randomSplit([0.7,0.3], seed=60)
#training, test_i = idf_data.randomSplit([0.7,0.3], seed=60)
#training_cv, test_cv = data_cv.randomSplit([0.7,0.3], seed=60)

## Logistic regression model

In [24]:
lr = LogisticRegression(family="multinomial",maxIter=20, regParam=0.4, elasticNetParam=0)
#rf = RandomForestClassifier(featuresCol="features", labelCol="label")


### Pipeline for logistic regression with count vectorizer

In [25]:
pipe_cv_lr = Pipeline(stages=[tokenizer,remover,cv, lr])
model_cv_lr = pipe_cv_lr.fit(training)
pred_cv_lr = model_cv_lr.transform(test)

                                                                                

23/05/17 17:43:43 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


[Stage 48:>                                                         (0 + 7) / 7]

23/05/17 17:43:44 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/17 17:43:44 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/05/17 17:43:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/17 17:43:46 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


[Stage 111:>                                                        (0 + 8) / 8]

### Pipeline for logistic regression with TF-IDF

In [26]:
pipe_idf_lr = Pipeline(stages=[tokenizer,remover,hashed_df,idf, lr])
model_idf_lr = pipe_idf_lr.fit(training)
pred_idf_lr = model_idf_lr.transform(test)

[Stage 188:>                                                        (0 + 8) / 8]

### Model Evaluation

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred_cv_lr)


# Other metrics
precision = evaluator.evaluate(pred_cv_lr, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(pred_cv_lr, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(pred_cv_lr, {evaluator.metricName: "f1"})


                                                                                

### Evaluation metrices for logistic regression with count vectorizer

In [28]:
print("Accuracy: {:.4}".format(accuracy))
print("Precision: {:.4} ".format(precision))
print("Recall: {:.4}".format(recall))
print("F1: {:.4}".format(f1_score))

Accuracy: 0.958
Precision: 0.9562 
Recall: 0.958
F1: 0.9463


In [29]:

accuracy_lr_idf = evaluator.evaluate(pred_idf_lr)


# Other metrics
precision_lr_idf = evaluator.evaluate(pred_idf_lr, {evaluator.metricName: "weightedPrecision"})
recall_lr_idf = evaluator.evaluate(pred_idf_lr, {evaluator.metricName: "weightedRecall"})
f1_score_lr_idf = evaluator.evaluate(pred_idf_lr, {evaluator.metricName: "f1"})




                                                                                

### Evaluation metrices for logistic regression with TF-IDF


In [30]:
print("Accuracy: {:.4}".format(accuracy_lr_idf))
print("Precision: {:.4} ".format(precision_lr_idf))
print("Recall: {:.4}".format(recall_lr_idf))
print("F1: {:.4}".format(f1_score_lr_idf))


Accuracy: 0.9575
Precision: 0.9557 
Recall: 0.9575
F1: 0.9457


## Naive Bayes Model

In [37]:
nb = NaiveBayes(smoothing=1)

### Pipeline for Naive Bayes model with count vectorizer

In [38]:
pipe_cv_nv = Pipeline(stages=[tokenizer,remover,cv, nb])
model_cv_nv = pipe_cv_nv.fit(training)
pred_cv_nv = model_cv_nv.transform(test)

                                                                                

### Pipeline for Naive Bayes model with TF-IDF

In [39]:
pipe_idf_nb = Pipeline(stages=[tokenizer,remover,hashed_df,idf, nb])
model_idf_nb = pipe_idf_nb.fit(training)
pred_idf_nb = model_idf_nb.transform(test)

                                                                                

### Model Evaluation

In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


evaluator_nb = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_nb = evaluator_nb.evaluate(pred_cv_nv)


# Other metrics
precision_nb = evaluator_nb.evaluate(pred_cv_nv, {evaluator_nb.metricName: "weightedPrecision"})
recall_nb = evaluator_nb.evaluate(pred_cv_nv, {evaluator_nb.metricName: "weightedRecall"})
f1_score_nb = evaluator_nb.evaluate(pred_cv_nv, {evaluator_nb.metricName: "f1"})


                                                                                

### Evaluation metrices for Naive Bayes model with count vectorizer

In [41]:
print("Accuracy: {:.4}".format(accuracy_nb))
print("Precision: {:.4} ".format(precision_nb))
print("Recall: {:.4}".format(recall_nb))
print("F1: {:.4}".format(f1_score_nb))

Accuracy: 0.9946
Precision: 0.9958 
Recall: 0.9946
F1: 0.995


In [42]:
accuracy_nb_idf = evaluator.evaluate(pred_cv_nv)


# Other metrics
precision_nb_idf = evaluator_nb.evaluate(pred_idf_nb, {evaluator_nb.metricName: "weightedPrecision"})
recall_nb_idf = evaluator_nb.evaluate(pred_idf_nb, {evaluator_nb.metricName: "weightedRecall"})
f1_score_nb_idf = evaluator_nb.evaluate(pred_idf_nb, {evaluator_nb.metricName: "f1"})


[Stage 292:>                                                        (0 + 8) / 8]

23/05/17 17:51:50 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB




23/05/17 17:51:54 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB




23/05/17 17:51:57 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB


                                                                                

### Evaluation metrices for Naive Bayes with TF-IDF

In [43]:
print("Accuracy: {:.4}".format(accuracy_nb_idf))
print("Precision: {:.4} ".format(precision_nb_idf))
print("Recall: {:.4}".format(recall_nb_idf))
print("F1: {:.4}".format(f1_score_nb_idf))

Accuracy: 0.9946
Precision: 0.9955 
Recall: 0.993
F1: 0.9939


## How does model classify  new data sample?

In [52]:
sample_data = spark.createDataFrame([("stolen vehicle from downtown",StringType())
],
["Description"]

)
pred_sample_cv_lr = model_cv_lr.transform(sample_data)
pred_sample_idf_lr = model_idf_lr.transform(sample_data) ##  was selling drugs 


In [53]:
pred_sample_cv_lr.select('Description',"probability","prediction").show()

+--------------------+--------------------+----------+
|         Description|         probability|prediction|
+--------------------+--------------------+----------+
|stolen vehicle fr...|[0.13098373506775...|       4.0|
+--------------------+--------------------+----------+



In [54]:
pred_sample_idf_lr.select('Description',"probability","prediction").show()

+--------------------+--------------------+----------+
|         Description|         probability|prediction|
+--------------------+--------------------+----------+
|stolen vehicle fr...|[0.10889471793392...|       4.0|
+--------------------+--------------------+----------+



In [55]:
label_dict.show(40,truncate=False)

[Stage 327:>                                                        (0 + 8) / 8]

+-----+---------------------------+
|label|Category                   |
+-----+---------------------------+
|0.0  |larceny/theft              |
|1.0  |other offenses             |
|2.0  |non-criminal               |
|3.0  |assault                    |
|4.0  |vehicle theft              |
|5.0  |drug/narcotic              |
|6.0  |vandalism                  |
|7.0  |warrants                   |
|8.0  |burglary                   |
|9.0  |suspicious occ             |
|10.0 |robbery                    |
|11.0 |missing person             |
|12.0 |fraud                      |
|13.0 |forgery/counterfeiting     |
|14.0 |secondary codes            |
|15.0 |weapon laws                |
|16.0 |trespass                   |
|17.0 |prostitution               |
|18.0 |stolen property            |
|19.0 |disorderly conduct         |
|20.0 |drunkenness                |
|21.0 |sex offenses, forcible     |
|22.0 |recovered vehicle          |
|23.0 |driving under the influence|
|24.0 |kidnapping           

