In [1]:
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from textblob import TextBlob


#### PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext

#### NLTK Pkg
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer


In [2]:
from pyspark.sql.types import (IntegerType, StringType, 
                               TimestampType, StructType,
                               StructField, ArrayType,
                               TimestampType)

import pyspark.sql.functions as F

## Loading Data Set and Combining 

In [3]:
%%time
data_ori = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-01.json')
data2 = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-02.json')
data3 = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-03.json')
data4 = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-04.json')
data5 = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-05.json')
data6 = sqlContext.read.format('json').option('header', False).option('multiline', True).load('/dis_materials/part-06.json')


CPU times: user 18.8 ms, sys: 4.58 ms, total: 23.4 ms
Wall time: 1min 9s


In [4]:
%%time
#Joining all the data frames together into data
data_ori = data_ori.union(data2)
data_ori = data_ori.union(data3)
data_ori = data_ori.union(data4)
data_ori = data_ori.union(data5)
data_ori = data_ori.union(data6)

CPU times: user 0 ns, sys: 3.4 ms, total: 3.4 ms
Wall time: 128 ms


In [5]:
%%time
print("Data Frame Schema ")
data_ori.printSchema()

Data Frame Schema 
root
 |-- helpful: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_summary: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- spoiler_tag: long (nullable = true)

CPU times: user 2.96 ms, sys: 589 µs, total: 3.54 ms
Wall time: 25.3 ms


## Preporcessing For TextBlob & Vader

In [6]:
data_ori = data_ori.drop("helpful", "spoiler_tag","review_summary")

In [7]:
print("Data Frame Schema after droping columns")
data_ori.printSchema()

Data Frame Schema after droping columns
root
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- reviewer: string (nullable = true)



In [40]:
print("Data Frame Rows and Columns")
print((data_ori.count(), len(data_ori.columns)))

Data Frame Rows and Columns
(5571499, 6)


In [8]:
%%time
data_ori.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_ori.columns]).show()


+-----+------+-----------+-------------+---------+--------+
|movie|rating|review_date|review_detail|review_id|reviewer|
+-----+------+-----------+-------------+---------+--------+
|    0|662849|          0|            0|        0|       0|
+-----+------+-----------+-------------+---------+--------+

CPU times: user 29.4 ms, sys: 24.1 ms, total: 53.5 ms
Wall time: 2min 18s


In [43]:
print("Droping Missing values")
data_ori = data_ori.dropna()
      

Droping Missing values


In [44]:
data_ori.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_ori.columns]).show()

+-----+------+-----------+-------------+---------+--------+
|movie|rating|review_date|review_detail|review_id|reviewer|
+-----+------+-----------+-------------+---------+--------+
|    0|     0|          0|            0|        0|       0|
+-----+------+-----------+-------------+---------+--------+



### Remove Special Characters 

In [14]:
%%time
df_clean = data_ori.select('review_id', (lower(regexp_replace('review_detail', "[^a-zA-Z\\s]", "")).alias('text')))


CPU times: user 5.56 ms, sys: 4.49 ms, total: 10 ms
Wall time: 33 ms


### Tokenization

In [17]:
%%time
tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
df_words_token = tokenizer.transform(df_clean).select('review_id', 'words_token')

CPU times: user 6.7 ms, sys: 1.46 ms, total: 8.16 ms
Wall time: 57.4 ms


In [62]:
df_words_token.show()

+---------+--------------------+
|review_id|         words_token|
+---------+--------------------+
|rw1133942|[after, seeing, t...|
|rw1133943|[i, have, the, en...|
|rw1133946|[once, again, the...|
|rw1133948|[this, is, a, fil...|
|rw1133949|[chris, farley, i...|
|rw1133950|[i, love, this, a...|
|rw1133952|[excellent, good,...|
|rw1133953|[i, always, get, ...|
|rw1133954|[the, amityville,...|
|rw1133955|[several, friends...|
|rw1133956|[the, first, inst...|
|rw1133957|[how, on, earth, ...|
|rw1133958|[i, figure, that,...|
|rw1133959|[theres, a, websi...|
|rw1133960|[the, mansion, of...|
|rw1133961|[this, is, a, fav...|
|rw1133964|[contains, minor,...|
|rw1133965|[fabulous, film, ...|
|rw1133967|[this, is, a, ver...|
|rw1133968|[what, a, treat, ...|
+---------+--------------------+
only showing top 20 rows



### Remove stop words

In [19]:
%%time
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('review_id', 'words_clean')

CPU times: user 30 ms, sys: 20.2 ms, total: 50.3 ms
Wall time: 194 ms


In [64]:
df_words_no_stopw.show()

+---------+--------------------+
|review_id|         words_clean|
+---------+--------------------+
|rw1133942|[seeing, tarantin...|
|rw1133943|[entire, series, ...|
|rw1133946|[critics, prove, ...|
|rw1133948|[film, done, many...|
|rw1133949|[chris, farley, o...|
|rw1133950|[love, anime, ser...|
|rw1133952|[excellent, good,...|
|rw1133953|[always, get, ann...|
|rw1133954|[amityville, horr...|
|rw1133955|[several, friends...|
|rw1133956|[first, installme...|
|rw1133957|[earth, director,...|
|rw1133958|[figure, people, ...|
|rw1133959|[theres, website,...|
|rw1133960|[mansion, madness...|
|rw1133961|[favorite, mine, ...|
|rw1133964|[contains, minor,...|
|rw1133965|[fabulous, film, ...|
|rw1133967|[lighthearted, pr...|
|rw1133968|[treat, unearth, ...|
+---------+--------------------+
only showing top 20 rows



### Stem text

In [73]:
# Clean text
%%time
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))

In [74]:
df_final_words.show()

+---------+--------------------+--------------------+
|review_id|       words_stemmed|               words|
+---------+--------------------+--------------------+
|rw1133942|[see, tarantino, ...|[see, tarantino, ...|
|rw1133943|[entir, seri, vid...|[entir, seri, vid...|
|rw1133946|[critic, prove, m...|[critic, prove, m...|
|rw1133948|[film, done, mani...|[film, done, mani...|
|rw1133949|[chris, farley, o...|[chris, farley, o...|
|rw1133950|[love, anim, seri...|[love, anim, seri...|
|rw1133952|[excel, good, fai...|[excel, good, fai...|
|rw1133953|[alway, get, anno...|[alway, get, anno...|
|rw1133954|[amityvill, horro...|[amityvill, horro...|
|rw1133955|[sever, friend, m...|[sever, friend, m...|
|rw1133956|[first, instal, s...|[first, instal, s...|
|rw1133957|[earth, director,...|[earth, director,...|
|rw1133958|[figur, peopl, pr...|[figur, peopl, pr...|
|rw1133959|[there, websit, c...|[there, websit, c...|
|rw1133960|[mansion, mad, lo...|[mansion, mad, lo...|
|rw1133961|[favorit, mine, a

## TextBlob & Vader

In [45]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
analyser = SentimentIntensityAnalyzer()

def sentiment_analyzer_scores(sentence):
    score = analyser.polarity_scores(sentence)
    return score


def change_polarity(pol_val):
    if pol_val >0:
        return "positive"
    elif pol_val < 0:
        return "negative"
    else:
        return "neutral"
    
def sentement_checker(text):
    sent = TextBlob(text).sentiment.polarity
    return sent

In [46]:
%%time
print("Sentement Check")
checked_sentement = udf(lambda x: sentement_checker(x), DoubleType())

data_ori = data_ori.withColumn('Sentements_orig', checked_sentement('review_detail'))

Sentement Check
CPU times: user 10.4 ms, sys: 312 µs, total: 10.7 ms
Wall time: 49.4 ms


In [47]:
%%time
print("Polarity Check")
checked_polarity = udf(lambda x: change_polarity(x), StringType())

data_ori = data_ori.withColumn('Polarity_orig', checked_polarity('Sentements_orig'))

Polarity Check
CPU times: user 2.4 ms, sys: 8.43 ms, total: 10.8 ms
Wall time: 34.4 ms


In [48]:
%%time
print("Vader Score Check ")
sentiment_analyzer_scores_udf = udf(lambda x: sentiment_analyzer_scores(x), StringType())

data_ori = data_ori.withColumn('vader_score', sentiment_analyzer_scores_udf('review_detail'))

Vader Score Check 
CPU times: user 10.7 ms, sys: 16.1 ms, total: 26.8 ms
Wall time: 121 ms


In [49]:
print("New Schema")
data_ori.printSchema()

New Schema
root
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- Sentements_orig: double (nullable = true)
 |-- Polarity_orig: string (nullable = true)
 |-- vader_score: string (nullable = true)



In [50]:
from pyspark.sql import functions as f

def generate_udf(constant_var="Correct"):
    def test(col1, col2):
        if col1 == "positive" and  int(col2) >= 5:
            return constant_var
        elif col1 == "negative" and  int(col2) < 5:
            return constant_var
        elif col1 == "neutral":
            return constant_var
        else:
            return "Incorrect"
    return f.udf(test, StringType())

def acc_pre(pol, rate):
    if rate >= 5 and pol == "positive":
        return "Correct"
    elif rate < 5 and pol == "negative":
        return "Correct"
    elif pol == "neutral":
        return"Correct"
    else:
        return "Incorrect"

In [51]:
%%time
data_ori = data_ori.withColumn('Results', generate_udf('Correct')(f.col('Polarity_orig'), f.col('rating')))

CPU times: user 9.04 ms, sys: 932 µs, total: 9.97 ms
Wall time: 35.9 ms


In [53]:
data_ori.printSchema()

root
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- Sentements_orig: double (nullable = true)
 |-- Polarity_orig: string (nullable = true)
 |-- vader_score: string (nullable = true)
 |-- Results: string (nullable = true)



## Queries

In [55]:
%%time
data_ori.select(col("movie"),col("review_detail"), col("Sentements_orig"),col("Polarity_orig")).show()

+--------------------+--------------------+--------------------+-------------+
|               movie|       review_detail|     Sentements_orig|Polarity_orig|
+--------------------+--------------------+--------------------+-------------+
| After Life (2019– )|I enjoyed the fir...|  0.2727777777777778|     positive|
|The Valhalla Murd...|I know Iceland is...|-0.05833333333333333|     negative|
|Special OPS (2020– )|Except K K , no o...|            0.178125|     positive|
|   #BlackAF (2020– )|I'm guessing that...| 0.30694444444444446|     positive|
|  The Droving (2020)|Here's the truth....| 0.11935606060606062|     positive|
|All About Eve (1950)|Having seen this ...|  0.2208333333333333|     positive|
|Runaway Train (1985)|The movie had som...| 0.11562500000000002|     positive|
|Iron Fist (2017–2...|I loved it from t...|           0.2609375|     positive|
|The Half of It (I...|I see that Netfli...| 0.10127314814814814|     positive|
| This Is Us (2016– )|This is the show ...| -0.34666

In [57]:
%%time
data_ori.select(col("vader_score")).show(truncate=False)

+---------------------------------------------------+
|vader_score                                        |
+---------------------------------------------------+
|{neg=0.046, pos=0.388, compound=0.9916, neu=0.567} |
|{neg=0.186, pos=0.052, compound=-0.908, neu=0.762} |
|{neg=0.185, pos=0.238, compound=0.34, neu=0.577}   |
|{neg=0.074, pos=0.3, compound=0.948, neu=0.626}    |
|{neg=0.085, pos=0.068, compound=-0.6043, neu=0.848}|
|{neg=0.083, pos=0.146, compound=0.7469, neu=0.771} |
|{neg=0.041, pos=0.113, compound=0.7425, neu=0.846} |
|{neg=0.0, pos=0.33, compound=0.9295, neu=0.67}     |
|{neg=0.064, pos=0.219, compound=0.9962, neu=0.717} |
|{neg=0.082, pos=0.035, compound=-0.3869, neu=0.883}|
|{neg=0.0, pos=0.387, compound=0.9701, neu=0.613}   |
|{neg=0.084, pos=0.285, compound=0.9515, neu=0.63}  |
|{neg=0.061, pos=0.19, compound=0.7582, neu=0.749}  |
|{neg=0.0, pos=0.221, compound=0.5267, neu=0.779}   |
|{neg=0.263, pos=0.089, compound=-0.9916, neu=0.648}|
|{neg=0.238, pos=0.036, comp

In [None]:
%%time
data_ori.groupBy("Polarity_orig").count().show()

+-------------+-------+
|Polarity_orig|  count|
+-------------+-------+
|     positive|4024392|
|      neutral|  57527|
|     negative| 826731|
+-------------+-------+

CPU times: user 463 ms, sys: 228 ms, total: 691 ms
Wall time: 1h 7min 35s


In [59]:
%%time
temp_df = data_ori.groupBy("movie").count()

CPU times: user 4.31 ms, sys: 0 ns, total: 4.31 ms
Wall time: 20.7 ms


In [62]:
temp_df.sort(col("count").desc()).show()

+---------------------------+-----+
|                      movie|count|
+---------------------------+-----+
|       Avengers: Endgame...| 8673|
|         Dil Bechara (2020)| 7735|
|       The Shawshank Red...| 7720|
|       Game of Thrones: ...| 7261|
|       Captain Marvel (2...| 7060|
|       Wonder Woman 1984...| 6724|
|       The Dark Knight (...| 6612|
|                小丑 (2019)| 6450|
|       Star Wars: Episod...| 6354|
|       Mrs. Serial Kille...| 5377|
|STAR WARS：天行者的崛起 ...| 5082|
|       Gunjan Saxena: Th...| 4912|
|              Laxmii (2020)| 4714|
|       Star Wars: Episod...| 4627|
|       The Lord of the R...| 4627|
|       Game of Thrones (...| 4394|
|       Avengers: Infinit...| 4187|
|               Joker (2019)| 4135|
|             Aquaman (2018)| 4044|
|               Tenet (2020)| 3975|
+---------------------------+-----+
only showing top 20 rows



In [64]:
%%time
temp_df = data_ori.groupBy("rating").count()

CPU times: user 3.51 ms, sys: 0 ns, total: 3.51 ms
Wall time: 16.8 ms


In [65]:
temp_df.sort(col("count").desc()).show()

+------+-------+
|rating|  count|
+------+-------+
|    10|1178243|
|     8| 685214|
|     9| 591291|
|     7| 567048|
|     1| 498149|
|     6| 409353|
|     5| 312636|
|     4| 235062|
|     3| 223698|
|     2| 207956|
+------+-------+



In [None]:
%%time
temp_df = df_ori.groupBy("movie").count()

In [12]:
%%time
sentiment_transformer_udf = udf(lambda x: sentiment_transformer(x), StringType())

df_ori = df_ori.withColumn('Trans_Sentiments', sentiment_transformer_udf('review_detail'))

CPU times: user 10.9 ms, sys: 491 µs, total: 11.4 ms
Wall time: 92.4 ms


In [None]:
%%time
data_ori.groupBy("Results").count().show()

In [21]:
%%time
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('review_id', 'words_stemmed')

CPU times: user 6.73 ms, sys: 2.87 ms, total: 9.6 ms
Wall time: 41.1 ms


In [72]:
df_stemmed.show()

+---------+--------------------+
|review_id|       words_stemmed|
+---------+--------------------+
|rw1133942|[see, tarantino, ...|
|rw1133943|[entir, seri, vid...|
|rw1133946|[critic, prove, m...|
|rw1133948|[film, done, mani...|
|rw1133949|[chris, farley, o...|
|rw1133950|[love, anim, seri...|
|rw1133952|[excel, good, fai...|
|rw1133953|[alway, get, anno...|
|rw1133954|[amityvill, horro...|
|rw1133955|[sever, friend, m...|
|rw1133956|[first, instal, s...|
|rw1133957|[earth, director,...|
|rw1133958|[figur, peopl, pr...|
|rw1133959|[there, websit, c...|
|rw1133960|[mansion, mad, lo...|
|rw1133961|[favorit, mine, a...|
|rw1133964|[contain, minor, ...|
|rw1133965|[fabul, film, fun...|
|rw1133967|[lightheart, prod...|
|rw1133968|[treat, unearth, ...|
+---------+--------------------+
only showing top 20 rows



### NLTK

In [215]:
data =  df_ori.select(col("review_detail"),col("Polarity_orig"))

In [None]:
data = data.toPandas()

In [23]:
type(data)

pandas.core.frame.DataFrame

In [27]:
data.head()

Unnamed: 0,review_detail,Polarity_orig
0,"After seeing Tarantino's Kill Bill Vol: 1, I g...",positive
1,Once again the critics prove themselves as mor...,positive
2,This IS a film that has been done too many tim...,positive
3,Chris Farley is one of my favorite comedians a...,positive
4,"I love this anime series, my only complaint is...",positive


In [30]:
from sklearn.model_selection import train_test_split # function for splitting data to train and test sets

train, test = train_test_split(data,test_size = 0.1)


In [31]:
test

Unnamed: 0,review_detail,Polarity_orig
70535,The hoards of die-hard fans of the original mo...,positive
82617,Woman loves ape - but alas it's not to be. Nao...,positive
40327,It's been a long time since we've had a genuin...,positive
54350,"In 1947, after centuries of colonial rule and ...",positive
60554,I love this movie. It's everything I've come t...,positive
...,...,...
12414,Jean (Heather Locklear) moves every time she f...,positive
18320,Richard Pryor again plays the bumbling idiot i...,positive
40556,"The best thing to be said about ""A History of ...",positive
20136,After I finished watching Heathers for the fir...,positive


In [32]:
train

Unnamed: 0,review_detail,Polarity_orig
45877,I was reluctant to see this movie because of a...,positive
3587,"OK, we saw this film by default rather than de...",positive
78887,I saw this movie expecting to see another clev...,positive
15006,What we have here is an attempt to make a funn...,positive
28214,Anyone who saw the original with Gene Wilder a...,positive
...,...,...
61013,I really don't get why this film rates that hi...,positive
71841,"Quite frankly, I think this film is beyond the...",positive
8238,This is by far one of the best movies EVER! Th...,positive
34076,"This is the version of ""Jeopardy!"" I REALLY re...",positive


In [33]:
train = train[train.Polarity_orig != "Neutral"]


In [34]:
train

Unnamed: 0,review_detail,Polarity_orig
45877,I was reluctant to see this movie because of a...,positive
3587,"OK, we saw this film by default rather than de...",positive
78887,I saw this movie expecting to see another clev...,positive
15006,What we have here is an attempt to make a funn...,positive
28214,Anyone who saw the original with Gene Wilder a...,positive
...,...,...
61013,I really don't get why this film rates that hi...,positive
71841,"Quite frankly, I think this film is beyond the...",positive
8238,This is by far one of the best movies EVER! Th...,positive
34076,"This is the version of ""Jeopardy!"" I REALLY re...",positive


-----

In [21]:
CONTRACTION_MAP = {
    "ain't": "is not",
    "aren't": "are not",
    "can't": "cannot",
    "can't've": "cannot have",
    "'cause": "because",
    "could've": "could have",
    "couldn't": "could not",
    "couldn't've": "could not have",
    "didn't": "did not",
    "doesn't": "does not",
    "don't": "do not",
    "hadn't": "had not",
    "hadn't've": "had not have",
    "hasn't": "has not",
    "haven't": "have not",
    "he'd": "he would",
    "he'd've": "he would have",
    "he'll": "he will",
    "he'll've": "he he will have",
    "he's": "he is",
    "how'd": "how did",
    "how'd'y": "how do you",
    "how'll": "how will",
    "how's": "how is",
    "I'd": "I would",
    "I'd've": "I would have",
    "I'll": "I will",
    "I'll've": "I will have",
    "I'm": "I am",
    "I've": "I have",
    "i'd": "i would",
    "i'd've": "i would have",
    "i'll": "i will",
    "i'll've": "i will have",
    "i'm": "i am",
    "i've": "i have",
    "isn't": "is not",
    "it'd": "it would",
    "it'd've": "it would have",
    "it'll": "it will",
    "it'll've": "it will have",
    "it's": "it is",
    "let's": "let us",
    "ma'am": "madam",
    "mayn't": "may not",
    "might've": "might have",
    "mightn't": "might not",
    "mightn't've": "might not have",
    "must've": "must have",
    "mustn't": "must not",
    "mustn't've": "must not have",
    "needn't": "need not",
    "needn't've": "need not have",
    "o'clock": "of the clock",
    "oughtn't": "ought not",
    "oughtn't've": "ought not have",
    "shan't": "shall not",
    "sha'n't": "shall not",
    "shan't've": "shall not have",
    "she'd": "she would",
    "she'd've": "she would have",
    "she'll": "she will",
    "she'll've": "she will have",
    "she's": "she is",
    "should've": "should have",
    "shouldn't": "should not",
    "shouldn't've": "should not have",
    "so've": "so have",
    "so's": "so as",
    "that'd": "that would",
    "that'd've": "that would have",
    "that's": "that is",
    "there'd": "there would",
    "there'd've": "there would have",
    "there's": "there is",
    "they'd": "they would",
    "they'd've": "they would have",
    "they'll": "they will",
    "they'll've": "they will have",
    "they're": "they are",
    "they've": "they have",
    "to've": "to have",
    "wasn't": "was not",
    "we'd": "we would",
    "we'd've": "we would have",
    "we'll": "we will",
    "we'll've": "we will have",
    "we're": "we are",
    "we've": "we have",
    "weren't": "were not",
    "what'll": "what will",
    "what'll've": "what will have",
    "what're": "what are",
    "what's": "what is",
    "what've": "what have",
    "when's": "when is",
    "when've": "when have",
    "where'd": "where did",
    "where's": "where is",
    "where've": "where have",
    "who'll": "who will",
    "who'll've": "who will have",
    "who's": "who is",
    "who've": "who have",
    "why's": "why is",
    "why've": "why have",
    "will've": "will have",
    "won't": "will not",
    "won't've": "will not have",
    "would've": "would have",
    "wouldn't": "would not",
    "wouldn't've": "would not have",
    "y'all": "you all",
    "y'all'd": "you all would",
    "y'all'd've": "you all would have",
    "y'all're": "you all are",
    "y'all've": "you all have",
    "you'd": "you would",
    "you'd've": "you would have",
    "you'll": "you will",
    "you'll've": "you will have",
    "you're": "you are",
    "you've": "you have"
}

In [24]:
%%time
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="token", outputCol="filtered")
df_clean = remover.transform(df_clean)

In [25]:
df4.show()

+--------------------+------+------------+--------------------+---------+-------------------+--------------------+--------------------+
|               movie|rating| review_date|       review_detail|review_id|           reviewer|               token|            filtered|
+--------------------+------+------------+--------------------+---------+-------------------+--------------------+--------------------+
|Kill Bill: Vol. 2...|     8|24 July 2005|After seeing Tara...|rw1133942|OriginalMovieBuff21|[after, seeing, t...|[seeing, tarantin...|
|Journey to the Un...|  null|24 July 2005|I have the entire...|rw1133943|           sentra14|[i, have, the, en...|[entire, series, ...|
|   The Island (2005)|     9|24 July 2005|Once again the cr...|rw1133946|  GreenwheelFan2002|[once, again, the...|[critics, prove, ...|
|Win a Date with T...|     3|24 July 2005|This IS a film th...|rw1133948|     itsascreambaby|[this, is, a, fil...|[film, done, many...|
|Saturday Night Li...|    10|24 July 2005|Chris 

In [26]:
df_filtered = df4.select(col("filtered"))

In [27]:
df4.printSchema()

root
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Preprocessing For Logistic Regression

In [79]:
data_ori.printSchema()

root
 |-- helpful: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- movie: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- review_detail: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_summary: string (nullable = true)
 |-- reviewer: string (nullable = true)
 |-- spoiler_tag: long (nullable = true)



In [76]:
%%time
def target_gen(rate):
    if int(rate) > 5:
        return 1
    elif int(rate) < 5:
        return -1
    else:
        return 0

CPU times: user 16 µs, sys: 3 µs, total: 19 µs
Wall time: 22.4 µs


In [81]:
from pyspark.sql.functions import lit
data_ori = data_ori.withColumn("target", lit(0))

In [85]:
df_ML = data_ori.select(col("review_detail"), col("target"))

In [86]:
df_ML.dropna()

DataFrame[review_detail: string, target: int]

In [87]:
df_ML.printSchema()

root
 |-- review_detail: string (nullable = true)
 |-- target: integer (nullable = false)



In [88]:
df_ML.show()

+--------------------+------+
|       review_detail|target|
+--------------------+------+
|I enjoyed the fir...|     0|
|I know Iceland is...|     0|
|Except K K , no o...|     0|
|I'm guessing that...|     0|
|Here's the truth....|     0|
|Having seen this ...|     0|
|The movie had som...|     0|
|I loved it from t...|     0|
|I see that Netfli...|     0|
|This is the show ...|     0|
|This is a fun and...|     0|
|A suspenseful thr...|     0|
|Highlight was Cam...|     0|
|A lot of excuses ...|     0|
|A fenomel animati...|     0|
|Some Kind Of Hate...|     0|
|I actually liked ...|     0|
|Well, I just fini...|     0|
|Ah Indies done by...|     0|
|Everybody should ...|     0|
+--------------------+------+
only showing top 20 rows



In [89]:
%%time
(train_set, val_set, test_set) = df_ML.randomSplit([0.98, 0.01, 0.01], seed = 2000)

CPU times: user 3.6 ms, sys: 589 µs, total: 4.19 ms
Wall time: 31.2 ms


## Logistic Regression Using Hashing:

In [None]:
%%time
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression


tokenizer = Tokenizer(inputCol="review_detail", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

CPU times: user 186 ms, sys: 41.3 ms, total: 227 ms
Wall time: 8min 37s


1.0

## Logistic Regression Using CountVectorizing:

In [None]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="review_detail", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())

print("Accuracy Score: {0:.4f}".format(accuracy))

Accuracy Score: 1.0000
CPU times: user 180 ms, sys: 32.9 ms, total: 213 ms
Wall time: 8min 2s


## Logistic Regression Using CountVectorizing with Ngrams:

In [92]:
from pyspark.ml.feature import NGram, VectorAssembler
def build_ngrams_wocs(inputCol=["review_detail","target"], n=3):
    tokenizer = [Tokenizer(inputCol="review_detail", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

In [None]:
%%time
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))


Accuracy Score: 1.0000
CPU times: user 1.06 s, sys: 374 ms, total: 1.43 s
Wall time: 1h 23min 5s
