In [0]:
from pyspark.ml.feature import StopWordsRemover, Tokenizer, CountVectorizer
from pyspark.sql.functions import split, col, regexp_replace, lower, concat_ws
from pyspark.ml.linalg import Vectors
import numpy
import math

In [0]:
# Initialize a dataframe with our reviews and their corresponding sentiment
reviews_df = spark.read.option("header","true").option("inferSchema","true").option("quote", "\"").option("escape", "\"").csv("/FileStore/tables/IMDB_Dataset.csv")

In [0]:
#https://www.educative.io/answers/remove-all-the-punctuation-marks-from-a-sentence-using-regex
#https://sqlpatterns.wordpress.com/2017/08/18/three-strategies-for-replacing-multiple-spaces-with-single-ones/

# Remove any instances of line breaks from the reviews
edited_df = reviews_df.select(regexp_replace('review', r'<br />', ' ').alias("review"), "sentiment")
# Remove any instances of non-word, non-space, and non-digit characters in reviews
edited_df = edited_df.select(regexp_replace('review', '[^\s\w\d]', '').alias("review"), "sentiment")
# Remove any instances of two or more spaces from reviews
edited_df = edited_df.select(regexp_replace('review', '[ ][ ]+', ' ').alias("review"), "sentiment")
# Switch all words to lowercase to avoid any duplicity in tokenization later
edited_df = edited_df.select(lower('review').alias("review"), "sentiment")

In [0]:
# Split the reviews into a list format so that we can remove stopwords later on; keep sentiment information too
split_reviews = edited_df.select(split(edited_df.review, " ").alias('review_array'), "sentiment")

In [0]:
# Initialize a stopword remover with the review array as input
remover = StopWordsRemover(inputCol="review_array", outputCol="filtered_review_array")
# Transform split_df with the remover to get a list of words per review that have stopwords removed
stop_removed_df = remover.transform(split_reviews).select("filtered_review_array", "sentiment")

In [0]:
# Reconcatenate the list together to have a string version of our reviews. This will allow us to tokenize each review.
stop_removed_df = stop_removed_df.withColumn("filtered_review", concat_ws(" ", "filtered_review_array")).select("filtered_review", "sentiment")

In [0]:
# Make a tokenizer with filtered review as input
tokenizer = Tokenizer(inputCol="filtered_review", outputCol="words")
# Transform our stop_removed dataframe to obtain a final input of tokenized words we can count vectorize for Naive Bayes analysis
fin_df = tokenizer.transform(stop_removed_df).select("words", "sentiment")

In [0]:
# Create a count vectorizer to keep a bag-of-words count of our tokens
cv = CountVectorizer(inputCol="words", outputCol="features")
model = cv.fit(fin_df)
vectorized_df = model.transform(fin_df)

In [0]:
# Do a 70-30 split of our data into training and testing sets, respectively
train_test_split = vectorized_df.randomSplit([0.7, 0.3])
train_data = train_test_split[0]
test_data = train_test_split[1]

In [0]:
# Find P(class) for both sentiments (positive and negative)
# To do so, just calculate the number of records that are of a class / the total number of records
# This is our estimate of the prior associated with each class
num_pos = train_data.filter("sentiment == 'positive'").count()
num_neg = train_data.filter("sentiment == 'negative'").count()
p_pos = num_pos / (num_pos + num_neg)
p_neg = num_neg / (num_pos + num_neg)
print("P(Class = 'positive')\t", p_pos, "\nP(Class = 'negative')\t", p_neg)

P(Class = 'positive')	 0.501209069443259 
P(Class = 'negative')	 0.4987909305567409


In [0]:
# Split training set entries based off of class type
pos_entries = train_data.filter("sentiment == 'positive'")
neg_entries = train_data.filter("sentiment == 'negative'")

In [0]:
# Select features column to get sparse vector
# Convert sparse vector into ndarray that can be summed column-wise via a reduce function
# This will yield an array with each index corresponding to a different unique token 
# where the value at the index is the total count of its occurrence across all documents
pos_sum_arr = pos_entries.select("features").rdd.map(lambda x: x[0].toArray()).reduce(lambda x, y: x+y)
neg_sum_arr = neg_entries.select("features").rdd.map(lambda x: x[0].toArray()).reduce(lambda x, y: x+y)

In [0]:
# In order to obtain our P(Xi | Class), or our estimates of likelihoods, we should normalize these occurrence counts
# We do this for cases where we are trying to find P(Xi | Positive) and P(Xi | Negative)
normalized_pos_arr = pos_sum_arr / sum(pos_sum_arr)
normalized_neg_arr = neg_sum_arr / sum(neg_sum_arr)

In [0]:
# For easier computation further in our notebook, we save the log probabilities we obtained in the cell above
# These log probabilities can be summed later on, and probability values that don't contribute to the computation i.e. 0, 
# will be given a default value of 0s, which have no effect on a sum
# https://stackoverflow.com/questions/21752989/numpy-efficiently-avoid-0s-when-taking-logmatrix
log_pos_arr = numpy.log(normalized_pos_arr, out=numpy.zeros_like(normalized_pos_arr), where=(normalized_pos_arr!=0))
log_neg_arr = numpy.log(normalized_neg_arr, out=numpy.zeros_like(normalized_neg_arr), where=(normalized_neg_arr!=0))

In [0]:
# Custom function to classify a sparse vector of token occurrences (the review) into a sentiment
# The computation follows the parameter estimation model for multinomial Naive Bayes where our vector can have non-binary elements
# https://en.wikipedia.org/wiki/Naive_Bayes_classifier
def classify_arr(x):
    # Calculate the log probability of P(positive | X) and P(negative | X) excluding the denominator which is the same for both
    p_arr_pos = math.log(p_pos) + x.features.dot(log_pos_arr)
    p_arr_neg = math.log(p_neg) + x.features.dot(log_neg_arr)
    # If the probability of it being positive is higher, we predict the review as positive and vice-versa
    prediction = 1 if p_arr_pos > p_arr_neg else 0
    # We also relabel the sentiments as 1 or 0 depending on if it is positive or negative respectively.
    # Note that this is our truth / labels
    binarized_sentiment = 1 if x.sentiment == "positive" else 0
    # Return all necessary columns for further evaluation
    return (x.features, binarized_sentiment, prediction)

In [0]:
# Make a dataframe where features are passed through and used for predictions for each record. Store the results in a dataframe
predictions_df = test_data.select("features", "sentiment").rdd.map(classify_arr).toDF(["features", "sentiment", "prediction"])

In [0]:
# Find the number of records present in our testing set
num_records = predictions_df.count()

In [0]:
# Make a function that returns a 1 for correct predictions and a 0 otherwise
def isCorrect(x):
    return(1 if x.sentiment == x.prediction else 0)

In [0]:
# Calculate the number of predictions that were correct by summing values from isCorrect mapped over all predictions
num_correct = predictions_df.rdd.map(isCorrect).reduce(lambda x, y: x + y)

In [0]:
# Accuracy = number of correct records / total number of records
print("Accuracy =", (num_correct / num_records)*100, "%")

Accuracy = 64.3208296855007 %


In [0]:
print("Estimated Priors for both classes:")
print("P(Class = 'positive')\t", p_pos, "\nP(Class = 'negative')\t", p_neg)

Estimated Priors for both classes:
P(Class = 'positive')	 0.501209069443259 
P(Class = 'negative')	 0.4987909305567409


In [0]:
print("Estimated Likelihood values in vector form for both classes:")
# They are 1 x 167070 sized arrays
print("P(X|Class = 'positive')\t", normalized_pos_arr)
print("P(X|Class = 'negative')\t", normalized_neg_arr)

Estimated Likelihood values in vector form for both classes:
P(X|Class = 'positive')	 [1.20707563e-02 1.33454455e-02 8.64588138e-03 ... 0.00000000e+00
 0.00000000e+00 4.70191504e-07]
P(X|Class = 'negative')	 [1.66292163e-02 1.20871898e-02 8.53586897e-03 ... 4.84689624e-07
 0.00000000e+00 0.00000000e+00]
