## Develop a Text Classification ML Model using PySpark Lib to predict Udemy Subject Category given course title or *text*

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=9080d528f43a283bd39280052e665447c4e3ae84e39b90f769ce3226a5a66aa1
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
## Import necessary libraries

from pyspark import SparkContext
sc = SparkContext(master="local[2]")


In [None]:
## launch UI
sc

In [None]:
# Create Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TextClassifierwithPySpark").getOrCreate()


In [None]:
## Load Data from googlle drive

df = spark.read.csv("udemy_courses.csv",header=True,inferSchema=True)



In [None]:
df.show()

+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+------------------+-------------------+----------------+
|course_id|        course_title|                 url|is_paid|price|num_subscribers|num_reviews|num_lectures|             level|  content_duration|published_timestamp|         subject|
+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+------------------+-------------------+----------------+
|  1070968|Ultimate Investme...|https://www.udemy...|   true|  200|           2147|         23|          51|        All Levels|               1.5|2017-01-18 20:58:58|Business Finance|
|  1113822|Complete GST Cour...|https://www.udemy...|   true|   75|           2792|        923|         274|        All Levels|              39.0|2017-03-09 16:34:20|Business Finance|
|  1006314|Financial Modelin...|https://www.udemy...|   true|   45|           21

In [None]:
df.columns

['course_id',
 'course_title',
 'url',
 'is_paid',
 'price',
 'num_subscribers',
 'num_reviews',
 'num_lectures',
 'level',
 'content_duration',
 'published_timestamp',
 'subject']

In [None]:
## Select Columns

df.select('course_title','subject').show()

+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
|Trading Penny Sto...|Business Finance|
|Investing And Tra...|Business Finance|
|Trading Stock Cha...|Business Finance|
|Options Trading 3...|Business Finance|
|The Only Investme...|Business Finance|
|Forex Trading Sec...|Business Finance|
|Trading Options W...|Business Finance|
|Financial Managem...|Business Finance|
|Forex Trading Cou...|Business Finance|
|Python Algo Tradi...|Business Finance|
|Short Selling: Le...|Business Finance|
|Basic Technical A...|Business Finance|
|The Complete Char...|Business Finance|
|7 Deadly Mistakes...|Business Finance|
|Financial Stateme...|Business Finance|
+--------------------+----------------+
only showing top 20 rows



In [None]:
df = df.select('course_title','subject')
df.show(5)


+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
+--------------------+----------------+
only showing top 5 rows



In [None]:
# Value Counts

df.groupBy('subject').count().show()
	

+-------------------+-----+
|            subject|count|
+-------------------+-----+
|               null|    5|
|   Business Finance| 1195|
|     Graphic Design|  603|
|    Web Development| 1200|
|Musical Instruments|  680|
+-------------------+-----+



In [None]:
# Value Counts via pandas

df.toPandas()['subject'].value_counts()


Web Development        1200
Business Finance       1195
Musical Instruments     680
Graphic Design          603
Name: subject, dtype: int64

In [None]:
# Check For Missing Values

df.toPandas()['subject'].isnull().sum()


5

In [None]:
# Drop Missing Values

df = df.dropna(subset=('subject'))


In [None]:
# Check For Missing Values

df.toPandas()['subject'].isnull().sum()


0

In [None]:
df.show(5)

+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
+--------------------+----------------+
only showing top 5 rows



In [None]:
## Feature Extraction

import pyspark.ml.feature
#dir(pyspark.ml.feature)


In [None]:

# Load Transformer & Extractor Pkgs

from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF
from pyspark.ml.feature import StringIndexer


In [None]:
# Stages For the Pipeline

tokenizer = Tokenizer(inputCol='course_title',outputCol='mytokens')

stopwords_remover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')

vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')

idf = IDF(inputCol='rawFeatures',outputCol='vectorizedFeatures')



In [None]:
# LabelEncoding/LabelIndexing

labelEncoder = StringIndexer(inputCol='subject',outputCol='subject_title').fit(df)

labelEncoder.transform(df).show(5)


+--------------------+----------------+-------------+
|        course_title|         subject|subject_title|
+--------------------+----------------+-------------+
|Ultimate Investme...|Business Finance|          1.0|
|Complete GST Cour...|Business Finance|          1.0|
|Financial Modelin...|Business Finance|          1.0|
|Beginner to Pro -...|Business Finance|          1.0|
|How To Maximize Y...|Business Finance|          1.0|
+--------------------+----------------+-------------+
only showing top 5 rows



In [None]:
labelEncoder.labels

['Web Development',
 'Business Finance',
 'Musical Instruments',
 'Graphic Design']

In [None]:
# Dict of Labels

label_dict = {'Web Development':0.0,'Business Finance':1.0,
 'Musical Instruments':2.0, 'Graphic Design':3.0}
df.show()


+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
|Trading Penny Sto...|Business Finance|
|Investing And Tra...|Business Finance|
|Trading Stock Cha...|Business Finance|
|Options Trading 3...|Business Finance|
|The Only Investme...|Business Finance|
|Forex Trading Sec...|Business Finance|
|Trading Options W...|Business Finance|
|Financial Managem...|Business Finance|
|Forex Trading Cou...|Business Finance|
|Python Algo Tradi...|Business Finance|
|Short Selling: Le...|Business Finance|
|Basic Technical A...|Business Finance|
|The Complete Char...|Business Finance|
|7 Deadly Mistakes...|Business Finance|
|Financial Stateme...|Business Finance|
+--------------------+----------------+
only showing top 20 rows



In [None]:

df = labelEncoder.transform(df)
df.show(5)

+--------------------+----------------+-------------+
|        course_title|         subject|subject_title|
+--------------------+----------------+-------------+
|Ultimate Investme...|Business Finance|          1.0|
|Complete GST Cour...|Business Finance|          1.0|
|Financial Modelin...|Business Finance|          1.0|
|Beginner to Pro -...|Business Finance|          1.0|
|How To Maximize Y...|Business Finance|          1.0|
+--------------------+----------------+-------------+
only showing top 5 rows



In [None]:
##   Split Data into Dependent & Independent

# train is 70, test is 30

(traindf, testdf) = df.randomSplit((0.7,0.3),seed=42)


In [None]:
traindf.show()


+--------------------+-------------------+-------------+
|        course_title|            subject|subject_title|
+--------------------+-------------------+-------------+
|#1 Piano Hand Coo...|Musical Instruments|          2.0|
|#10 Hand Coordina...|Musical Instruments|          2.0|
|#4 Piano Hand Coo...|Musical Instruments|          2.0|
|#5  Piano Hand Co...|Musical Instruments|          2.0|
|#6 Piano Hand Coo...|Musical Instruments|          2.0|
|'Geometry Of Chan...|   Business Finance|          1.0|
|1 - Concepts of S...|   Business Finance|          1.0|
|          1 Hour CSS|    Web Development|          0.0|
|         1 Hour HTML|    Web Development|          0.0|
|10 Numbers Every ...|   Business Finance|          1.0|
|10.  Bonds and Bo...|   Business Finance|          1.0|
|101 Awesome Rocka...|Musical Instruments|          2.0|
|15  Motion Graphi...|     Graphic Design|          3.0|
|150 Rock Guitar L...|Musical Instruments|          2.0|
|188% Profit in 1Y...|   Busine

In [56]:
### Estimator

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='vectorizedFeatures',labelCol='subject_title')


In [57]:
## Build the Pipeline

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[tokenizer,stopwords_remover,vectorizer,idf,lr])
pipeline


Pipeline_0ceedbba75fd

In [58]:
pipeline.stages

Param(parent='Pipeline_0ceedbba75fd', name='stages', doc='a list of pipeline stages')

In [59]:
# Building Model

lr_model = pipeline.fit(traindf)
lr_model


PipelineModel_8d8343fb45b7

In [60]:
# Predictions on Test Dataset

predictions = lr_model.transform(testdf)

predictions.show()


+--------------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        course_title|            subject|subject_title|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+-------------------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|#12 Hand Coordina...|Musical Instruments|          2.0|[#12, hand, coord...|[#12, hand, coord...|(3690,[309,505,58...|(3690,[309,505,58...|[-0.7803356051140...|[0.00116065267926...|       2.0|
|#7 Piano Hand Coo...|Musical Instruments|          2.0|[#7, piano, hand,...|[#7, piano, hand,...|(3690,[10,12,56,2...|(3690,[10,12,56,2...|[-10.533693339739...|[7.98862638877710...|       2.0|
|'Greensleeves' Cr...|Musical 

In [62]:
# Select Columns

predictions.columns

predictions.select('rawPrediction','probability','subject','subject_title','prediction').show(10)


+--------------------+--------------------+-------------------+-------------+----------+
|       rawPrediction|         probability|            subject|subject_title|prediction|
+--------------------+--------------------+-------------------+-------------+----------+
|[-0.7803356051140...|[0.00116065267926...|Musical Instruments|          2.0|       2.0|
|[-10.533693339739...|[7.98862638877710...|Musical Instruments|          2.0|       2.0|
|[-5.7345439611759...|[1.57212304722017...|Musical Instruments|          2.0|       2.0|
|[-5.2349493174801...|[1.36914708755194...|   Business Finance|          1.0|       1.0|
|[17.7120642806377...|[0.99999999924164...|    Web Development|          0.0|       0.0|
|[11.2290601883984...|[0.99999365403204...|    Web Development|          0.0|       0.0|
|[-3.0896827267654...|[3.77433635185601...|   Business Finance|          1.0|       1.0|
|[-2.7014052141029...|[1.23310836985434...|Musical Instruments|          2.0|       2.0|
|[-5.9389807712086...

In [64]:
### Model Evaluation

# Accuracy
# Precision
# F1score

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='subject_title',predictionCol='prediction',metricName='accuracy')

accuracy = evaluator.evaluate(predictions)
accuracy


0.9171428571428571

In [65]:
#### Method 2: Precision. F1Score (Classification Report)

from pyspark.mllib.evaluation import MulticlassMetrics

lr_metric = MulticlassMetrics(predictions['subject_title','prediction'].rdd)

print("Accuracy:",lr_metric.accuracy)
print("Precision:",lr_metric.precision(1.0))
print("Recall:",lr_metric.recall(1.0))
print("F1Score:",lr_metric.fMeasure(1.0))




Accuracy: 0.9171428571428571
Precision: 0.9036827195467422
Recall: 0.9327485380116959
F1Score: 0.9179856115107914


In [66]:
##  Confusion Matrix to Evaluate model

y_true = predictions.select('subject_title')
y_true = y_true.toPandas()
y_pred = predictions.select('prediction')
y_pred = y_pred.toPandas()

from sklearn.metrics import confusion_matrix,classification_report
cm = confusion_matrix(y_true,y_pred)
cm


array([[332,   7,   1,   4],
       [ 29, 319,   1,   4],
       [  4,   6, 165,   2],
       [ 15,  10,   4, 147]])

In [67]:
import matplotlib.pyplot as plt
import numpy as np
import itertools

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)



In [77]:

class_names = ['Web Development', 'Business Finance', 'Musical Instruments', 'Graphic Design','N4','N5']



In [73]:
# Classication Report

print(classification_report(y_true,y_pred))


              precision    recall  f1-score   support

         0.0       0.87      0.97      0.92       344
         1.0       0.93      0.90      0.92       353
         2.0       0.96      0.93      0.95       177
         3.0       0.94      0.84      0.88       176

    accuracy                           0.92      1050
   macro avg       0.93      0.91      0.92      1050
weighted avg       0.92      0.92      0.92      1050



In [80]:
class_temp = predictions.select("subject_title").groupBy("subject_title")\
                        .count().sort('count', ascending=False).toPandas()
class_temp = class_temp["subject_title"].values.tolist()
class_names = map(str, class_temp)


In [81]:
# # # print(class_name)
class_names


<map at 0x7f04ecfb3f10>

In [85]:
###  Making Single Prediction

from pyspark.sql.types import StringType

ex1 = spark.createDataFrame([
    ("Building Machine Learning Apps with Python and PySpark",StringType())
],

# Column Name
["course_title"]

)
ex1.show()


+--------------------+---+
|        course_title| _2|
+--------------------+---+
|Building Machine ...| {}|
+--------------------+---+



In [86]:
# Show Full 
ex1.show(truncate=False)


+------------------------------------------------------+---+
|course_title                                          |_2 |
+------------------------------------------------------+---+
|Building Machine Learning Apps with Python and PySpark|{} |
+------------------------------------------------------+---+



In [87]:
# Predict

pred_ex1 = lr_model.transform(ex1)
pred_ex1.show()


+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        course_title| _2|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Building Machine ...| {}|[building, machin...|[building, machin...|(3690,[53,74,85,1...|(3690,[53,74,85,1...|[12.2139940216172...|[0.99999904837362...|       0.0|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [88]:
pred_ex1.columns

['course_title',
 '_2',
 'mytokens',
 'filtered_tokens',
 'rawFeatures',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [89]:


pred_ex1.select('course_title','rawPrediction','probability','prediction').show()

+--------------------+--------------------+--------------------+----------+
|        course_title|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|Building Machine ...|[12.2139940216172...|[0.99999904837362...|       0.0|
+--------------------+--------------------+--------------------+----------+



In [91]:

# Saving Model

model = "models/pyspark_lr_model"
lr_model.save(model)


In [92]:
# Loading pickled model via pipeline api

from pyspark.ml.pipeline import PipelineModel
df_model = PipelineModel.load(model)
