## Creating PySpark Enviroment

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

In [6]:
import findspark
findspark.init()

In [7]:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("Smooth_Criminal_pyspark").getOrCreate()

sc = spark.sparkContext

### Importing Libraries

In [1]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
np.random.seed(60)

### Know your data

In [8]:
%%sh
#Let see the first 5 rows
head -5 train.csv

Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y
2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747
2015-05-13 23:53:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747
2015-05-13 23:33:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-122.42436302145,37.8004143219856
2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,NORTHERN,NONE,1500 Block of LOMBARD ST,-122.42699532676599,37.80087263276921


In [9]:
df = pd.read_csv("/content/train.csv")
df.head()

Unnamed: 0,Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y
0,2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425892,37.774599
1,2015-05-13 23:53:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425892,37.774599
2,2015-05-13 23:33:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-122.424363,37.800414
3,2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,NORTHERN,NONE,1500 Block of LOMBARD ST,-122.426995,37.800873
4,2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,PARK,NONE,100 Block of BRODERICK ST,-122.438738,37.771541


In [10]:
#Read the data into spark datafrome
from pyspark.sql.functions import col, lower
df = spark.read.format('csv')\
          .option('header','true')\
          .option('inferSchema', 'true')\
          .option('timestamp', 'true')\
          .load('train.csv')

data = df.select(lower(col('Category')),lower(col('Descript')))\
        .withColumnRenamed('lower(Category)','Category')\
        .withColumnRenamed('lower(Descript)', 'Description')
data.cache()
print('Dataframe Structure')
print('----------------------------------')
print(data.printSchema())
print(' ')
print('Dataframe preview')
print(data.show(5))
print(' ')
print('----------------------------------')
print('Total number of rows', df.count())

Dataframe Structure
----------------------------------
root
 |-- Category: string (nullable = true)
 |-- Description: string (nullable = true)

None
 
Dataframe preview
+--------------+--------------------+
|      Category|         Description|
+--------------+--------------------+
|      warrants|      warrant arrest|
|other offenses|traffic violation...|
|other offenses|traffic violation...|
| larceny/theft|grand theft from ...|
| larceny/theft|grand theft from ...|
+--------------+--------------------+
only showing top 5 rows

None
 
----------------------------------
Total number of rows 878049


In [11]:
def top_n_list(df,var, N):
    '''
    This function determine the top N numbers of the list
    '''
    print("Total number of unique value of"+' '+var+''+':'+' '+str(df.select(var).distinct().count()))
    print(' ')
    print('Top'+' '+str(N)+' '+'Crime'+' '+var)
    df.groupBy(var).count().withColumnRenamed('count','totalValue')\
    .orderBy(col('totalValue').desc()).show(N)


top_n_list(data, 'Category',10)
print(' ')
print(' ')
top_n_list(data,'Description',10)

Total number of unique value of Category: 39
 
Top 10 Crime Category
+--------------+----------+
|      Category|totalValue|
+--------------+----------+
| larceny/theft|    174900|
|other offenses|    126182|
|  non-criminal|     92304|
|       assault|     76876|
| drug/narcotic|     53971|
| vehicle theft|     53781|
|     vandalism|     44725|
|      warrants|     42214|
|      burglary|     36755|
|suspicious occ|     31414|
+--------------+----------+
only showing top 10 rows

 
 
Total number of unique value of Description: 879
 
Top 10 Crime Description
+--------------------+----------+
|         Description|totalValue|
+--------------------+----------+
|grand theft from ...|     60022|
|       lost property|     31729|
|             battery|     27441|
|   stolen automobile|     26897|
|drivers license, ...|     26839|
|      warrant arrest|     23754|
|suspicious occurr...|     21891|
|aided case, menta...|     21497|
|petty theft from ...|     19771|
|malicious mischie...|   

In [12]:
training, test = data.randomSplit([0.7,0.3], seed=60)
#trainingSet.cache()
print("Training Dataset Count:", training.count())
print("Test Dataset Count:", test.count())

Training Dataset Count: 614667
Test Dataset Count: 263382


In [13]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler, HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier


# Tokenization using regextokenizer
regex_tokenizer = RegexTokenizer(pattern='\\W')\
                  .setInputCol("Description")\
                  .setOutputCol("tokens")

# Stopwords using stopwordsremover
extra_stopwords = ['http','amp','rt','t','c','the']
stopwords_remover = StopWordsRemover()\
                    .setInputCol('tokens')\
                    .setOutputCol('filtered_words')\
                    .setStopWords(extra_stopwords)


# Bags of words using countVectorizer
count_vectors = CountVectorizer(vocabSize=10000, minDF=5)\
               .setInputCol("filtered_words")\
               .setOutputCol("features")


# Using TF-IDF to vectorise features
hashingTf = HashingTF(numFeatures=10000)\
            .setInputCol("filtered_words")\
            .setOutputCol("raw_features")

#Use minDocFreq to remove sparse terms
idf = IDF(minDocFreq=5)\
        .setInputCol("raw_features")\
        .setOutputCol("features")

# Bag of words using Word2Vec (Only for experimenting/ not required)
# word2Vec = Word2Vec(vectorSize=1000, minCount=0)\
#            .setInputCol("filtered_words")\
#            .setOutputCol("features")

# Encode the Category variable into label using StringIndexer
label_string_idx = StringIndexer()\
                  .setInputCol("Category")\
                  .setOutputCol("label")

# Logistic Regression
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)


# Decision tree classifier 
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')


# Random Forest Classifier
rf = RandomForestClassifier(featuresCol='features', labelCol='label')


# Naive Bayes
nb = NaiveBayes(smoothing=1)

# def metrics_ev(labels, metrics):
#     '''
#     List of all performance metrics
#     '''
#     # Confusion matrix
#     print("---------Confusion matrix-----------------")
#     print(metrics.confusionMatrix)
#     print(' ')
#     # Overall statistics
#     print('----------Overall statistics-----------')
#     print("Precision = %s" %  metrics.precision())
#     print("Recall = %s" %  metrics.recall())
#     print("F1 Score = %s" % metrics.fMeasure())
#     print(' ')
#     # Statistics by class
#     print('----------Statistics by class----------')
#     for label in sorted(labels):
#        print("Class %s precision = %s" % (label, metrics.precision(label)))
#        print("Class %s recall = %s" % (label, metrics.recall(label)))
#        print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
#     print(' ')
#     # Weighted stats
#     print('----------Weighted stats----------------')
#     print("Weighted recall = %s" % metrics.weightedRecall)
#     print("Weighted precision = %s" % metrics.weightedPrecision)
#     print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
#     print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
#     print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)


### Logistic Regression - Count Vector

In [14]:
# Create pipeline for logistic Regression with count vectors
pipeline_cv_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, lr])
model_cv_lr = pipeline_cv_lr.fit(training)
predictions_cv_lr = model_cv_lr.transform(test)

In [21]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_cv_lr.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105608383,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105608383,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105608383,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105608383,0.02048...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8738390105608383,0.02048...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_cv_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_lr)
print(' ')
print('------------------------------Accuracy----------------------------------')
print(' ')
print('                       accuracy:{}:'.format(evaluator_cv_lr))

 
------------------------------Accuracy----------------------------------
 
                       accuracy:0.9720379224200315:


### Decision Tree Classifier

In [23]:
# Create pipeline for Decision Tree Classifier
pipeline_dt = Pipeline().setStages([regex_tokenizer, stopwords_remover, count_vectors, label_string_idx, dt])
model_dt = pipeline_dt.fit(training)
predictions_dt = model_dt.transform(test)

In [24]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_dt.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|     short change, grand theft|        fraud|[0.9995580219024701,0.0,0.0...| 12.0|       0.0|
|attempted grand theft from ...|larceny/theft|[0.9995580219024701,0.0,0.0...|  0.0|       0.0|
|     short change, petty theft|        fraud|[0.9995580219024701,0.0,0.0...| 12.0|       0.0|
|     short change, petty theft|        fraud|[0.9995580219024701,0.0,0.0...| 12.0|       0.0|
|attempted grand theft from ...|larceny/theft|[0.9995580219024701,0.0,0.0...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_dt = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_dt)
print(' ')
print('------------------------------Accuracy----------------------------------')
print(' ')
print('                       accuracy:{}:'.format(evaluator_dt))

 
------------------------------Accuracy----------------------------------
 
                       accuracy:0.4718012120269327:


### Random Forest Classifier

In [26]:
# Create pipeline for Random Forest
pipeline_rf = Pipeline().setStages([regex_tokenizer, stopwords_remover, count_vectors, label_string_idx, rf])
model_rf = pipeline_rf.fit(training)
predictions_rf = model_rf.transform(test)


In [27]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_rf.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|petty theft from unlocked auto|larceny/theft|[0.7734294608709094,0.04009...|  0.0|       0.0|
|petty theft from unlocked auto|larceny/theft|[0.7734294608709094,0.04009...|  0.0|       0.0|
|petty theft from unlocked auto|larceny/theft|[0.7734294608709094,0.04009...|  0.0|       0.0|
|petty theft from unlocked auto|larceny/theft|[0.7734294608709094,0.04009...|  0.0|       0.0|
|petty theft from unlocked auto|larceny/theft|[0.7734294608709094,0.04009...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [28]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_rf = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_rf)
print(' ')
print('------------------------------Accuracy----------------------------------')
print(' ')
print('                       accuracy:{}:'.format(evaluator_rf))

 
------------------------------Accuracy----------------------------------
 
                       accuracy:0.7356064093405957:


### Naive Bayes

In [29]:
pipeline_cv_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,count_vectors,label_string_idx, nb])
model_cv_nb = pipeline_cv_nb.fit(training)
predictions_cv_nb = model_cv_nb.transform(test)

In [30]:
evaluator_cv_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_cv_nb)
print(' ')
print('--------------------------Accuracy-----------------------------')
print(' ')
print('                      accuracy:{}:'.format(evaluator_cv_nb))

 
--------------------------Accuracy-----------------------------
 
                      accuracy:0.99350875457078:


### Logistic Regression - TF-IDF

In [31]:
pipeline_idf_lr = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, lr])
model_idf_lr = pipeline_idf_lr.fit(training)
predictions_idf_lr = model_idf_lr.transform(test)

In [32]:
print('-----------------------------Check Top 5 predictions----------------------------------')
print(' ')
predictions_idf_lr.select('Description','Category',"probability","label","prediction")\
                                        .orderBy("probability", ascending=False)\
                                        .show(n=5, truncate=30)

-----------------------------Check Top 5 predictions----------------------------------
 
+------------------------------+-------------+------------------------------+-----+----------+
|                   Description|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589106,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589106,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589106,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589106,0.01879...|  0.0|       0.0|
|theft, bicycle, <$50, no se...|larceny/theft|[0.8845322339589106,0.01879...|  0.0|       0.0|
+------------------------------+-------------+------------------------------+-----+----------+
only showing top 5 rows



In [33]:
evaluator_idf_lr = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_lr)
print(' ')
print('-------------------------------Accuracy---------------------------------')
print(' ')
print('                        accuracy:{}:'.format(evaluator_idf_lr))

 
-------------------------------Accuracy---------------------------------
 
                        accuracy:0.9719229068954107:


### Naive bayes - TF-IDF

In [34]:
pipeline_idf_nb = Pipeline().setStages([regex_tokenizer,stopwords_remover,hashingTf, idf, label_string_idx, nb])
model_idf_nb = pipeline_idf_nb.fit(training)
predictions_idf_nb = model_idf_nb.transform(test)

In [35]:
evaluator_idf_nb = MulticlassClassificationEvaluator().setPredictionCol("prediction").evaluate(predictions_idf_nb)
print(' ')
print('-----------------------------Accuracy-----------------------------')
print(' ')
print('                          accuracy:{}:'.format(evaluator_idf_nb))

 
-----------------------------Accuracy-----------------------------
 
                          accuracy:0.994973005560035:


### Trying with KNN model

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

data = spark.read.csv("/content/train.csv", header=True, inferSchema=True)

selected_columns = ["Category", "Descript", "Resolution"]
data = data.select(selected_columns)


indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data) for column in ["Category", "Descript", "Resolution"]]
for indexer in indexers:
    data = indexer.transform(data)


feature_columns = ["Category_index", "Descript_index"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

(training_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)


rf_model = RandomForestClassifier(labelCol="Resolution_index", featuresCol="features", numTrees=10, maxBins=1000)
model = rf_model.fit(training_data)
predictions = model.transform(test_data)

In [20]:
evaluator = MulticlassClassificationEvaluator(labelCol="Resolution_index", predictionCol="prediction", metricName="accuracy")
accuracy_knn = evaluator.evaluate(predictions)
print(' ')
print('-----------------------------Accuracy-----------------------------')
print(' ')
print('                          accuracy:{}:'.format(accuracy_knn))

 
-----------------------------Accuracy-----------------------------
 
                          accuracy:0.7555894453517552:


### Showcasing results

In [38]:
import pandas as pd
data = {
    "Model": ["Logistic Regression", "Naive Bayes", "Decision Tree Classifier", "Random Forest Classifier", "KNN"],
    "Count Vectorizer": ["97.2%", "99.3%", "47.1%","73.5%", "75.5%"],
    "TF-IDF": ["97.2%", "99.5%", "-", "-", "-"],
}

df = pd.DataFrame(data)
df.set_index("Model", inplace=True)
print(df)

                         Count Vectorizer TF-IDF
Model                                           
Logistic Regression                 97.2%  97.2%
Naive Bayes                         99.3%  99.5%
Decision Tree Classifier            47.1%      -
Random Forest Classifier            73.5%      -
KNN                                 75.5%      -


### Explanation: As you can see, TF-IDF proves to be best vectoriser for this dataset, while Naive Bayes proves to be better algorithm for text analysis than Logistic regression, and classifier(s) prove out to be poor to decent in this case.