In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("clipper-pyspark").getOrCreate()

sc = spark.sparkContext
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
np.random.seed(60)

Dataframe Structure
----------------------------------
root
 |-- Category: string (nullable = true)
 |-- Description: string (nullable = true)

None
 
Dataframe preview
+-------------+--------------------+
|     Category|         Description|
+-------------+--------------------+
|      robbery|robbery, bodily f...|
|vehicle theft|   stolen automobile|
|vehicle theft|   stolen automobile|
|        arson|               arson|
|      assault|             battery|
+-------------+--------------------+
only showing top 5 rows

None
 
----------------------------------
Total number of rows 2129525


In [22]:
#Read the data into spark datafrome
from pyspark.sql.functions import col, lower
df = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('./data/train_data.csv')

data = df.select(lower(col('Category')),lower(col('Descript')),col('Date'),lower(col('DayOfWeek')),lower(col('Resolution')))\
        .withColumnRenamed('lower(Category)','Category')\
        .withColumnRenamed('lower(Descript)', 'Description')\
        .withColumnRenamed('lower(DayOfWeek)', 'Day of Week')\
        .withColumnRenamed('lower(Resolution)', 'Resolution')
data.cache()
print('Dataframe Structure')
print('----------------------------------')
print(data.printSchema())
print(' ')
print('Dataframe preview')
print(data.show(5))
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

Dataframe Structure
----------------------------------
root
 |-- Category: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day of Week: string (nullable = true)
 |-- Resolution: string (nullable = true)

None
 
Dataframe preview
+-------------+--------------------+----------+-----------+----------+
|     Category|         Description|      Date|Day of Week|Resolution|
+-------------+--------------------+----------+-----------+----------+
|      robbery|robbery, bodily f...|11/22/2004|     monday|      none|
|vehicle theft|   stolen automobile|10/18/2005|    tuesday|      none|
|vehicle theft|   stolen automobile|02/15/2004|     sunday|      none|
|        arson|               arson|02/18/2011|     friday|      none|
|      assault|             battery|11/21/2010|     sunday|      none|
+-------------+--------------------+----------+-----------+----------+
only showing top 5 rows

None
 
----------------------------------
Tota

In [None]:
# TOP MOST Categories of crime

In [23]:
top_crime = data.groupby('Category').count()
top_crime.sort(col("count").desc()).show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       larceny/theft|477975|
|      other offenses|301874|
|        non-criminal|236928|
|             assault|167042|
|       vehicle theft|126228|
|       drug/narcotic|117821|
|           vandalism|114718|
|            warrants| 99821|
|            burglary| 91067|
|      suspicious occ| 79087|
|             robbery| 54467|
|      missing person| 44268|
|               fraud| 41348|
|forgery/counterfe...| 22995|
|     secondary codes| 22378|
|         weapon laws| 21004|
|            trespass| 19194|
|        prostitution| 16501|
|     stolen property| 11450|
|  disorderly conduct|  9932|
+--------------------+------+
only showing top 20 rows



In [None]:
# Which days of the week appear the most crimes

In [24]:
topday = data.groupby('Day of Week').count().withColumnRenamed('Day of Week','Day')
topday.sort(col("count").desc()).show()

+---------+------+
|      Day| count|
+---------+------+
|   friday|324151|
|wednesday|311601|
| saturday|308907|
| thursday|303893|
|  tuesday|302455|
|   monday|294530|
|   sunday|283988|
+---------+------+



In [None]:
# Check resultions for each of the crimes from most to least

In [31]:
res = data.groupby('Category','Resolution').count().withColumnRenamed('Day of Week','Day')
res.filter(col('count')>1000).sort(col("Category").desc(),col("count").desc()).show(100)

+--------------------+--------------------+------+
|            Category|          Resolution| count|
+--------------------+--------------------+------+
|         weapon laws|      arrest, booked| 13334|
|         weapon laws|                none|  6070|
|         weapon laws|       arrest, cited|  1157|
|            warrants|      arrest, booked| 93092|
|            warrants|                none|  5482|
|       vehicle theft|                none|115980|
|       vehicle theft|      arrest, booked|  5064|
|       vehicle theft|           unfounded|  4789|
|           vandalism|                none|101778|
|           vandalism|      arrest, booked|  8734|
|           vandalism|       arrest, cited|  2967|
|            trespass|      arrest, booked|  7598|
|            trespass|                none|  6287|
|            trespass|       arrest, cited|  5053|
|      suspicious occ|                none| 70514|
|      suspicious occ|      arrest, booked|  2765|
|      suspicious occ|         

In [53]:
unres = data.filter(col('Resolution')=='none').groupby('Category','Resolution').count().withColumnRenamed('Day of Week','Day')

unres = unres.select(col('Category'),col('count')).sort(col("count").desc()).withColumnRenamed('count','count of unresolved')

unres.show(1000)

res = data.filter(col('Resolution')!='none')
res=res.groupby('Category').count().withColumnRenamed('Day of Week','Day')
res = res.select(col('Category'),col('count')).sort(col("count").desc()).withColumnRenamed('count','count of resolved')
res.show(1000)


joined= res.join(unres, res.Category == unres.Category,'inner').select(res.Category, 'count of resolved','count of unresolved')     
joined = joined.sort(col("count of unresolved").desc())

joined.show(1000)

+--------------------+-------------------+
|            Category|count of unresolved|
+--------------------+-------------------+
|       larceny/theft|             437927|
|        non-criminal|             184375|
|       vehicle theft|             115980|
|             assault|             104543|
|           vandalism|             101778|
|      other offenses|              86815|
|            burglary|              76653|
|      suspicious occ|              70514|
|             robbery|              43082|
|               fraud|              32057|
|      missing person|              21630|
|forgery/counterfe...|              14449|
|     secondary codes|              12836|
|       drug/narcotic|              10256|
|   recovered vehicle|               8113|
|            trespass|               6287|
|         weapon laws|               6070|
|sex offenses, for...|               5575|
|            warrants|               5482|
|  disorderly conduct|               3458|
|          

In [None]:
#Read the data into spark datafrome
from pyspark.sql.functions import col, lower
df = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('./data/train_data.csv')

data = df.select(lower(col('Category')),lower(col('Descript')))\
        .withColumnRenamed('lower(Category)','Category')\
        .withColumnRenamed('lower(Descript)', 'Description')
data.cache()
print('Dataframe Structure')
print('----------------------------------')
print(data.printSchema())
print(' ')
print('Dataframe preview')
print(data.show(5))
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

In [18]:
def top_n_list(df,var, N):
    '''
    This function determine the top N numbers of the list
    '''
    print("Total number of unique value of"+' '+var+''+':'+' '+str(df.select(var).distinct().count()))
    print(' ')
    print('Top'+' '+str(N)+' '+'Crime'+' '+var)
    df.groupBy(var).count().withColumnRenamed('count','totalValue')\
    .orderBy(col('totalValue').desc()).show(N)
    
    
top_n_list(data, 'Category',10)
print(' ')
print(' ')
top_n_list(data,'Description',10)

Total number of unique value of Category: 37
 
Top 10 Crime Category
+--------------+----------+
|      Category|totalValue|
+--------------+----------+
| larceny/theft|    477975|
|other offenses|    301874|
|  non-criminal|    236928|
|       assault|    167042|
| vehicle theft|    126228|
| drug/narcotic|    117821|
|     vandalism|    114718|
|      warrants|     99821|
|      burglary|     91067|
|suspicious occ|     79087|
+--------------+----------+
only showing top 10 rows

 
 
Total number of unique value of Description: 847
 
Top 10 Crime Description
+--------------------+----------+
|         Description|totalValue|
+--------------------+----------+
|grand theft from ...|    178776|
|       lost property|     77947|
|             battery|     66321|
|   stolen automobile|     64616|
|drivers license, ...|     62372|
|aided case, menta...|     56203|
|      warrant arrest|     55123|
|suspicious occurr...|     52253|
|petty theft from ...|     51912|
|petty theft of pr...|   

In [19]:
data.select('Category').distinct().count()

37

In [20]:
training, test = data.randomSplit([0.7,0.3], seed=60)
#trainingSet.cache()
print("Training Dataset Count:", training.count())
print("Test Dataset Count:", test.count())

Training Dataset Count: 1489572
Test Dataset Count: 639953


In [21]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes 

#----------------Define tokenizer with regextokenizer()------------------
regex_tokenizer = RegexTokenizer(pattern='\\W')\
                  .setInputCol("Description")\
                  .setOutputCol("tokens")

#----------------Define stopwords with stopwordsremover()---------------------
extra_stopwords = ['http','amp','rt','t','c','the']
stopwords_remover = StopWordsRemover()\
                    .setInputCol('tokens')\
                    .setOutputCol('filtered_words')\
                    .setStopWords(extra_stopwords)
                    

#----------Define bags of words using countVectorizer()---------------------------
count_vectors = CountVectorizer(vocabSize=10000, minDF=5)\
               .setInputCol("filtered_words")\
               .setOutputCol("features")


#-----------Using TF-IDF to vectorise features instead of countVectoriser-----------------
hashingTf = HashingTF(numFeatures=10000)\
            .setInputCol("filtered_words")\
            .setOutputCol("raw_features")
            
#Use minDocFreq to remove sparse terms
idf = IDF(minDocFreq=5)\
        .setInputCol("raw_features")\
        .setOutputCol("features")

#---------------Define bag of words using Word2Vec---------------------------
word2Vec = Word2Vec(vectorSize=1000, minCount=0)\
           .setInputCol("filtered_words")\
           .setOutputCol("features")

#-----------Encode the Category variable into label using StringIndexer-----------
label_string_idx = StringIndexer()\
                  .setInputCol("Category")\
                  .setOutputCol("label")

#-----------Define classifier structure for logistic Regression--------------
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

#---------Define classifier structure for Naive Bayes----------
nb = NaiveBayes(smoothing=1)

def metrics_ev(labels, metrics):
    '''
    List of all performance metrics
    '''
    # Confusion matrix
    print("---------Confusion matrix-----------------")
    print(metrics.confusionMatrix)
    print(' ')    
    # Overall statistics
    print('----------Overall statistics-----------')
    print("Precision = %s" %  metrics.precision())
    print("Recall = %s" %  metrics.recall())
    print("F1 Score = %s" % metrics.fMeasure())
    print(' ')
    # Statistics by class
    print('----------Statistics by class----------')
    for label in sorted(labels):
       print("Class %s precision = %s" % (label, metrics.precision(label)))
       print("Class %s recall = %s" % (label, metrics.recall(label)))
       print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
    print(' ')
    # Weighted stats
    print('----------Weighted stats----------------')
    print("Weighted recall = %s" % metrics.weightedRecall)
    print("Weighted precision = %s" % metrics.weightedPrecision)
    print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
    print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
    print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

In [22]:
pipeline_cv_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, lr])
model_cv_lr = pipeline_cv_lr.fit(training)
predictions_cv_lr = model_cv_lr.transform(test)

In [23]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_cv_lr.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8760164820663662,0.02102...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8760164820663662,0.02102...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8760164820663662,0.02102...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8760164820663662,0.02102...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8760164820663662,0.02102...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
evaluator_cv_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_lr)
print(' ')
print('------------------------------Accuracy----------------------------------')
print(' ')
print('                       accuracy:{}:'.format(evaluator_cv_lr))

 
------------------------------Accuracy----------------------------------
 
                       accuracy:0.9779452699308976:


In [25]:
### Secondary model using NaiveBayes
pipeline_cv_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, nb])
model_cv_nb = pipeline_cv_nb.fit(training)
predictions_cv_nb = model_cv_nb.transform(test)

In [26]:
evaluator_cv_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_nb)
print(' ')
print('--------------------------Accuracy-----------------------------')
print(' ')
print('                      accuracy:{}:'.format(evaluator_cv_nb))

 
--------------------------Accuracy-----------------------------
 
                      accuracy:0.9943617840097905:


In [27]:
pipeline_idf_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, lr])
model_idf_lr = pipeline_idf_lr.fit(training)
predictions_idf_lr = model_idf_lr.transform(test)

In [28]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_idf_lr.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8836390132319979,0.01982...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8836390132319979,0.01982...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8836390132319979,0.01982...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8836390132319979,0.01982...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8836390132319979,0.01982...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [29]:
evaluator_idf_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_lr)
print(' ')
print('-------------------------------Accuracy---------------------------------')
print(' ')
print('                        accuracy:{}:'.format(evaluator_idf_lr))

 
-------------------------------Accuracy---------------------------------
 
                        accuracy:0.9772466482808572:


In [30]:
pipeline_idf_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, nb])
model_idf_nb = pipeline_idf_nb.fit(training)
predictions_idf_nb = model_idf_nb.transform(test)

In [31]:
evaluator_idf_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_nb)
print(' ')
print('-----------------------------Accuracy-----------------------------')
print(' ')
print('                          accuracy:{}:'.format(evaluator_idf_nb))

 
-----------------------------Accuracy-----------------------------
 
                          accuracy:0.9949479475778635:
