# Initialization

In [None]:
"""
Tasks:

1. Install dependencies
2. Import dependencies for Spark and SQL
3. Import dependencies for text preprocessing and machine learning
4. Setup environment
5. Create Spark and SparkSQL context

"""

#1
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark > /dev/null
!pip install pyspark > /dev/null

#2
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

#3
from pyspark.sql.functions import col, regexp_replace, concat
from pyspark.ml.feature import CountVectorizer, RegexTokenizer,StopWordsRemover
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

#4
# Tell findspark / pyspark where the JDK and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
    }
)
findspark.init()

#5
# Local Spark session using every available core; the UI is bound to port 4050.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
sc = spark.sparkContext
# SQL context built on top of the session's SparkContext.
sqlContext = SQLContext(sc)

# Data

In [None]:
# Download the two CSV files from Google Drive; per the saved output they are
# written to /content/train.csv and /content/test.csv.
!gdown https://drive.google.com/uc?id=1DleRy50zMFGC-tDF5UAagu5laZETSA_v
!gdown https://drive.google.com/uc?id=1qjfI2dcXKQ_KWjoXnjbwsU3r0MXqEx2y

Downloading...
From: https://drive.google.com/uc?id=1DleRy50zMFGC-tDF5UAagu5laZETSA_v
To: /content/train.csv
1.59GB [00:12, 153MB/s]
Downloading...
From: https://drive.google.com/uc?id=1qjfI2dcXKQ_KWjoXnjbwsU3r0MXqEx2y
To: /content/test.csv
176MB [00:03, 54.5MB/s]


In [None]:
# The files are read without a header row and without schema inference, so
# the columns get Spark's default names (_c0, _c1, ...).
trainset = spark.read.csv(
    '/content/train.csv',
    header=False,
    inferSchema=False,
)
testset = spark.read.csv(
    '/content/test.csv',
    header=False,
    inferSchema=False,
)

In [None]:
# Sanity check: confirm the reader returned a Spark DataFrame.
type(trainset)

pyspark.sql.dataframe.DataFrame

In [None]:
# Give the three positional CSV columns meaningful names (same mapping for
# both splits).
column_aliases = [
    "_c0 as Polarity",
    "_c1 as Review_title",
    "_c2 as Review_text",
]
trainset = trainset.selectExpr(*column_aliases)
testset = testset.selectExpr(*column_aliases)

In [None]:
# Preview the first rows of the renamed training frame.
trainset.show(n=5)

+--------+--------------------+--------------------+
|Polarity|        Review_title|         Review_text|
+--------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|
|       2|The best soundtra...|I'm reading a lot...|
|       2|            Amazing!|"This soundtrack ...|
|       2|Excellent Soundtrack|I truly like this...|
|       2|Remember, Pull Yo...|If you've played ...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Row count of the training set before null rows are dropped (used later to
# report how many rows were removed).
train_before=trainset.count()

In [None]:
# Row count of the test set before null rows are dropped (used later to
# report how many rows were removed).
test_before = testset.count()

# Data exploration and preprocessing

In [None]:
# Discard every training row that has a null in any column, then report how
# many rows that removed relative to the earlier count.
trainset = trainset.na.drop()
train_after = trainset.count()
print(f"No. of rows dropped in the training set: {train_before-train_after}")

No. of rows dropped in the training set: 61


In [None]:
# Discard every test row that has a null in any column, then report how many
# rows that removed relative to the earlier count.
testset = testset.na.drop()
test_after = testset.count()
print(f"No. of rows dropped in the testing set: {test_before-test_after}")

No. of rows dropped in the testing set: 5


In [None]:
# Count rows per Polarity value to check how balanced the classes are.
trainset.groupBy('Polarity').count().show()

+--------+-------+
|Polarity|  count|
+--------+-------+
|       1|1799969|
|       2|1799970|
+--------+-------+



In [None]:
# Merge title and body into a single Review column. A space is inserted
# between them so the last word of the title and the first word of the body
# are not fused into one token by the tokenizer (e.g. "gamerThis").
# Null rows were dropped earlier, so concat never sees a null operand here.
traindata = trainset.selectExpr(
    "Polarity",
    "concat(Review_title, ' ', Review_text) as Review",
)
traindata.show(n=5)

+--------+--------------------+
|Polarity|              Review|
+--------+--------------------+
|       2|Stuning even for ...|
|       2|The best soundtra...|
|       2|Amazing!"This sou...|
|       2|Excellent Soundtr...|
|       2|Remember, Pull Yo...|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Merge title and body into a single Review column. A space is inserted
# between them so the last word of the title and the first word of the body
# are not fused into one token by the tokenizer.
# Null rows were dropped earlier, so concat never sees a null operand here.
testdata = testset.selectExpr(
    "Polarity",
    "concat(Review_title, ' ', Review_text) as Review",
)
testdata.show(n=5)

+--------+--------------------+
|Polarity|              Review|
+--------+--------------------+
|       2|Great CD"My lovel...|
|       2|One of the best g...|
|       1|Batteries died wi...|
|       2|works fine, but M...|
|       2|Great for the non...|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Remove every run of digits from the review text. The pattern is a raw
# string so Python does not treat "\d" as an (invalid) string escape.
traindata = traindata.withColumn("Review", regexp_replace(col('Review'), r'\d+', ''))
testdata = testdata.withColumn("Review", regexp_replace(col('Review'), r'\d+', ''))

In [None]:
# Preview the training reviews after digit removal.
traindata.show(n=5)

+--------+--------------------+
|Polarity|              Review|
+--------+--------------------+
|       2|Stuning even for ...|
|       2|The best soundtra...|
|       2|Amazing!"This sou...|
|       2|Excellent Soundtr...|
|       2|Remember, Pull Yo...|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Tokenise the Review column with the non-word-character pattern; the same
# tokenizer instance is applied to both splits.
regex_tokenizer = RegexTokenizer(
    pattern="\\W",
    inputCol="Review",
    outputCol="Review_words",
)
trainset, testset = (
    regex_tokenizer.transform(frame) for frame in (traindata, testdata)
)
trainset.show(5)

+--------+--------------------+--------------------+
|Polarity|              Review|        Review_words|
+--------+--------------------+--------------------+
|       2|Stuning even for ...|[stuning, even, f...|
|       2|The best soundtra...|[the, best, sound...|
|       2|Amazing!"This sou...|[amazing, this, s...|
|       2|Excellent Soundtr...|[excellent, sound...|
|       2|Remember, Pull Yo...|[remember, pull, ...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Filter stop words out of the token lists, writing the result to a new
# Review_filtered column on both splits.
remover = StopWordsRemover(
    outputCol="Review_filtered",
    inputCol="Review_words",
)
trainset, testset = (remover.transform(frame) for frame in (trainset, testset))

In [None]:
# Preview the training frame with the stop-word-filtered token column.
trainset.show(n=5)

+--------+--------------------+--------------------+--------------------+
|Polarity|              Review|        Review_words|     Review_filtered|
+--------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|[stuning, even, f...|[stuning, even, n...|
|       2|The best soundtra...|[the, best, sound...|[best, soundtrack...|
|       2|Amazing!"This sou...|[amazing, this, s...|[amazing, soundtr...|
|       2|Excellent Soundtr...|[excellent, sound...|[excellent, sound...|
|       2|Remember, Pull Yo...|[remember, pull, ...|[remember, pull, ...|
+--------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Learn the vocabulary on the training tokens only, then turn each review
# into a term-count vector.
cv = CountVectorizer(outputCol="features", inputCol="Review_filtered")
model = cv.fit(trainset)
# Polarity is read as a string holding 1 or 2; cast it and shift to 0/1 so
# it can serve as the classifier label.
trainset = model.transform(trainset).withColumn(
    "label",
    col("Polarity").cast("Integer") - 1,
)
trainset.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+-----+
|Polarity|              Review|        Review_words|     Review_filtered|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
|       2|Stuning even for ...|[stuning, even, f...|[stuning, even, n...|(262144,[10,13,18...|    1|
|       2|The best soundtra...|[the, best, sound...|[best, soundtrack...|(262144,[1,12,15,...|    1|
|       2|Amazing!"This sou...|[amazing, this, s...|[amazing, soundtr...|(262144,[4,7,29,4...|    1|
|       2|Excellent Soundtr...|[excellent, sound...|[excellent, sound...|(262144,[1,4,7,18...|    1|
|       2|Remember, Pull Yo...|[remember, pull, ...|[remember, pull, ...|(262144,[3,7,18,2...|    1|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [None]:
# Vectorise the test reviews with the vocabulary fitted on the training set
# and derive the same 0/1 label from Polarity.
testset = model.transform(testset).withColumn(
    "label",
    col("Polarity").cast("Integer") - 1,
)
testset.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+-----+
|Polarity|              Review|        Review_words|     Review_filtered|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
|       2|Great CD"My lovel...|[great, cd, my, l...|[great, cd, lovel...|(262144,[0,1,2,3,...|    1|
|       2|One of the best g...|[one, of, the, be...|[one, best, game,...|(262144,[1,2,4,9,...|    1|
|       1|Batteries died wi...|[batteries, died,...|[batteries, died,...|(262144,[6,10,20,...|    0|
|       2|works fine, but M...|[works, fine, but...|[works, fine, mah...|(262144,[20,77,18...|    1|
|       2|Great for the non...|[great, for, the,...|[great, non, audi...|(262144,[2,10,16,...|    1|
+--------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



# Machine learning

In [None]:
# Train a Naive Bayes classifier on the count vectors.
nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
)
nbModel = nb.fit(trainset)

In [None]:
# Score the held-out test set with the fitted Naive Bayes model.
nb_predictions = nbModel.transform(testset)
# Compute AUC from the normalised class probabilities. NaiveBayes'
# rawPrediction column holds unnormalised log-likelihoods whose magnitude
# grows with document length, so ranking by it gives a misleading ROC curve.
nbEval = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label")
print('Test Area Under ROC', nbEval.evaluate(nb_predictions))
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))

Test Area Under ROC 0.5415983999037577
Accuracy of NaiveBayes is = 0.846321


In [None]:
# Train a logistic regression classifier on the count vectors, capped at 50
# optimiser iterations.
lr = LogisticRegression(
    maxIter=50,
    labelCol='label',
    featuresCol='features',
)
lrModel = lr.fit(trainset)

In [None]:
# Score the held-out test set (not the training data) so the reported "Test"
# metrics measure generalisation and are comparable with the Naive Bayes
# numbers. Outputs saved from earlier runs were computed on the training set;
# re-run this cell to refresh them.
lrPreds = lrModel.transform(testset)
lrEval = BinaryClassificationEvaluator()
print('Test Area Under ROC', lrEval.evaluate(lrPreds))

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lrPreds)
print("Accuracy of Logistic Regression is = %g"% (lr_accuracy))

Test Area Under ROC 0.9727471417669786
Accuracy of Logistic Regression is = 0.916696
