In This notebook we will **explore** and **classify San Francisco Crime Description into 33 pre-defined categories**. 

The data can be downloaded from [Kaggle](https://www.kaggle.com/c/sf-crime/data).

## The Data :

Given a new crime description comes in, we want to assign it to one of 33 categories. The classifier makes the assumption that each new crime description is assigned to one and only one category. This is multi-class text classification problem.
    
    ``Input: Descript``

Example: “ STOLEN AUTOMOBILE”

    ``Output: Category``

Example: VEHICLE THEFT

In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
# sc =SparkContext()
# sqlContext = SQLContext(sc)
data = spark.read.csv('../data/train.csv',header=True )

In [2]:
# Show the structure of columns 
data.printSchema()

root
 |-- Dates: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: string (nullable = true)
 |-- Y: string (nullable = true)



In [3]:
# Show data Simple 
data.show()

+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|              Dates|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|             Address|                  X|                 Y|
+-------------------+--------------+--------------------+---------+----------+--------------+--------------------+-------------------+------------------+
|2015-05-13 23:53:00|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:53:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|  OAK ST / LAGUNA ST|  -122.425891675136|  37.7745985956747|
|2015-05-13 23:33:00|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|VANNESS AV / GREE...|   -122.42436302145|  37.8004143219856|
|2015-05-13 23:30:00| LARCENY/THEFT|GRAND THEFT FROM ...|Wednesday|  NORTHER

Let's focus on both columns that we really need " Category" and "Descript"

In [4]:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



Top 20 crime categories:

In [5]:
from pyspark.sql.functions import col
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



Top 20 crime descriptions:

In [6]:
data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|60022|
|       LOST PROPERTY|31729|
|             BATTERY|27441|
|   STOLEN AUTOMOBILE|26897|
|DRIVERS LICENSE, ...|26839|
|      WARRANT ARREST|23754|
|SUSPICIOUS OCCURR...|21891|
|AIDED CASE, MENTA...|21497|
|PETTY THEFT FROM ...|19771|
|MALICIOUS MISCHIE...|17789|
|   TRAFFIC VIOLATION|16471|
|PETTY THEFT OF PR...|16196|
|MALICIOUS MISCHIE...|15957|
|THREATS AGAINST LIFE|14716|
|      FOUND PROPERTY|12146|
|ENROUTE TO OUTSID...|11470|
|GRAND THEFT OF PR...|11010|
|POSSESSION OF NAR...|10050|
|PETTY THEFT FROM ...|10029|
|PETTY THEFT SHOPL...| 9571|
+--------------------+-----+
only showing top 20 rows



## Model Pipeline

Spark Machine Learning Pipelines API is similar to Scikit-Learn. Our pipeline includes three pre-processing steps:

  - 1  **regexTokenizer**: Tokenization (with Regular Expression)
  
  - 2  **stopwordsRemover**: Remove Stop Words
  
  - 3  **countVectors**: Count vectors (“document-term vectors”)

In [7]:
## libs import
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression# regular expression tokenizer

regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")# stop words

add_stopwords = ["http","https","amp","rt","t","c","the"] # You can add others stop words if you want

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)# bag of words count

countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

## StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. 

The indices are in [0, numLabels),ordered by label frequencies, so the most frequent label gets index 0.

In our case, the label column (Category) will be encoded to label indices, from 0 to 32; the most frequent label (LARCENY/THEFT) will be indexed as 0.

In [8]:
## libs import
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



## Partition Training & Test sets

In [9]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 615087
Test Dataset Count: 262962


## Model Training and Evaluation

### Logistic Regression using Count Vector Features

In [10]:
## Linear regression Model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0) ## Calling Logistic regression
lrModel = lr.fit(trainingData) ## Fit The data on it
predictions = lrModel.transform(testData) ## Predict using the model
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.871761743000378,0.021247...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, SERIA...|LARCENY/THEFT|[0.8

Model Evalution 
each time when we want to evaluate the models we will use MulticlassClassificationEvaluator for mor info [link_to_visit](https://spark.apache.org/docs/2.3.0/mllib-evaluation-metrics.html)

In [11]:
## Libs import
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9722407474605252

### Logistic Regression using TF-IDF Features

In [12]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8717617430003752,0.02124...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, SERIA...|LARCENY/THEFT|[0.8

In [13]:
## Model Evalution 
evaluator.evaluate(predictions)

0.9722407474605252

The accuracy is excellent! 👍

### Random Forest

In [14]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 15, \
                            maxBins = 50)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.00879...|  0.0|       0.0|
|GRAND THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.9400681504940949,0.0087

In [15]:
## Model Evalution 
evaluator.evaluate(predictions)

0.8714910993501884

### Cross-Validation

In [16]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0) ### Logistic Regression 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
             .build())# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5) ## number of Folds created
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9918194984396738

## Conclusion 