# Binary Text Classification using Spark ML in PySpark


Applying machine learning algorithms to massive datasets is challenging because many popular machine learning algorithms are not designed for parallel architectures.

Traditionally, data scientists used Python and R tools to process data on a single machine, where moving the data becomes time consuming. Analysis then requires sampling, which often does not represent the data accurately, and moving from development to production environments requires extensive re-engineering.

MLlib is Spark's library of machine learning functions. `spark.mllib` contains the original API built on top of RDDs, whereas `spark.ml` provides a higher-level API built on top of DataFrames for constructing ML pipelines and is currently the primary machine learning API for Spark.

1. Spark stores large datasets in cluster memory and can run iterative algorithms without repeatedly syncing to disk, which makes them run faster.
2. Using MLlib, one can access HDFS (Hadoop Distributed File System), Hive, and HBase, in addition to local files. This lets MLlib plug into Hadoop workflows easily, so everything is available on a single platform.

To see how this makes life easier, let's do a sentiment classification of the Amazon fine food reviews dataset using Spark ML (https://www.kaggle.com/snap/amazon-fine-food-reviews).

## Importing the Needed Libraries

In [None]:
!pip install pyspark

Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#importing the needed libraries

from pyspark.sql.types import StructType,StructField,DoubleType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StopWordsRemover
import gensim
import gensim.parsing.preprocessing as gsp
from pyspark.sql.types import StringType
from gensim import utils

## Data Exploration

In [None]:
#initiation of the Spark session
spark = SparkSession.builder.appName("Final_Project").getOrCreate()
amazon_df = spark.read.csv("/content/drive/MyDrive/Amazon Food Reviews (Kaggle)/Reviews.csv",header = True,inferSchema = True)
amazon_df.show(5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            

In [None]:
spark.catalog.clearCache()


In [None]:
amazon_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)



The Score column was read as a string, so we will need to convert it to an integer or float type going forward.

In [None]:
amazon_df.count()

568454

The dataset consists of reviews of fine foods from Amazon. It spans a period of more than 10 years and includes all ~500,000 reviews up to October 2012. At the end of this study, given a review, we should be able to predict as accurately as possible whether its sentiment is positive or negative.

In [None]:
# The review scores given by customers take 5 different values; convert them to binary:
# Score > 3 --> positive (1), otherwise negative (0)

def sentiment(x):
  # Score was read as a string, so this is a lexicographic string comparison;
  # it agrees with the numeric test only for single-digit scores "1" to "5"
  return 1 if str(x)>str(3) else 0

# wrap the function as a UDF that returns an integer column
function = udf(sentiment, IntegerType())

amazon_df = amazon_df.withColumn('Score', function('Score'))
amazon_df.show(5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    1|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    0|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    1|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Karl|            
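
The labelling rule above compares strings, not numbers. The following plain-Python sketch shows where the two differ; `label_numeric` is a hypothetical stricter variant that is not part of the notebook, and the sample values are made up for illustration.

```python
def label_lexicographic(score):
    """Same comparison as the UDF above: string versus string."""
    return 1 if str(score) > str(3) else 0


def label_numeric(score):
    """Hypothetical variant: parse the score and return None for malformed values."""
    try:
        value = int(score)
    except (TypeError, ValueError):
        return None
    if not 1 <= value <= 5:
        return None
    return 1 if value > 3 else 0


# Made-up sample values, including ones a badly parsed CSV row could produce
samples = ["5", "4", "3", "1", "10", None, "great product"]
for s in samples:
    print(repr(s), label_lexicographic(s), label_numeric(s))
```

For the valid scores "1" to "5" both functions agree. For anything else, the string comparison still returns a label, so malformed rows are silently assigned to one of the two classes.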

In [None]:
amazon_df.groupBy('Score').count().show()

+-----+------+
|Score| count|
+-----+------+
|    1|442001|
|    0|126453|
+-----+------+



In [None]:
#checking for duplicates and dropping them

if amazon_df.count() > amazon_df.dropDuplicates(['UserId','ProfileName','Time','Text']).count():
    print ('Data has duplicates')
print ("Row count Now:",amazon_df.count())
amazon_df = amazon_df.dropDuplicates(['UserId','ProfileName','Time','Text'])
print ("After Removing the duplicates, row count becomes:", amazon_df.count())


#showing the new distribution of scores
amazon_df.groupBy('Score').count().show()

Data has duplicates
Row count Now: 568454
After Removing the duplicates, row count becomes: 393559
+-----+------+
|Score| count|
+-----+------+
|    1|305599|
|    0| 87960|
+-----+------+



Next we drop the unwanted columns and keep only the Id, Score and Text columns.

In [None]:
cols = ('ProductId','HelpfulnessNumerator','HelpfulnessDenominator', 'UserId','Summary','ProfileName','Time')

amazon_df=amazon_df.drop(*cols)

In [None]:
amazon_df.show(5)

+---+-----+--------------------+
| Id|Score|                Text|
+---+-----+--------------------+
| 32|    1|This offer is a g...|
|282|    1|This is one of th...|
|641|    1|I bought this tea...|
|899|    1|The product is al...|
|909|    1|This is my son's ...|
+---+-----+--------------------+
only showing top 5 rows



The data still consists of 300K+ rows after removing duplicates, and tokenizing and vectorizing the Text column at that size generated a lot of errors. By trial and error we therefore kept only the reviews with Id <= 50000, which leaves 46255 rows.

In [None]:
amazon_df1=amazon_df.filter(amazon_df.Id <= 50000)

In [None]:
amazon_df1.count()

46255

The next step is to start the tokenization process and remove stopwords.

In [None]:
tokenization=Tokenizer(inputCol='Text',outputCol='tokens')
tokenized_df=tokenization.transform(amazon_df1)
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_text_df=stopword_removal.transform(tokenized_df)
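
As a rough plain-Python picture of these two stages: `Tokenizer` lowercases the text and splits it on whitespace, and `StopWordsRemover` drops tokens found in a stop word list. The stop word set below is a small hypothetical subset chosen for illustration, not Spark's default list, and Python's `split()` only approximates Spark's whitespace splitting.

```python
# Hypothetical, deliberately tiny stop word list (Spark ships a much longer one)
HYPOTHETICAL_STOP_WORDS = {"i", "this", "is", "a", "the", "my", "of", "these"}


def tokenize(text):
    """Lowercase and split on whitespace; punctuation stays attached to words."""
    return text.lower().split()


def remove_stop_words(tokens, stop_words=HYPOTHETICAL_STOP_WORDS):
    """Keep only the tokens that are not in the stop word set."""
    return [t for t in tokens if t not in stop_words]


tokens = tokenize('"I love these cookies, really good."')
print(tokens)
print(remove_stop_words(tokens))
```

Because punctuation is not stripped, a token such as `"i` does not match the stop word `i` and survives the filter.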

Since we are now dealing with tokens instead of entire reviews, it makes sense to capture the number of tokens in each review. We therefore create another column (token_count) that gives the number of refined tokens in each row.

In [None]:
# import only the names we need; a wildcard import would shadow Python built-ins
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# UDF that returns the number of tokens left after stop word removal
len_udf = udf(lambda s: len(s), IntegerType())
refined_text_df = refined_text_df.withColumn("token_count",len_udf(col('refined_tokens')))
refined_text_df.show(5)

+---+-----+--------------------+--------------------+--------------------+-----------+
| Id|Score|                Text|              tokens|      refined_tokens|token_count|
+---+-----+--------------------+--------------------+--------------------+-----------+
| 32|    1|This offer is a g...|[this, offer, is,...|[offer, great, pr...|         11|
|282|    1|This is one of th...|[this, is, one, o...|[one, best, tasti...|         13|
|641|    1|I bought this tea...|[i, bought, this,...|[bought, tea, alt...|         13|
|899|    1|The product is al...|[the, product, is...|[product, says, g...|         53|
|909|    1|This is my son's ...|[this, is, my, so...|[son's, favorite,...|         31|
+---+-----+--------------------+--------------------+--------------------+-----------+
only showing top 5 rows



Now that we have the refined tokens after stop word removal, we use a CountVectorizer to turn them into feature vectors for the machine learning model.

In [None]:
count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_text_df=count_vec.fit(refined_text_df).transform(refined_text_df)
cv_text_df.select(['refined_tokens','token_count','features','Score']).show(10)

+--------------------+-----------+--------------------+-----+
|      refined_tokens|token_count|            features|Score|
+--------------------+-----------+--------------------+-----+
|[offer, great, pr...|         11|(109985,[2,5,37,3...|    1|
|[one, best, tasti...|         13|(109985,[4,19,26,...|    1|
|[bought, tea, alt...|         13|(109985,[12,34,54...|    1|
|[product, says, g...|         53|(109985,[0,1,6,10...|    1|
|[son's, favorite,...|         31|(109985,[5,7,16,2...|    1|
|["i, bought, prod...|         44|(109985,[0,1,3,10...|    1|
|["i, know, super,...|         27|(109985,[4,7,10,1...|    0|
|[love, cookies,, ...|         11|(109985,[7,30,73,...|    0|
|[husband, diabete...|         16|(109985,[0,5,23,1...|    1|
|[really, good., t...|         15|(109985,[11,14,22...|    1|
+--------------------+-----------+--------------------+-----+
only showing top 10 rows
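
To make the `(size,[indices],[values])` notation in the features column concrete, here is a minimal plain-Python sketch of count vectorization. It assumes the vocabulary is ordered by corpus frequency; the alphabetical tie-breaking and the toy documents are assumptions for illustration only.

```python
from collections import Counter


def fit_vocabulary(docs):
    """Map each term to an index, most frequent terms first (ties: alphabetical)."""
    totals = Counter(token for doc in docs for token in doc)
    ordered = sorted(totals, key=lambda t: (-totals[t], t))
    return {term: idx for idx, term in enumerate(ordered)}


def to_sparse(doc, vocab):
    """Return (vocabulary size, sorted indices, counts) for one document."""
    counts = Counter(t for t in doc if t in vocab)
    pairs = sorted((vocab[t], float(c)) for t, c in counts.items())
    return (len(vocab), [i for i, _ in pairs], [c for _, c in pairs])


docs = [["great", "tea", "great"], ["tea", "bad"]]
vocab = fit_vocabulary(docs)
for doc in docs:
    print(to_sparse(doc, vocab))
```

Only the non-zero positions are stored, which is why a vector over a vocabulary of 109985 terms can still be printed in a few characters.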



In [None]:
model_text_df=cv_text_df.select(['features','token_count','Score'])

Once we have the feature vector for each row, we can use VectorAssembler to combine it with the token count into a single input feature vector for the machine learning model.

In [None]:
from pyspark.ml.feature import VectorAssembler
df_assembler = VectorAssembler(inputCols=['features','token_count'],outputCol='features_vec')
model_text_df = df_assembler.transform(model_text_df)
model_text_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- token_count: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- features_vec: vector (nullable = true)
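
Conceptually, assembling a sparse count vector with one extra numeric column appends a single slot at the end, which is why the vector size grows by one. The sketch below is a plain-Python illustration with a hypothetical helper `assemble`, not Spark's implementation.

```python
def assemble(sparse_vec, extra_value):
    """Append one scalar to a (size, indices, values) sparse vector."""
    size, indices, values = sparse_vec
    new_indices = list(indices)
    new_values = list(values)
    if extra_value != 0:
        # the new slot takes the next free index, i.e. the old size
        new_indices.append(size)
        new_values.append(float(extra_value))
    return (size + 1, new_indices, new_values)


# Toy vector with 5 slots plus a token count of 11
print(assemble((5, [0, 3], [2.0, 1.0]), 11))
```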



We can use any of the classification models on this data, but we
proceed with training the Logistic Regression Model.

In [None]:
from pyspark.ml.classification import LogisticRegression
training_df,test_df=model_text_df.randomSplit([0.8,0.2])

To check that both classes have enough records in the train and test datasets, we can apply the groupBy function on the Score column, which serves as the label.

In [None]:
training_df.groupBy('Score').count().show()

+-----+-----+
|Score|count|
+-----+-----+
|    1|28346|
|    0| 8734|
+-----+-----+



In [None]:
test_df.groupBy('Score').count().show()

+-----+-----+
|Score|count|
+-----+-----+
|    1| 7029|
|    0| 2146|
+-----+-----+



In [None]:
#fit our model on the training set

log_reg=LogisticRegression(featuresCol='features_vec',labelCol='Score').fit(training_df)

After training the model, we evaluate the performance of the model on
the test dataset.

In [None]:
results=log_reg.evaluate(test_df).predictions
results.show()

+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|            features|token_count|Score|        features_vec|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|(109985,[0,1,2,3,...|        162|    1|(109986,[0,1,2,3,...|[-808.96930895599...|           [0.0,1.0]|       1.0|
|(109985,[0,1,2,13...|         38|    0|(109986,[0,1,2,13...|[124.898249192134...|           [1.0,0.0]|       0.0|
|(109985,[0,1,3,5,...|         40|    1|(109986,[0,1,3,5,...|[-369.97989620157...|[2.08820275557210...|       1.0|
|(109985,[0,1,3,6,...|         21|    0|(109986,[0,1,3,6,...|[111.969117973450...|           [1.0,0.0]|       0.0|
|(109985,[0,1,3,10...|         44|    1|(109986,[0,1,3,10...|[-80.102113705153...|[1.62964882958989...|       1.0|
|(109985,[0,1,5,6,...|         18|    1|(109986,[0,1,5,6,...|[32.1010915639252..

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# confusion matrix counts on the test set (Score is the label)
true_positives = results[(results.Score == 1) & (results.prediction == 1)].count()
true_negatives = results[(results.Score == 0) & (results.prediction == 0)].count()
false_positives = results[(results.Score == 0) & (results.prediction == 1)].count()
false_negatives = results[(results.Score == 1) & (results.prediction == 0)].count()
recall = true_positives / (true_positives + false_negatives)
precision = true_positives / (true_positives + false_positives)
accuracy = (true_positives + true_negatives) / results.count()

In [None]:
print(true_positives)

5989


In [None]:
print(true_negatives)

1240


In [None]:
print(false_positives)

906


In [None]:
print(false_negatives)

1040


In [None]:
print(recall)

0.8520415421823873


In [None]:
print(precision)

0.868600435097897


In [None]:
print(accuracy)

0.7879019073569482
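
The three metrics can be re-derived from the four counts printed above. The sketch below does this in plain Python and adds two quantities that the notebook does not compute: the F1 score and the accuracy of always predicting the majority class, both included only as points of comparison.

```python
# Confusion matrix counts as printed above
tp, tn, fp, fn = 5989, 1240, 906, 1040
total = tp + tn + fp + fn

recall = tp / (tp + fn)        # share of positive reviews that were found
precision = tp / (tp + fp)     # share of predicted positives that are correct
accuracy = (tp + tn) / total   # share of all reviews classified correctly

# Additions for comparison (not in the original notebook)
f1 = 2 * precision * recall / (precision + recall)
majority_baseline = (tp + fn) / total  # always predict "positive"

print(recall, precision, accuracy)
print(f1, majority_baseline)
```

Since roughly three quarters of the test reviews are positive, accuracy should be read against the majority-class baseline rather than against 50%.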


# TF-IDF

For the TF-IDF featurization we reduced the number of rows further because of the large number of errors generated. The final dataset consists of 19310 rows.
The idea behind the TF-IDF scheme is that words that occur frequently in one document but rarely in the other documents are more informative for classification.
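
A minimal plain-Python sketch of this weighting follows. It uses the smoothed inverse document frequency log((m + 1) / (df + 1)) described in the Spark ML documentation, where m is the number of documents and df the number of documents containing the term; the toy documents and helper names are assumptions for illustration.

```python
import math
from collections import Counter


def idf_weights(docs, min_doc_freq=0):
    """IDF per term; terms in fewer than min_doc_freq documents get weight 0."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {
        term: math.log((m + 1) / (n + 1)) if n >= min_doc_freq else 0.0
        for term, n in doc_freq.items()
    }


def tf_idf(doc, idf):
    """Multiply raw term counts by the IDF weights."""
    return {term: count * idf.get(term, 0.0) for term, count in Counter(doc).items()}


docs = [["tea", "good"], ["tea", "bad"], ["tea", "tea", "stale"]]
idf = idf_weights(docs)
print(idf)
print(tf_idf(docs[2], idf))
```

The term "tea" appears in every document, so its weight is zero no matter how often it occurs, while the rarer terms keep a positive weight.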

In [None]:
amazon_df2=amazon_df.filter(amazon_df.Id <= 20000)

In [None]:
amazon_df2.count()

19310

In [None]:
amazon_df2.show(5)

+---+-----+--------------------+
| Id|Score|                Text|
+---+-----+--------------------+
| 32|    1|This offer is a g...|
|282|    1|This is one of th...|
|641|    1|I bought this tea...|
|899|    1|The product is al...|
|909|    1|This is my son's ...|
+---+-----+--------------------+
only showing top 5 rows



In [None]:
#splitting the data 80% for train and 20% for test
train_df, test_df = amazon_df2.randomSplit([0.8, 0.2])

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline


tokenizer = Tokenizer(inputCol="Text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features",minDocFreq=5)

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])

# Fit the pipeline to training documents.
model1 = pipeline.fit(train_df)

train_df = model1.transform(train_df)
test_df = model1.transform(test_df)

print ("few rows from test df")
test_df.show(3)

few rows from test df
+----+-----+--------------------+--------------------+--------------------+--------------------+
|  Id|Score|                Text|               words|         rawFeatures|            features|
+----+-----+--------------------+--------------------+--------------------+--------------------+
|1019|    1|"I bought this pr...|["i, bought, this...|(262144,[7391,190...|(262144,[7391,190...|
|2369|    0|I love these cook...|[i, love, these, ...|(262144,[19036,22...|(262144,[19036,22...|
|7053|    1|this filters are ...|[this, filters, a...|(262144,[19036,27...|(262144,[19036,27...|
+----+-----+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows
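
The vector size 262144 (2^18) in the output is the default number of buckets of `HashingTF`, which maps each token to an index with a hash function instead of building a vocabulary. The sketch below illustrates the idea in plain Python; it uses `zlib.crc32` as a stand-in hash, so the indices will not match the ones Spark produces.

```python
import zlib
from collections import Counter


def hashing_tf(tokens, num_features=262144):
    """Count tokens per hash bucket; different tokens may share a bucket."""
    counts = Counter()
    for tok in tokens:
        # stand-in hash function, chosen only because it is deterministic
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        counts[idx] += 1.0
    return dict(sorted(counts.items()))


vec = hashing_tf(["tea", "tea", "good"])
print(vec)

# With a single bucket every token collides
print(hashing_tf(["tea", "good"], num_features=1))
```

No fitting pass over the data is needed, at the cost of occasional collisions and of losing the mapping from index back to word.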



Using Logistic Regression as our classifier

In [None]:
def logistic_reg(train_data, test_data):
    # Train a logistic regression model with default hyper-parameters
    lr = LogisticRegression(labelCol="Score", featuresCol="features")
    model = lr.fit(train_data)
    predictions = model.transform(test_data)
    # BinaryClassificationEvaluator defaults to metricName="areaUnderROC",
    # so the value below is the area under the ROC curve, not accuracy
    evaluator = BinaryClassificationEvaluator(labelCol="Score")
    auc = evaluator.evaluate(predictions)
    print ("Area under ROC of Logistic Regression Classifier : %g" % auc)

logistic_reg(train_df, test_df)

Area under ROC of Logistic Regression Classifier : 0.813048


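Note that the score reported above is the area under the ROC curve, the default metric of BinaryClassificationEvaluator, rather than accuracy. At about 0.81 it is not great but not bad either. Let's try some hyper-parameter tuning on our classifier.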
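
`BinaryClassificationEvaluator` reports the area under the ROC curve unless another `metricName` is set, and that quantity differs from accuracy. The plain-Python sketch below contrasts the two on made-up labels and scores; the rank-based AUC formula is a standard equivalent of the area under the ROC curve, not Spark's implementation.

```python
def accuracy(labels, scores, threshold=0.5):
    """Share of examples whose thresholded score matches the label."""
    preds = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)


def area_under_roc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Made-up example: every positive is ranked above every negative,
# but one positive falls below the 0.5 threshold
labels = [1, 1, 1, 0, 0]
scores = [0.9, 0.8, 0.45, 0.4, 0.1]
print(accuracy(labels, scores), area_under_roc(labels, scores))
```

The ranking is perfect while one prediction is still wrong at the chosen threshold, so the two numbers should not be compared with each other directly.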

In [None]:
def logisticCV(train_df,test_df):
    lr = LogisticRegression(labelCol="Score", featuresCol="features")
    pipeline = Pipeline(stages= [lr])
    paramGrid = (ParamGridBuilder()
                     .addGrid(lr.regParam, [0.01, 0.5, 2.0])
                     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                     .addGrid(lr.maxIter, [1, 5, 10])
                     .build())
    # the default metric is areaUnderROC, which also drives model selection
    evaluator = BinaryClassificationEvaluator(labelCol="Score")
    crossValidator = CrossValidator(estimator=pipeline,
                                        evaluator=evaluator,
                                        estimatorParamMaps=paramGrid,
                                        numFolds=5)
    # fit one model per parameter combination and fold,
    # then keep the best Logistic Regression model

    cv = crossValidator.fit(train_df)
    best_model = cv.bestModel.stages[0]

    prediction = best_model.transform(test_df)
    auc = evaluator.evaluate(prediction)
    print ("Test area under ROC after cross validation = %g" % auc)
logisticCV(train_df,test_df)

Test area under ROC after cross validation = 0.895345


The score on the test set, which is the area under the ROC curve, improved from 0.813 to 0.895.
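
To get a feel for the cost of this search, the sketch below enumerates the same grid in plain Python with `itertools.product`. It only counts model fits; the extra fit at the end reflects that the cross validator retrains the best parameter combination on the full training set.

```python
from itertools import product

# Same values as the ParamGridBuilder above
grid = {
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
}

# One dictionary per parameter combination
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 5
fits_during_search = len(param_maps) * num_folds
total_fits = fits_during_search + 1  # final refit of the best combination

print(len(param_maps), fits_during_search, total_fits)
```

Adding one more value to any of the three lists adds 9 combinations, or 45 more fits with 5 folds, so the grid is best kept small on a single machine.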

# Word2Vec

Now let's try the Word2Vec featurization and see whether the model's score changes.

In [None]:
from pyspark.ml.feature import Word2Vec, Tokenizer
from pyspark.ml import Pipeline

# train_df and test_df still carry the TF-IDF columns ("words", "rawFeatures", "features"),
# which would clash with the output columns below, so start again from a fresh split
train_df, test_df = amazon_df2.randomSplit([0.8, 0.2])

tokenizer = Tokenizer(inputCol="Text",outputCol="words")
w2v = Word2Vec(vectorSize=300, minCount=0, inputCol="words", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[tokenizer,w2v])
doc2vec_model = doc2vec_pipeline.fit(train_df)
train_df = doc2vec_model.transform(train_df)
test_df = doc2vec_model.transform(test_df)
print ("few rows from train df")
train_df.show(3)

few rows from train df
+---+-----+--------------------+--------------------+--------------------+
| Id|Score|                Text|               words|            features|
+---+-----+--------------------+--------------------+--------------------+
| 32|    1|This offer is a g...|[this, offer, is,...|[0.03636288617174...|
|282|    1|This is one of th...|[this, is, one, o...|[-0.0149411184258...|
|641|    1|I bought this tea...|[i, bought, this,...|[-0.0097574412660...|
+---+-----+--------------------+--------------------+--------------------+
only showing top 3 rows
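
Each row in the features column is a single dense vector per review, because Spark's `Word2Vec` model represents a document by averaging the vectors of its words. A plain-Python sketch of that averaging follows; the two-dimensional toy vectors are made up, and skipping unknown words entirely is an assumption of this sketch rather than a statement about Spark's behaviour.

```python
def average_vectors(tokens, word_vectors, dim):
    """Average the vectors of the tokens that have one (assumption: skip the rest)."""
    total = [0.0] * dim
    found = 0
    for tok in tokens:
        vec = word_vectors.get(tok)
        if vec is None:
            continue
        found += 1
        for i, v in enumerate(vec):
            total[i] += v
    if found == 0:
        return total
    return [t / found for t in total]


# Made-up 2-dimensional word vectors for illustration
toy_vectors = {"good": [1.0, 0.0], "tea": [0.0, 1.0]}
doc_vec = average_vectors(["good", "tea", "tea", "unknown"], toy_vectors, dim=2)
print(doc_vec)
```

Averaging discards word order, so two reviews with the same words in a different order receive the same feature vector.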



In [None]:
def logistic_reg(train_data, test_data):
    # Train a logistic regression model with default hyper-parameters
    lr = LogisticRegression(labelCol="Score", featuresCol="features")
    model = lr.fit(train_data)
    predictions = model.transform(test_data)
    # the evaluator's default metric is areaUnderROC, not accuracy
    evaluator = BinaryClassificationEvaluator(labelCol="Score")
    auc = evaluator.evaluate(predictions)
    print ("Area under ROC of Logistic Regression Classifier : %g" % auc)

logistic_reg(train_df, test_df)

Area under ROC of Logistic Regression Classifier : 0.828797


In [None]:
def logisticCV(train_df,test_df):
    lr = LogisticRegression(labelCol="Score", featuresCol="features")
    pipeline = Pipeline(stages= [lr])
    paramGrid = (ParamGridBuilder()
                     .addGrid(lr.regParam, [0.01, 0.5, 2.0])
                     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                     .addGrid(lr.maxIter, [1, 5, 10])
                     .build())
    # the default metric is areaUnderROC, which also drives model selection
    evaluator = BinaryClassificationEvaluator(labelCol="Score")
    crossValidator = CrossValidator(estimator=pipeline,
                                        evaluator=evaluator,
                                        estimatorParamMaps=paramGrid,
                                        numFolds=5)
    # fit one model per parameter combination and fold,
    # then keep the best Logistic Regression model

    cv = crossValidator.fit(train_df)
    best_model = cv.bestModel.stages[0]

    prediction = best_model.transform(test_df)
    auc = evaluator.evaluate(prediction)
    print ("Test area under ROC after cross validation = %g" % auc)
logisticCV(train_df,test_df)

Test area under ROC after cross validation = 0.811879


In [None]:
from pyspark.ml.classification import RandomForestClassifier
def RandomForest(train_df,test_df):
    rf = RandomForestClassifier(labelCol="Score", featuresCol="features")
    model = rf.fit(train_df)
    predictions = model.transform(test_df)
    # the evaluator's default metric is areaUnderROC, not accuracy
    evaluator = BinaryClassificationEvaluator(labelCol="Score")
    auc = evaluator.evaluate(predictions)
    print ("Area under ROC of Random Forest Classifier : %g" % auc)
RandomForest(train_df,test_df)

Area under ROC of Random Forest Classifier : 0.77072
