<a href="https://colab.research.google.com/github/Gourang97/PySpark/blob/main/MultiClassClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark 

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 60kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 33.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=0f92e3e98e0f38ada4617b6cb238eb8322a5987bd4f2a9dc97a37899d59bafc9
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


##***Multi-Class Text Classification with PySpark***

In [6]:
from google.colab import drive
drive.mount('/content/drive')
!unzip /content/drive/MyDrive/dataset/train.csv.zip
!unzip /content/drive/MyDrive/dataset/test.csv.zip

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Archive:  /content/drive/MyDrive/dataset/test.csv.zip
  inflating: test.csv                


In [7]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc= SparkContext()
sqlContext = SQLContext(sc)

ValueError: ignored

## Our task is to classify San Francisco Crime Description into 33 pre-defined categories

In [15]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/content/train.csv')

In [17]:
data.head()

Row(Dates='2015-05-13 23:53:00', Category='WARRANTS', Descript='WARRANT ARREST', DayOfWeek='Wednesday', PdDistrict='NORTHERN', Resolution='ARREST, BOOKED', Address='OAK ST / LAGUNA ST', X=-122.425891675136, Y=37.7745985956747)

In [18]:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



In [19]:
data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)



In [20]:
from pyspark.sql.functions import col

In [31]:
# Top 20 Crime  Categories

In [25]:
data.groupBy("Category") \
.count() \
.orderBy(col("count").desc()) \
.show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



In [32]:
# Top 20 Crime Descriptions

In [33]:
data.groupBy("Descript").count().orderBy(col("count").desc()).show()

+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|60022|
|       LOST PROPERTY|31729|
|             BATTERY|27441|
|   STOLEN AUTOMOBILE|26897|
|DRIVERS LICENSE, ...|26839|
|      WARRANT ARREST|23754|
|SUSPICIOUS OCCURR...|21891|
|AIDED CASE, MENTA...|21497|
|PETTY THEFT FROM ...|19771|
|MALICIOUS MISCHIE...|17789|
|   TRAFFIC VIOLATION|16471|
|PETTY THEFT OF PR...|16196|
|MALICIOUS MISCHIE...|15957|
|THREATS AGAINST LIFE|14716|
|      FOUND PROPERTY|12146|
|ENROUTE TO OUTSID...|11470|
|GRAND THEFT OF PR...|11010|
|POSSESSION OF NAR...|10050|
|PETTY THEFT FROM ...|10029|
|PETTY THEFT SHOPL...| 9571|
+--------------------+-----+
only showing top 20 rows



In [35]:
## Model Pipeline -- 3 Steps --
#### regexTokenizer
#### stopwordRemoval
#### countVectors

In [50]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import StringIndexer

In [79]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)



In [56]:
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)


label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [58]:
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(10)

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,104...|  0.0|
| VEHICLE THEFT|   STOLEN AUTOMOBILE|

In [61]:
(trainingData, testData) = dataset.randomSplit([0.7,0.3], seed = 100)
print("Training dataset Count: " +str(trainingData.count()))
print("Testing dataset Count: " +str(testData.count()))

Training dataset Count: 614485
Testing dataset Count: 263564


In [60]:
## Model Training and Evaluation

In [74]:
mlr = LogisticRegression(maxIter=500, regParam=0.3, elasticNetParam=0)
lrModel = mlr.fit(trainingData)

In [75]:
predictions = lrModel.transform(testData)

In [76]:
predictions.filter(predictions['prediction'] == 2.0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 50, truncate = 30)

+------------------------------+---------------+------------------------------+-----+----------+
|                      Descript|       Category|                   probability|label|prediction|
+------------------------------+---------------+------------------------------+-----+----------+
|DESTRUCTION OF PROPERTY WIT...|    WEAPON LAWS|[0.12667520546547523,0.0787...| 15.0|       2.0|
|DESTRUCTION OF PROPERTY WIT...|    WEAPON LAWS|[0.12667520546547523,0.0787...| 15.0|       2.0|
|DESTRUCTION OF PROPERTY WIT...|    WEAPON LAWS|[0.12667520546547523,0.0787...| 15.0|       2.0|
|INSURED PROPERTY, DESTRUCTI...| OTHER OFFENSES|[0.12241560017104001,0.0888...|  1.0|       2.0|
|INSURED PROPERTY, DESTRUCTI...| OTHER OFFENSES|[0.12241560017104001,0.0888...|  1.0|       2.0|
|     RECEIVING STOLEN PROPERTY|STOLEN PROPERTY|[0.1185681758893363,0.09425...| 18.0|       2.0|
|     RECEIVING STOLEN PROPERTY|STOLEN PROPERTY|[0.1185681758893363,0.09425...| 18.0|       2.0|
|     RECEIVING STOLEN PROPERT

In [77]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9726153024477426

In [73]:
## Logistic Regression usinf TF-IDF Features

In [80]:
from pyspark.ml.feature import HashingTF, IDF

In [81]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

pipelineFit = pipeline.fit(data)


In [82]:
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)


In [83]:
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.883156090531307,0.018676...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

In [84]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9718749357100661

In [85]:
## Naive Bayes

In [86]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+-------------------+-------------+------------------------------+-----+----------+
|           Descript|     Category|                   probability|label|prediction|
+-------------------+-------------+------------------------------+-----+----------+
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|       0.0|
|GRAND THEFT BICYCLE|LARCENY/THEFT|[1.0,1.3251523437990594E-29...|  0.0|    

In [87]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9951106098719483

In [88]:
## Randome Forest

In [89]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.1175...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.33907378517288383,0.117

In [90]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.38356261031655625