## Text Classification with PySpark 
- Multiclass text classification
- Task: predict the subject category of a course from its title or text

In [1]:
import pyspark
from pyspark import SparkContext

In [2]:
sc = SparkContext(master='local[2]')

22/03/22 16:56:57 WARN Utils: Your hostname, iamhimanshu0 resolves to a loopback address: 127.0.1.1; using 192.168.43.239 instead (on interface wlo1)
22/03/22 16:56:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/22 16:56:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# display the SparkContext (in a notebook this shows a link to the Spark UI)
sc

In [4]:
# create Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Text Classifier").getOrCreate()

In [5]:
# load the dataset
df = spark.read.csv('udemy.csv',header=True, inferSchema=True)
df.show(5)

+---+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+--------------------+----------------+--------------------+
|_c0|course_id|        course_title|                 url|is_paid|price|num_subscribers|num_reviews|num_lectures|             level|content_duration| published_timestamp|         subject|  clean_course_title|
+---+---------+--------------------+--------------------+-------+-----+---------------+-----------+------------+------------------+----------------+--------------------+----------------+--------------------+
|  0|  1070968|Ultimate Investme...|https://www.udemy...|   True|  200|           2147|         23|          51|        All Levels|       1.5 hours|2017-01-18T20:58:58Z|Business Finance|Ultimate Investme...|
|  1|  1113822|Complete GST Cour...|https://www.udemy...|   True|   75|           2792|        923|         274|        All Levels|        39 hours|2017-03-09T16:34:20Z

22/03/22 17:00:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , course_id, course_title, url, is_paid, price, num_subscribers, num_reviews, num_lectures, level, content_duration, published_timestamp, subject, clean_course_title
 Schema: _c0, course_id, course_title, url, is_paid, price, num_subscribers, num_reviews, num_lectures, level, content_duration, published_timestamp, subject, clean_course_title
Expected: _c0 but found: 
CSV file: file:///home/himanshu/Desktop/sparkLearn/getStarted/TextClassification/udemy.csv


In [6]:
df.columns

['_c0',
 'course_id',
 'course_title',
 'url',
 'is_paid',
 'price',
 'num_subscribers',
 'num_reviews',
 'num_lectures',
 'level',
 'content_duration',
 'published_timestamp',
 'subject',
 'clean_course_title']

In [9]:
df = df.select('course_title','subject')

In [10]:
df.show(5)

+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
+--------------------+----------------+
only showing top 5 rows



In [20]:
df.groupby('subject').count().sort("count",ascending=False).show()



+--------------------+-----+
|             subject|count|
+--------------------+-----+
|     Web Development| 1200|
|    Business Finance| 1198|
| Musical Instruments|  676|
|      Graphic Design|  603|
|                null|    6|
|Multiply returns ...|    1|
|play Electric Gui...|    1|
|Learn Play Fernan...|    1|
|Introduction Guit...|    1|
|Learn Classical G...|    1|
|Aprende tocar el ...|    1|
+--------------------+-----+
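The six single-count rows above look like course titles that landed in the `subject` column, which suggests rows the CSV reader split incorrectly (an assumption; the raw file is not shown here). A minimal plain-Python sketch, using the counts printed above and a hypothetical threshold `min_count=100`, of how the genuine categories could be separated from the strays:

```python
# Counts copied from the groupby output above (truncated names kept as displayed).
subject_counts = {
    "Web Development": 1200,
    "Business Finance": 1198,
    "Musical Instruments": 676,
    "Graphic Design": 603,
    None: 6,
    "Multiply returns ...": 1,
    "play Electric Gui...": 1,
    "Learn Play Fernan...": 1,
    "Introduction Guit...": 1,
    "Learn Classical G...": 1,
    "Aprende tocar el ...": 1,
}


def valid_subjects(counts, min_count=100):
    """Keep non-null subjects that occur at least min_count times (threshold is an assumption)."""
    return sorted(s for s, n in counts.items() if s is not None and n >= min_count)


valid = valid_subjects(subject_counts)
print(valid)
# ['Business Finance', 'Graphic Design', 'Musical Instruments', 'Web Development']
```

In Spark, a list like this could be applied with `df.filter(df['subject'].isin(valid))` so that only the four real categories reach the label indexer.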



In [22]:
# getting values count using pandas
# df.toPandas()['subject'].value_counts()

In [23]:
# check for missing values
# (counted in Spark; df.toPandas() would first copy the whole dataset to the driver)
df.filter(df['subject'].isNull()).count()

6

In [24]:
# drop missing values
df = df.dropna(subset= ['subject'])

In [25]:
# check again for missing values
# (counted in Spark; df.toPandas() would first copy the whole dataset to the driver)
df.filter(df['subject'].isNull()).count()

0

In [26]:
df.show(5)

+--------------------+----------------+
|        course_title|         subject|
+--------------------+----------------+
|Ultimate Investme...|Business Finance|
|Complete GST Cour...|Business Finance|
|Financial Modelin...|Business Finance|
|Beginner to Pro -...|Business Finance|
|How To Maximize Y...|Business Finance|
+--------------------+----------------+
only showing top 5 rows



### Feature Extraction

Ways to build features from text:
+ CountVectorizer
+ TF-IDF
+ word embeddings
+ HashingTF
+ etc.

A Pipeline has two kinds of stages:
- Transformer
- Estimator

**Transformer** (data to data)

Takes a DataFrame and returns a new DataFrame with added columns (augmented data or features); nothing is learned from the data,
e.g. Tokenizer, StopWordsRemover

**Estimator** (data to model)

Takes a DataFrame, fits it, and produces a model; the fitted model is itself a Transformer that we can use to transform data or predict,
e.g. CountVectorizer, IDF, StringIndexer, LogisticRegression
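The difference between the two stage types can be sketched in plain Python. The class names below are hypothetical and this is not Spark's implementation; it only illustrates the contract: a transformer has `transform`, while an estimator has `fit`, which returns a fitted model that is itself a transformer.

```python
class WhitespaceTokenizer:
    """Transformer: maps data to data, nothing is learned."""

    def transform(self, rows):
        return [row.lower().split() for row in rows]


class VocabularyModel:
    """Fitted result of an estimator; it is itself a transformer."""

    def __init__(self, vocabulary):
        self.vocabulary = vocabulary

    def transform(self, token_rows):
        # Count how often each vocabulary word occurs in each row.
        return [[tokens.count(word) for word in self.vocabulary]
                for tokens in token_rows]


class VocabularyEstimator:
    """Estimator: fit() learns the vocabulary from data and returns a model."""

    def fit(self, token_rows):
        vocabulary = sorted({tok for tokens in token_rows for tok in tokens})
        return VocabularyModel(vocabulary)


titles = ["Learn Python", "Learn Guitar Fast"]
tokens = WhitespaceTokenizer().transform(titles)   # no fit step needed
model = VocabularyEstimator().fit(tokens)          # learning happens here
counts = model.transform(tokens)

print(model.vocabulary)  # ['fast', 'guitar', 'learn', 'python']
print(counts)            # [[0, 0, 1, 1], [1, 1, 1, 0]]
```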

In [29]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [28]:
# dir(pyspark.ml.feature)

In [30]:
# Stages for the pipeline
tokenizer = Tokenizer(inputCol='course_title', outputCol='mytokens')
stopwordRemover = StopWordsRemover(inputCol='mytokens',outputCol='filtered_tokens')
vectorizer = CountVectorizer(inputCol='filtered_tokens',outputCol='rawFeatures')
idf = IDF(inputCol='rawFeatures', outputCol='vectorizedFeatures')
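What these four stages compute can be approximated in plain Python. This is a rough sketch, not Spark's code: the stop-word list is a tiny made-up one, the vocabulary is ordered alphabetically (Spark's `CountVectorizer` orders it by term frequency), and the IDF weight uses the formula given in the Spark ML documentation, `log((m + 1) / (df(t) + 1))`, where `m` is the number of documents.

```python
import math
from collections import Counter

STOP_WORDS = {"to", "the", "and", "with", "for", "a"}  # tiny illustrative list


def tokenize(text):
    # Lowercase and split on whitespace.
    return text.lower().split()


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def tf_idf(documents):
    docs = [remove_stop_words(tokenize(d)) for d in documents]
    vocabulary = sorted({t for doc in docs for t in doc})
    m = len(docs)
    # Document frequency: in how many documents does each term appear?
    doc_freq = Counter(t for doc in docs for t in set(doc))
    idf = {t: math.log((m + 1) / (doc_freq[t] + 1)) for t in vocabulary}
    vectors = []
    for doc in docs:
        term_counts = Counter(doc)  # the "rawFeatures" step
        vectors.append([term_counts[t] * idf[t] for t in vocabulary])
    return vocabulary, vectors


titles = ["Learn to Play the Guitar", "Learn Web Development with Python"]
vocabulary, vectors = tf_idf(titles)
for term, weight in zip(vocabulary, vectors[0]):
    print(f"{term:12s} {weight:.4f}")
```

A word such as `learn` that occurs in every title gets weight 0, while words unique to one title keep a positive weight, which is why TF-IDF helps separate the subjects.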

In [31]:
# work on target variable (subject)
# label encoding/indexing
labelEncoder = StringIndexer(inputCol='subject',outputCol='label').fit(df)

In [32]:
labelEncoder.transform(df).show(5)

+--------------------+----------------+-----+
|        course_title|         subject|label|
+--------------------+----------------+-----+
|Ultimate Investme...|Business Finance|  1.0|
|Complete GST Cour...|Business Finance|  1.0|
|Financial Modelin...|Business Finance|  1.0|
|Beginner to Pro -...|Business Finance|  1.0|
|How To Maximize Y...|Business Finance|  1.0|
+--------------------+----------------+-----+
only showing top 5 rows



In [34]:
# labelEncoder.labels
# dict mapping the four main subjects to their label index
label_dict =  {
 'Web Development':0.0,
 'Business Finance':1.0,
 'Musical Instruments':2.0,
 'Graphic Design':3.0
}
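The indices in this dict follow from how `StringIndexer` assigns labels by default: the most frequent value gets 0.0, the next 1.0, and so on. A plain-Python sketch of that ordering, using the subject counts printed earlier (the alphabetical tie-break is an assumption of this sketch):

```python
from collections import Counter


def index_labels(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on."""
    counts = Counter(labels)
    # Ties are broken alphabetically here; that detail is an assumption.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


labels = (["Web Development"] * 1200 + ["Business Finance"] * 1198
          + ["Musical Instruments"] * 676 + ["Graphic Design"] * 603)
mapping = index_labels(labels)
print(mapping)
# {'Web Development': 0.0, 'Business Finance': 1.0, 'Musical Instruments': 2.0, 'Graphic Design': 3.0}
```

Rather than typing the dict by hand, it could also be built from the fitted indexer, for example `{name: float(i) for i, name in enumerate(labelEncoder.labels)}`.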

In [36]:
df = labelEncoder.transform(df)
df.show(5)

+--------------------+----------------+-----+
|        course_title|         subject|label|
+--------------------+----------------+-----+
|Ultimate Investme...|Business Finance|  1.0|
|Complete GST Cour...|Business Finance|  1.0|
|Financial Modelin...|Business Finance|  1.0|
|Beginner to Pro -...|Business Finance|  1.0|
|How To Maximize Y...|Business Finance|  1.0|
+--------------------+----------------+-----+
only showing top 5 rows



In [37]:
# split dataset
(train_df, test_df) = df.randomSplit((0.7,0.3),seed=42)
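`randomSplit` assigns each row to a split at random, so the 70/30 proportions are approximate rather than exact, and the seed makes the assignment repeatable. A simplified plain-Python sketch of the idea (not Spark's implementation, which samples within each partition):

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number in [0, 1)."""
    total = sum(weights)
    # Cumulative upper bounds of each split's share of [0, 1).
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point round-off
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, (0.7, 0.3), seed=42)
print(len(train), len(test))  # close to 700 and 300, but not exactly
```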

In [38]:
train_df.show(2)

+--------------------+-------------------+-----+
|        course_title|            subject|label|
+--------------------+-------------------+-----+
|#1 Piano Hand Coo...|Musical Instruments|  2.0|
|#10 Hand Coordina...|Musical Instruments|  2.0|
+--------------------+-------------------+-----+
only showing top 2 rows



In [39]:
# machine learning model (Estimator) (data to model)
from pyspark.ml.classification import LogisticRegression

In [40]:
lr = LogisticRegression(featuresCol='vectorizedFeatures',
                        labelCol = 'label'
                        )

### Building the pipeline


In [41]:
from pyspark.ml import Pipeline

In [42]:
pipeline = Pipeline(
    stages=[tokenizer, stopwordRemover, vectorizer, idf, lr]
)

In [43]:
# note: this attribute is the Param descriptor; pipeline.getStages() returns the stage objects
pipeline.stages

Param(parent='Pipeline_7911e419495d', name='stages', doc='a list of pipeline stages')

In [44]:
# fit the whole pipeline on the training data
lr_model = pipeline.fit(train_df)

22/03/22 17:36:47 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/03/22 17:36:47 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [45]:
lr_model

PipelineModel_e1cdf1893748

In [46]:
# get predictions on test data
predictions = lr_model.transform(test_df)

In [49]:
# predictions.show()
predictions.columns

['course_title',
 'subject',
 'label',
 'mytokens',
 'filtered_tokens',
 'rawFeatures',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [51]:
predictions.select('rawPrediction', 'probability','subject','label','prediction').show(10)

+--------------------+--------------------+-------------------+-----+----------+
|       rawPrediction|         probability|            subject|label|prediction|
+--------------------+--------------------+-------------------+-----+----------+
|[8.30964874634511...|[0.87877993991729...|Musical Instruments|  2.0|       0.0|
|[-1.3744065857781...|[1.90975343878318...|Musical Instruments|  2.0|       2.0|
|[0.60822716351824...|[3.28451283099288...|Musical Instruments|  2.0|       2.0|
|[-1.0584564885297...|[3.70732079181542...|   Business Finance|  1.0|       1.0|
|[24.6296077836821...|[0.99999999906211...|    Web Development|  0.0|       0.0|
|[22.0136686708729...|[0.99999999049941...|    Web Development|  0.0|       0.0|
|[19.9225858177008...|[0.99999995276066...|    Web Development|  0.0|       0.0|
|[-5.7386799100009...|[5.78822181193782...|Musical Instruments|  2.0|       2.0|
|[-19.060576929776...|[1.71813778453453...|     Graphic Design|  3.0|       3.0|
|[-2.4736166619785...|[1.845
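For a multiclass logistic regression, the `probability` column is, to my understanding, the softmax of the `rawPrediction` scores, and `prediction` is the index of the largest probability. A small sketch with hypothetical scores (the numbers are made up, not taken from the output above):

```python
import math


def softmax(raw_scores):
    """Turn raw class scores into probabilities that sum to 1."""
    peak = max(raw_scores)  # subtract the maximum for numerical stability
    exps = [math.exp(s - peak) for s in raw_scores]
    total = sum(exps)
    return [e / total for e in exps]


raw_prediction = [2.0, 1.0, 0.1, -1.0]  # hypothetical scores, one per class
probability = softmax(raw_prediction)
prediction = float(probability.index(max(probability)))

print([round(p, 4) for p in probability])
print(prediction)  # 0.0, the class with the highest score
```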

### Model evaluation
+ Accuracy
+ Precision
+ F1 score
+ etc.

In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [53]:
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='label')

In [55]:
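# metricName defaults to 'f1', so this is the weighted F1 score;
# pass metricName='accuracy' to the evaluator to get accuracy instead
f1 = evaluator.evaluate(predictions)
f1*100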

91.70354557551958

In [56]:
"""
Method 2:
 precision, f1score classification report
"""
from pyspark.mllib.evaluation import MulticlassMetrics

In [57]:
# MulticlassMetrics expects (prediction, label) pairs, in that order
lr_metric = MulticlassMetrics(predictions.select('prediction', 'label').rdd)

In [62]:
print("Accuracy ", lr_metric.accuracy)
print("precision ", lr_metric.precision(1.0))
print("f1Score ", lr_metric.fMeasure(1.0))
print("recall ", lr_metric.recall(1.0))

Accuracy  0.9182509505703422
precision  0.8839050131926122
f1Score  0.9178082191780822
recall  0.9544159544159544


#### Confusion matrix
- convert to pandas
- sklearn

In [64]:
y_true = predictions.select('label')
y_true = y_true.toPandas()

y_predict = predictions.select('prediction')
y_predict = y_predict.toPandas()

In [66]:
from sklearn.metrics import confusion_matrix, classification_report

In [67]:
cm = confusion_matrix(y_true, y_predict)
cm

array([[319,  12,   1,   4,   0,   0],
       [ 11, 335,   3,   2,   0,   0],
       [  8,  13, 156,   1,   0,   0],
       [  8,  17,   4, 156,   0,   0],
       [  0,   1,   0,   0,   0,   0],
       [  0,   1,   0,   0,   0,   0]])
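The metrics can be recomputed by hand from this matrix, which is a useful cross-check. In scikit-learn's convention rows are true labels and columns are predicted labels; the last two rows are single test rows carrying stray subject values. A plain-Python sketch (the helper name is hypothetical):

```python
# Confusion matrix copied from the output above: rows = true, columns = predicted.
cm = [
    [319, 12, 1, 4, 0, 0],
    [11, 335, 3, 2, 0, 0],
    [8, 13, 156, 1, 0, 0],
    [8, 17, 4, 156, 0, 0],
    [0, 1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 0],
]


def per_class_metrics(matrix, k):
    """Return (precision, recall, f1) for class index k."""
    tp = matrix[k][k]
    actual = sum(matrix[k])                    # row sum: true members of class k
    predicted = sum(row[k] for row in matrix)  # column sum: rows predicted as k
    precision = tp / predicted if predicted else 0.0
    recall = tp / actual if actual else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1


total = sum(sum(row) for row in cm)
accuracy = sum(cm[k][k] for k in range(len(cm))) / total
precision_1, recall_1, f1_1 = per_class_metrics(cm, 1)
# F1 of each class weighted by its number of true instances.
weighted_f1 = sum(sum(cm[k]) * per_class_metrics(cm, k)[2]
                  for k in range(len(cm))) / total

print(f"accuracy    {accuracy:.4f}")     # 0.9183
print(f"precision_1 {precision_1:.4f}")  # 0.8839
print(f"recall_1    {recall_1:.4f}")     # 0.9544
print(f"f1_1        {f1_1:.4f}")         # 0.9178
print(f"weighted_f1 {weighted_f1:.4f}")  # 0.9170
```

The weighted F1 of about 0.917 is the value `MulticlassClassificationEvaluator` returned earlier with its default settings, while accuracy is about 0.918.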

#### Making a prediction on one sample
+ build the sample as a DataFrame
+ apply the fitted pipeline

In [69]:
from pyspark.sql.types import StringType

In [72]:
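# note: the StringType() below is treated as a second value in the row, not as a
# column type, which is why a stray `_2` column appears in the output;
# a one-element tuple ("...",) would give a DataFrame with only course_title
exl = spark.createDataFrame([ 
    ("Building Machine Learning Apps with Python and PySpark", StringType())
], 
# column name
['course_title']
)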
exl.show()

+--------------------+---+
|        course_title| _2|
+--------------------+---+
|Building Machine ...| {}|
+--------------------+---+



In [73]:
# show full column contents
exl.show(truncate=False)

+------------------------------------------------------+---+
|course_title                                          |_2 |
+------------------------------------------------------+---+
|Building Machine Learning Apps with Python and PySpark|{} |
+------------------------------------------------------+---+



In [75]:
# making prediction
prediction_ex1 = lr_model.transform(exl)
prediction_ex1.show(truncate=True)

+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        course_title| _2|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Building Machine ...| {}|[building, machin...|[building, machin...|(3669,[57,79,115,...|(3669,[57,79,115,...|[14.7174498131555...|[0.99999814636182...|       0.0|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [76]:
prediction_ex1.columns

['course_title',
 '_2',
 'mytokens',
 'filtered_tokens',
 'rawFeatures',
 'vectorizedFeatures',
 'rawPrediction',
 'probability',
 'prediction']

In [78]:
prediction_ex1.select('course_title','rawPrediction','probability','prediction').show()

+--------------------+--------------------+--------------------+----------+
|        course_title|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|Building Machine ...|[14.7174498131555...|[0.99999814636182...|       0.0|
+--------------------+--------------------+--------------------+----------+



In [79]:
label_dict

{'Web Development': 0.0,
 'Business Finance': 1.0,
 'Musical Instruments': 2.0,
 'Graphic Design': 3.0}
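To turn the numeric prediction back into a subject name, the dict can be inverted. A minimal sketch (the `subject_for` helper and the `"unknown"` fallback are hypothetical):

```python
label_dict = {
    'Web Development': 0.0,
    'Business Finance': 1.0,
    'Musical Instruments': 2.0,
    'Graphic Design': 3.0,
}

# Invert the mapping: label index -> subject name.
index_to_subject = {index: subject for subject, index in label_dict.items()}


def subject_for(prediction):
    """Look up the subject for a predicted label index."""
    return index_to_subject.get(prediction, "unknown")


print(subject_for(0.0))  # Web Development
```

So the prediction of 0.0 for the sample title corresponds to Web Development.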

In [80]:
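# save the fitted pipeline model
# (save() fails if the path already exists; lr_model.write().overwrite().save(modelPath) replaces it)
modelPath  = "models/pyspark_lr_model"
lr_model.write().save(modelPath)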

In [81]:
# load the saved pipeline model (Spark ML persistence, not pickle)
from pyspark.ml.pipeline import PipelineModel

persistedModel = PipelineModel.load(modelPath)

In [82]:
# make a prediction with the loaded model
loadModel = persistedModel.transform(exl)
loadModel.show(truncate=True)

+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|        course_title| _2|            mytokens|     filtered_tokens|         rawFeatures|  vectorizedFeatures|       rawPrediction|         probability|prediction|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Building Machine ...| {}|[building, machin...|[building, machin...|(3669,[57,79,115,...|(3669,[57,79,115,...|[14.7174498131555...|[0.99999814636182...|       0.0|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+



In [83]:
loadModel.select('course_title','rawPrediction','probability','prediction').show()

+--------------------+--------------------+--------------------+----------+
|        course_title|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|Building Machine ...|[14.7174498131555...|[0.99999814636182...|       0.0|
+--------------------+--------------------+--------------------+----------+

