# IMDB Movie Reviews
### Justin Farnsworth (farnswj1@tcnj.edu)

In [1]:
# Imported libraries
import numpy as np
import pandas as pd
import string
import random
from nltk.corpus import stopwords
from collections import Counter
from sklearn.metrics import confusion_matrix
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import Tokenizer, HashingTF, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
# Set the random seed
random.seed(3)

In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("IMDB Movie Reviews").getOrCreate()
spark

In [4]:
# Load the dataset through Pandas (Spark loads the data incorrectly)
df = pd.read_csv("IMDB_Dataset.zip", compression="zip")
df.head()

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive


In [5]:
# Load the dataset into Spark
reviews = spark.createDataFrame(df)
reviews.show(5)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
+--------------------+---------+
only showing top 5 rows



In [6]:
# Print the schema
reviews.printSchema()

root
 |-- review: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [7]:
# Count the number of cases for each classification
df["sentiment"].value_counts()

positive    25000
negative    25000
Name: sentiment, dtype: int64

In [8]:
# Text Processor class (will be used in the upcoming Spark pipeline)
class TextProcessor(Transformer):
    # List of stopwords (without punctuation)
    set_of_stopwords = {
        ''.join(char for char in word if char not in string.punctuation) for word in stopwords.words("english") + ["br"]
    }
    
    # Constructor
    def __init__(self, inputCol):
        super(TextProcessor, self).__init__()
        self.inputCol = inputCol
    
    
    # Transformation function
    def _transform(self, df):
        f = udf(lambda x: self.process(x), StringType())
        return df.withColumn(self.inputCol, f(self.inputCol))
    
    
    # Remove punctuations and stopwords from the messsage
    def process(self, text):
        # Remove any punctuations
        nopunc = ''.join(char for char in text if char not in string.punctuation).lower()
        
        # Remove any stopwords
        return ' '.join(word for word in nopunc.split() if word.lower() not in self.set_of_stopwords)

In [9]:
# Initialize the text processor and clean the messages
test_processor = TextProcessor(inputCol="review")
cleaned_text = df["review"].apply(test_processor.process)
cleaned_text

0        one reviewers mentioned watching 1 oz episode ...
1        wonderful little production filming technique ...
2        thought wonderful way spend time hot summer we...
3        basically theres family little boy jake thinks...
4        petter matteis love time money visually stunni...
                               ...                        
49995    thought movie right good job creative original...
49996    bad plot bad dialogue bad acting idiotic direc...
49997    catholic taught parochial elementary schools n...
49998    im going disagree previous comment side maltin...
49999    one expects star trek movies high art fans exp...
Name: review, Length: 50000, dtype: object

In [10]:
# Get the most common words across all of the reviews
Counter(' '.join(cleaned_text).split()).most_common(50)

[('movie', 83510),
 ('film', 74453),
 ('one', 51024),
 ('like', 38989),
 ('good', 28569),
 ('even', 24572),
 ('would', 24024),
 ('time', 23257),
 ('really', 22948),
 ('see', 22534),
 ('story', 22090),
 ('much', 18947),
 ('well', 18791),
 ('get', 18204),
 ('great', 17819),
 ('also', 17816),
 ('bad', 17704),
 ('people', 17538),
 ('first', 17154),
 ('movies', 15453),
 ('made', 15415),
 ('make', 15305),
 ('films', 15285),
 ('could', 15155),
 ('way', 15000),
 ('characters', 14676),
 ('think', 14215),
 ('watch', 13567),
 ('many', 13369),
 ('seen', 13055),
 ('two', 13019),
 ('character', 12920),
 ('never', 12874),
 ('love', 12570),
 ('acting', 12471),
 ('plot', 12365),
 ('little', 12328),
 ('best', 12324),
 ('know', 12267),
 ('show', 12029),
 ('life', 11684),
 ('ever', 11623),
 ('better', 11044),
 ('still', 10740),
 ('say', 10623),
 ('end', 10537),
 ('scene', 10527),
 ('man', 10291),
 ('scenes', 10177),
 ('something', 9802)]

In [11]:
# Split the data into a training set and a test set
train_set, test_set = reviews.randomSplit([0.8, 0.2], 3)

In [12]:
# Set up the tokenizer and text vector
tokenizer = Tokenizer(inputCol="review", outputCol="tokens")
word_hash = HashingTF(inputCol="tokens", outputCol="features")

In [13]:
# Use string indexing to convert the sentiment values into integers
sentiment_indexer = StringIndexer(inputCol="sentiment", outputCol="label")

In [14]:
# Use Naive Bayes to predict the sentiments
nb = NaiveBayes(featuresCol="features", labelCol="label")

In [15]:
# Create a pipeline
pipeline = Pipeline(stages=[test_processor, tokenizer, word_hash, sentiment_indexer, nb])

In [16]:
# Fit the model
model = pipeline.fit(train_set)

In [17]:
# Make the predictions
predictions = model.transform(test_set).select(col("label"), col("prediction"))
predictions.show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       1.0|
+-----+----------+
only showing top 20 rows



In [18]:
# Use an evaluator to measure the performance of the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

0.8547519426180514

In [19]:
# Show the confusion matrix
predictions_pandas = predictions.toPandas()
confusion_matrix(predictions_pandas["label"], predictions_pandas["prediction"])

array([[4356,  645],
       [ 813, 4224]], dtype=int64)

In [20]:
# Terminate the Spark session
spark.stop()