# Multi-Class Text Classification with PySpark and Doc2Vec

In this notebook, we use Apache Spark's machine learning library (MLlib) with PySpark to tackle an NLP problem, multi-class text classification, and show how to approximate Doc2Vec inside a Spark environment.

Apache Spark is a popular distributed computing system for scaling up data processing. It also provides a machine learning library called 'MLlib'.

Before going further, we need to know what ‘Doc2Vec’ is. It is an NLP model that represents a text or document as a vector of numerical features that any ML algorithm can use; in other words, it is a feature engineering technique. It captures the context of documents by sampling words from them and training a neural network on those samples. The hidden-layer vectors of the trained network become the document vectors, hence the name ‘Doc2Vec’. A related technique, ‘Word2Vec’, works on similar principles, but it operates on a word corpus and produces vectors for individual words instead of documents.

Reference: https://medium.com/wisio/a-gentle-introduction-to-doc2vec-db3e8c0cce5e

## 1. Set-up PySpark

In [None]:
import os
import pandas as pd

In [None]:
# We use the findspark library to locate Spark on the local machine
import findspark
findspark.init()
# findspark.init('C:/Users/bokhy/spark/spark-2.4.6-bin-hadoop2.7')

In [3]:
# PySpark local setup
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('treecode').getOrCreate()

# OR

# State the maximum memory you want Spark to use on your machine
# (use this block instead of the line above; memory settings only take effect if no Spark session is running yet).

# MAX_MEMORY = "5g"
# spark = SparkSession.builder \
#                     .appName('multi_class_text_classifier')\
#                     .config("spark.executor.memory", MAX_MEMORY) \
#                     .config("spark.driver.memory", MAX_MEMORY) \
#                     .getOrCreate()

In [4]:
spark

## 2. Load Data

We will use the ‘Sentence Classification Set’ from the UCI Machine Learning Repository. The files we use contain 3297 lines in total (labeled sentences plus section-header lines), spread across several files.
The data can be downloaded from this [Link](https://archive.ics.uci.edu/ml/datasets/Sentence+Classification).
Once downloaded, the zip file contains three folders: labeled articles, unlabeled articles, and word lists.
In this case we only use the 'labeled articles' folder.

In [5]:
from pyspark.sql.types import StructType, StructField, StringType

# Define the schema of the single text column
schema = StructType([
    StructField("value", StringType(), True)
])

total_df = spark.createDataFrame([], schema)

# Folder holding the labeled article files
data_dir = "C:/Users/user/Documents/Python Programming/Py2/sentence+classification/SentenceCorpus/labeled_articles"

# Read each file as plain text (one row per line, column 'value') and append it to one DataFrame
for file_name in os.listdir(data_dir):
    df = spark.read.text(os.path.join(data_dir, file_name))
    total_df = total_df.union(df)

In [6]:
total_df.show()

+--------------------+
|               value|
+--------------------+
|    ### abstract ###|
|MISC\tThe Minimum...|
|MISC\tIf the unde...|
|MISC\tFor MDL, in...|
|AIMX\tWe show tha...|
|OWNX\tWe derive a...|
|OWNX\tThis implie...|
|OWNX\tWe discuss ...|
|### introduction ###|
|MISC\t``Bayes mix...|
|CONT\tIn many cas...|
|MISC\tThe MDL or ...|
|MISC\tIn practice...|
|MISC\tHow good ar...|
|MISC\tThis questi...|
|MISC\tIn many cas...|
|MISC\tIn particul...|
|MISC\tAssume that...|
|MISC\tThen for Ba...|
|MISC\tThis corres...|
+--------------------+
only showing top 20 rows



In [10]:
# Let's view the first 15 rows in full
total_df.take(15)

[Row(value='### abstract ###'),
 Row(value='MISC\tThe Minimum Description Length principle for online sequence estimation/prediction in a proper learning setup is studied'),
 Row(value='MISC\tIf the underlying model class is discrete, then the total expected square loss is a particularly interesting performance measure: (a) this quantity is finitely bounded, implying convergence with probability one, and (b) it additionally specifies the convergence speed'),
 Row(value='MISC\tFor MDL, in general one can only have loss bounds which are finite but exponentially larger than those for Bayes mixtures'),
 Row(value='AIMX\tWe show that this is even the case if the model class contains only Bernoulli distributions'),
 Row(value='OWNX\tWe derive a new upper bound on the prediction error for countable Bernoulli classes'),
 Row(value='OWNX\tThis implies a small bound (comparable to the one for Bayes mixtures) for certain important model classes'),
 Row(value='OWNX\tWe discuss the application to M

In [7]:
# Total number of records
total_df.count()

3297

## 3. Data Preprocessing

The dataset contains unhelpful lines or characters, such as the section headers ‘### abstract ###’ and ‘### introduction ###’, or stray quote characters.
It is also not yet divided into separate ‘label’ and ‘content’ columns, which is the usual layout for classification problems. So it has to be cleaned and split into ‘label’ and ‘content’ columns before we can use it for training.
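Both steps (dropping the section-header lines and separating the label from the sentence) can be tried out in plain Python before moving to Spark. This is a minimal sketch: the sample lines are copied from the output above, and the name `split_line` is hypothetical. It mirrors the idea of the Spark helper defined next, but works on plain strings.

```python
import re

# Sample raw lines copied from the labeled-article output above
raw_lines = [
    "### abstract ###",
    "MISC\tThe Minimum Description Length principle for online sequence estimation/prediction in a proper learning setup is studied",
    "AIMX\tWe show that this is even the case if the model class contains only Bernoulli distributions",
    "### introduction ###",
    "OWNX\tWe derive a new upper bound on the prediction error for countable Bernoulli classes",
]

HEADER_LINES = {"### abstract ###", "### introduction ###"}

def split_line(line):
    """Split a 'LABEL<TAB>sentence' line into (label, sentence) at the first whitespace run."""
    label, text = re.split(r"\s+", line, maxsplit=1)
    return label, text

# Drop header lines, then split the remaining ones into (label, sentence) pairs
records = [split_line(line) for line in raw_lines if line not in HEADER_LINES]
for label, text in records:
    print(label, "|", text[:40])
```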

In [11]:
import re

# Helper that splits each line only once, between the first and second words, so that
# the first word (the label) goes into one column and the rest of the sentence into another.
# E.g. 'MISC\tThe Minimum ...' becomes ['MISC', 'The Minimum ...']
def process_line(x):
    line = x['value']
    parts = re.split(r"\s+", line, maxsplit=1)   # split once, on the first run of whitespace
    sub_parts = re.split('--', parts[0])         # handle first tokens of the form 'LABEL--word'
    if len(sub_parts) > 1:
        # keep the text after '--' as the start of the sentence
        parts_1 = sub_parts[1] + ' ' + parts[1]
    else:
        parts_1 = parts[1]
    return [sub_parts[0], parts_1]

Spark also offers a lower-level data abstraction called the RDD (Resilient Distributed Dataset).  
The RDD has been Spark's primary user-facing API since its inception. At its core, an RDD is an immutable, distributed collection of data elements, partitioned across the nodes of the cluster, that can be operated on in parallel through a low-level API of transformations and actions.   

We therefore convert the DataFrame to an RDD, drop the header lines, and apply the helper function above to every row.

In [12]:
# Drop the '### introduction ###' and '### abstract ###' header lines, then split every remaining line into [label, text]
input_rdd = total_df.rdd.filter(lambda x: x['value'] not in ['### introduction ###', '### abstract ###']).map(process_line)

Let's structure the data so that column '_1' holds the label and column '_2' holds the sentence text. This dataset can now be used for the actual classification problem.

In [16]:
input_df = input_rdd.toDF()

input_df.show()

+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|MISC|The Minimum Descr...|
|MISC|If the underlying...|
|MISC|For MDL, in gener...|
|AIMX|We show that this...|
|OWNX|We derive a new u...|
|OWNX|This implies a sm...|
|OWNX|We discuss the ap...|
|MISC|``Bayes mixture",...|
|CONT|In many cases how...|
|MISC|The MDL or MAP (m...|
|MISC|In practice, the ...|
|MISC|How good are the ...|
|MISC|This question has...|
|MISC|In many cases, an...|
|MISC|In particular the...|
|MISC|Assume that the o...|
|MISC|Then for Bayes mi...|
|MISC|This corresponds ...|
|MISC|For the MDL predi...|
|MISC|Note that in orde...|
+----+--------------------+
only showing top 20 rows



In [17]:
input_df.take(15)

[Row(_1='MISC', _2='The Minimum Description Length principle for online sequence estimation/prediction in a proper learning setup is studied'),
 Row(_1='MISC', _2='If the underlying model class is discrete, then the total expected square loss is a particularly interesting performance measure: (a) this quantity is finitely bounded, implying convergence with probability one, and (b) it additionally specifies the convergence speed'),
 Row(_1='MISC', _2='For MDL, in general one can only have loss bounds which are finite but exponentially larger than those for Bayes mixtures'),
 Row(_1='AIMX', _2='We show that this is even the case if the model class contains only Bernoulli distributions'),
 Row(_1='OWNX', _2='We derive a new upper bound on the prediction error for countable Bernoulli classes'),
 Row(_1='OWNX', _2='This implies a small bound (comparable to the one for Bayes mixtures) for certain important model classes'),
 Row(_1='OWNX', _2='We discuss the application to Machine Learning ta

In [19]:
# Count how often each unique label appears in column '_1'
input_df.groupBy('_1').count().show()

+----+-----+
|  _1|count|
+----+-----+
|OWNX|  867|
|CONT|  170|
|MISC| 1825|
|AIMX|  194|
|BASE|   61|
+----+-----+
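The classes are clearly imbalanced: MISC alone accounts for more than half of the labeled sentences. A quick calculation from the counts above gives the accuracy of a trivial classifier that always predicts MISC (about 58.5% on the full dataset), a useful floor when reading the accuracies reported in Section 6; the random test split will give a slightly different figure. The counts are copied from the output above and the variable names are illustrative.

```python
# Label counts copied from the groupBy('_1').count() output above
label_counts = {"OWNX": 867, "CONT": 170, "MISC": 1825, "AIMX": 194, "BASE": 61}

total = sum(label_counts.values())                        # labeled sentences after dropping headers
majority_label = max(label_counts, key=label_counts.get)  # most frequent class
baseline_accuracy = label_counts[majority_label] / total  # accuracy of always predicting it

print(total, majority_label, f"{baseline_accuracy:.1%}")
for label, count in sorted(label_counts.items(), key=lambda kv: -kv[1]):
    print(f"{label}: {count / total:.1%}")
```

The total, 3117, is 180 lower than the 3297 raw lines counted earlier; the difference is the section-header lines removed by the filter.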



## 4. Basic Text Cleaning
Before jumping into ‘Doc2Vec’ processing, basic text cleaning is necessary. A typical text-cleaning pass involves the following steps:
1. Conversion to lowercase
2. Removal of punctuation
3. Removal of integers and numbers
4. Removal of extra spaces
5. Removal of tags (such as HTML tags like `<p>`)
6. Removal of stop words (such as ‘and’, ‘to’, ‘the’)
7. Stemming (conversion of words to their root form)
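To make steps 1 to 6 concrete, here is a minimal standard-library sketch. It is an illustration only, not what Gensim does internally: the stop-word list is a tiny hypothetical sample, and stemming (step 7) is left out because it needs a real stemmer, such as the Porter stemmer behind Gensim's `stem_text`.

```python
import re

# Tiny, illustrative stop-word list (Gensim ships a much larger one)
STOP_WORDS = {"the", "a", "an", "and", "to", "of", "in", "for", "is", "on"}

def basic_clean(text):
    text = text.lower()                          # 1. lowercase
    text = re.sub(r"<[^>]+>", " ", text)         # 5. strip tags first, so '<' and '>' are not treated as punctuation
    text = re.sub(r"[^\w\s]", " ", text)         # 2. strip punctuation
    text = re.sub(r"\d+", " ", text)             # 3. strip numbers
    text = re.sub(r"\s+", " ", text).strip()     # 4. collapse extra spaces
    return " ".join(w for w in text.split() if w not in STOP_WORDS)  # 6. drop stop words

print(basic_clean("<p>The MDL estimator, in 2 cases, is studied!</p>"))
```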

#### Gensim provides ready-made functions for text cleaning


In [21]:
import gensim.parsing.preprocessing as gsp
from gensim import utils

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


In [22]:
# Gensim preprocessing filters, applied in this order
filters = [
           gsp.strip_tags, 
           gsp.strip_punctuation,
           gsp.strip_multiple_whitespaces,
           gsp.strip_numeric,
           gsp.remove_stopwords, 
           gsp.strip_short, 
           gsp.stem_text
          ]

# Helper function: lowercase the text part of a (label, text) pair and run it through the filters above
def clean_text(x):
    s = x[1]
    s = s.lower()
    s = utils.to_unicode(s)
    for f in filters:
        s = f(s)
    return (x[0],s)

##### Let's compare a sentence before and after cleaning

In [26]:
# BEFORE Text Cleaning
input_df.take(1)[0][1]

'The Minimum Description Length principle for online sequence estimation/prediction in a proper learning setup is studied'

In [30]:
# AFTER Text Cleaning
clean_text(input_df.take(1)[0])[1]

'minimum descript length principl onlin sequenc estim predict proper learn setup studi'

##### Comparing BEFORE and AFTER, the ‘cleaned’ sentence is no longer grammatically correct, but it still carries the context, which is what matters for ‘Doc2Vec’ processing

### Using the Helper Function to Clean all texts

In [32]:
cleaned_rdd = input_rdd.map(lambda x : clean_text(x))

cleaned_df = cleaned_rdd.toDF()

In [33]:
cleaned_df.show()

+----+--------------------+
|  _1|                  _2|
+----+--------------------+
|MISC|minimum descript ...|
|MISC|underli model cla...|
|MISC|mdl gener loss bo...|
|AIMX|case model class ...|
|OWNX|deriv new upper b...|
|OWNX|impli small bound...|
|OWNX|discuss applic ma...|
|MISC|bay mixtur solomo...|
|CONT|case bay mixtur c...|
|MISC|mdl map maximum p...|
|MISC|practic mdl estim...|
|MISC|good predict bay ...|
|MISC|question attract ...|
|MISC|case import quali...|
|MISC|particular squar ...|
|MISC|assum outcom spac...|
|MISC|bay mixtur predic...|
|MISC|correspond instan...|
|MISC|mdl predictor los...|
|MISC|note order mdl co...|
+----+--------------------+
only showing top 20 rows



## 5. Create a Model (ML Pipeline)

At the time of writing, Apache Spark does not provide an API for ‘Doc2Vec’. It does, however, provide a ‘Word2Vec’ estimator, which is based on the ‘Skip-Gram’ approach.
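Skip-gram training learns word vectors by predicting the words around each target word. The sketch below only generates the (target, context) pairs that such training is built on, for a window of 1 word on each side; it is not Spark's implementation, which goes on to train a shallow neural network on pairs like these. In Spark the corresponding setting is the `windowSize` parameter of `Word2Vec` (default 5); the function name `skipgram_pairs` is hypothetical.

```python
def skipgram_pairs(tokens, window=1):
    """Return (target, context) pairs for every word within `window` positions of the target."""
    pairs = []
    for i, target in enumerate(tokens):
        lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
        for j in range(lo, hi):
            if j != i:
                pairs.append((target, tokens[j]))
    return pairs

# Tokens taken from the cleaned sentence shown in Section 4
print(skipgram_pairs(["minimum", "descript", "length", "principl"], window=1))
```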

Suppose, for our use case, that a sentence has 5 words and that ‘Word2Vec’ converts each word into a feature vector of size 100. A ‘Doc2Vec’ representation of the sentence can then be taken as the element-wise average of these five 100-length vectors, so it also has length 100. This is a simplified ‘average-out’ scheme of the ‘Doc2Vec’ model, and it is exactly what Spark's fitted `Word2VecModel` does when it transforms a document: it averages the vectors of all words in that document. We use this averaged ‘Word2Vec’ output of Apache Spark as our ‘Doc2Vec’ model.
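The ‘average-out’ scheme is just an element-wise mean of the word vectors. A minimal sketch with made-up 3-dimensional vectors (real ones have `vectorSize` dimensions) is below; in the pipeline Spark performs this averaging itself, so it never needs to be coded by hand. The vector values and the name `average_vector` are hypothetical.

```python
# Made-up 3-dimensional word vectors (hypothetical values for illustration)
word_vectors = {
    "minimum":  [0.2, 0.4, -0.1],
    "descript": [0.0, 0.1,  0.3],
    "length":   [0.4, -0.2, 0.1],
}

def average_vector(tokens, vectors):
    """Element-wise mean of the vectors of the given tokens."""
    dims = len(next(iter(vectors.values())))
    if not tokens:
        return [0.0] * dims                  # avoid division by zero for an empty document
    total = [0.0] * dims
    for tok in tokens:
        for k, value in enumerate(vectors[tok]):
            total[k] += value
    return [x / len(tokens) for x in total]

doc_vector = average_vector(["minimum", "descript", "length"], word_vectors)
print(doc_vector)  # one vector with the same length as each word vector
```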

Our machine learning pipeline consists of two stages:

- A Tokenizer
- A ‘Word2Vec’ model

We use the Apache Spark Pipeline API for this.

In [34]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

In [35]:
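tokenizer = Tokenizer(inputCol="_2", outputCol="tokens")     # lowercase each sentence and split it on whitespace

w2v = Word2Vec(vectorSize=300,        # length of each word (and averaged document) vector
               minCount=0,            # keep every token in the vocabulary, however rare
               inputCol="tokens",
               outputCol="features")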

In [36]:
doc2vec_pipeline = Pipeline(stages=[tokenizer,w2v])

doc2vec_model = doc2vec_pipeline.fit(cleaned_df)

doc2vecs_df = doc2vec_model.transform(cleaned_df)

In [None]:
# Doc2Vec contents
doc2vecs_df.show()

##### The 'features' column holds the actual ‘Doc2Vec’ dense vectors, here of size 300. Sizes between 100 and 300 are common choices.

## 6. Model Training and Evaluation

In [None]:
# Train/Test Split
w2v_train_df, w2v_test_df = doc2vecs_df.randomSplit([0.75, 0.25], seed = 623)

print("Training Dataset Count: " + str(w2v_train_df.count()))
print("Test Dataset Count: " + str(w2v_test_df.count()))

#### Each model is trained on the training set, makes predictions on the test set, and is scored by its accuracy there
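Accuracy, the metric selected below with `metricName="accuracy"`, is simply the fraction of test rows whose predicted label equals the true label. A plain-Python sketch with made-up label and prediction values (the function name `accuracy` is hypothetical):

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction matches the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical indexed labels and predictions for five test rows (4 of 5 match)
print(accuracy([0.0, 1.0, 0.0, 2.0, 0.0], [0.0, 1.0, 1.0, 2.0, 0.0]))
```

With imbalanced classes, accuracy alone can flatter a model that mostly predicts the majority class; `MulticlassClassificationEvaluator` also supports other metrics such as `f1` and `weightedPrecision`.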

##### 1. Random-Forest Model

Spark MLlib classifiers need numeric labels, so our string class labels (column '_1') have to be converted into numeric values. PySpark's `StringIndexer` does that for us.

Here too, we build a pipeline with the following stages:
- StringIndexer (input = '_1', output = 'label')
- RandomForest Classifier (label column = 'label', features column = 'features'; the 'features' column comes from the ‘Doc2Vec’ transformation)
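By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent label gets index 0.0. The sketch below reproduces that ordering in plain Python using the full-dataset counts from Section 3. Inside the pipeline the indexer is fit on the training split only, so its counts differ slightly, although with gaps this large the order is likely the same. Tie-breaking by label name is an assumption of this sketch (no ties occur here).

```python
# Label counts copied from Section 3 (full dataset, not the training split)
label_counts = {"OWNX": 867, "CONT": 170, "MISC": 1825, "AIMX": 194, "BASE": 61}

# Most frequent label first; ties (none here) broken alphabetically in this sketch
ordered = sorted(label_counts, key=lambda lbl: (-label_counts[lbl], lbl))
label_to_index = {lbl: float(i) for i, lbl in enumerate(ordered)}
print(label_to_index)
```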

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

si = StringIndexer(inputCol="_1", outputCol="label")
rf_classifier = RandomForestClassifier(labelCol="label", 
                                       featuresCol="features")

# Build Pipeline
rf_classifier_pipeline = Pipeline(stages=[si,rf_classifier])

# Start Training
rfModel = rf_classifier_pipeline.fit(w2v_train_df)

# Prediction on Test-set
rf_predictions = rfModel.transform(w2v_test_df)

# Evaluate the model
rf_model_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
accuracy = rf_model_evaluator.evaluate(rf_predictions)
print("Accuracy = %g" % (accuracy))

##### 2. Logistic-Regression model

In [None]:
from pyspark.ml.classification import LogisticRegression

lr_classifier = LogisticRegression(family="multinomial", 
                                   maxIter=20, 
                                   regParam=0.3, 
                                   elasticNetParam=0)

# Build Pipeline
lr_classifier_pipeline = Pipeline(stages=[si,lr_classifier])

# Start Training
lrModel = lr_classifier_pipeline.fit(w2v_train_df)

# Prediction on Test-set
lr_predictions = lrModel.transform(w2v_test_df)

# Evalutation the model
lr_model_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [None]:
accuracy = lr_model_evaluator.evaluate(lr_predictions)
print("Test-set Accuracy = %g" % (accuracy))

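#### The model can be tuned further by changing the parameters of LogisticRegression(), such as `regParam`, `elasticNetParam` and `maxIter`, for example with `ParamGridBuilder` and `CrossValidator` from `pyspark.ml.tuning`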
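For reference, a simplified form of what multinomial `LogisticRegression` minimises is the average softmax log-loss over the $n$ training rows and $K$ classes (here $K = 5$), plus an elastic-net penalty on the coefficient matrix $W$ controlled by `regParam` ($\lambda$) and `elasticNetParam` ($\alpha$). Details such as Spark's default feature standardization are left out. With the settings used above ($\lambda = 0.3$, $\alpha = 0$) the penalty is pure L2, so raising `regParam` shrinks the coefficients more strongly, while raising `elasticNetParam` towards 1 adds an L1 term that can drive coefficients to exactly zero.

$$
\min_{W,\,b}\; \frac{1}{n}\sum_{i=1}^{n} -\log\frac{\exp\!\big(w_{y_i}^{\top}x_i + b_{y_i}\big)}{\sum_{k=1}^{K}\exp\!\big(w_k^{\top}x_i + b_k\big)} \;+\; \lambda\Big(\alpha\,\lVert W\rVert_1 + \frac{1-\alpha}{2}\,\lVert W\rVert_2^2\Big)
$$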