### Data Pre-processing

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import collect_list

from nltk import download
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk import RegexpParser
from nltk import pos_tag

import string
import re
import itertools

#### Remove non-ASCII characters

In [None]:
# remove non ASCII characters
def strip_non_ascii(data_str):
    '''Return ``data_str`` keeping only characters with code points 1-126.

    This removes every non-ASCII character and, because of the strict
    bounds of the range check, also NUL (0) and DEL (127).

    Args:
        data_str: Text to clean, or ``None`` (e.g. a NULL cell handed to
            the UDF built from this function).

    Returns:
        The filtered string, or ``None`` when ``data_str`` is ``None``.
    '''
    # Propagate a missing value instead of raising TypeError while trying
    # to iterate over None.
    if data_str is None:
        return None
    return ''.join(c for c in data_str if 0 < ord(c) < 127)

# Expose strip_non_ascii as a Spark UDF returning a string column.
strip_non_ascii_udf = udf(strip_non_ascii, returnType=StringType())

In [None]:
# Add an ASCII-only copy of the raw review text as 'text_non_asci'.
df = df.withColumn(
    'text_non_asci',
    strip_non_ascii_udf(df.review_body),
)

#### Fix abbreviations

In [None]:
# modify abbreviations
# Ordered (pattern, replacement) rules used by fix_abbreviation(). They are
# applied top to bottom on already lower-cased text.
_ABBREVIATION_RULES = (
    (r'\bthats\b', 'that is'),
    (r'\bive\b', 'i have'),
    (r'\bim\b', 'i am'),
    (r'\bya\b', 'yeah'),
    (r'\bcant\b', 'can not'),
    (r'\bdont\b', 'do not'),
    (r'\bwont\b', 'will not'),
    (r'\bid\b', 'i would'),
    (r'wtf', 'what the fuck'),
    (r'\bwth\b', 'what the hell'),
    (r'\br\b', 'are'),
    (r'\bu\b', 'you'),
    (r'\bk\b', 'OK'),
    (r'\bsux\b', 'sucks'),
    # collapse stretched spellings such as "nooo" / "coooo"
    (r'\bno+\b', 'no'),
    (r'\bcoo+\b', 'cool'),
    # Drop the stand-alone token "rt" only. Without the leading \b the
    # pattern also chopped the ending off words such as "smart" or "start".
    (r'\brt\b', ''),
)


def fix_abbreviation(data_str):
    '''Lower-case ``data_str`` and expand common chat abbreviations.

    Args:
        data_str: Text to normalise, or ``None``.

    Returns:
        The lower-cased text with every rule in ``_ABBREVIATION_RULES``
        applied in order and surrounding whitespace stripped, or ``None``
        when ``data_str`` is ``None``. Note that the replacement for "k"
        is the upper-case string 'OK'.
    '''
    # Propagate a missing value instead of failing on None.lower().
    if data_str is None:
        return None
    data_str = data_str.lower()
    for pattern, replacement in _ABBREVIATION_RULES:
        data_str = re.sub(pattern, replacement, data_str)
    return data_str.strip()

# Expose fix_abbreviation as a Spark UDF returning a string column.
fix_abbreviation_udf = udf(fix_abbreviation, returnType=StringType())

In [None]:
# Add 'text_fixed_abbrev': the ASCII-only text with abbreviations expanded.
df = df.withColumn(
    'text_fixed_abbrev',
    fix_abbreviation_udf(df.text_non_asci),
)

#### Remove hyperlinks, punctuation, numbers, etc.

In [None]:
# remove hyperlinks, puncuations, numbers, etc.
# Patterns used by remove_features(), compiled once instead of on every call.
_URL_RE = re.compile(r'https?://(www.)?\w+\.\w+(/\w+)*/?')
_HTML_BREAK_RE = re.compile(r'<br />')
_MENTION_RE = re.compile(r'@(\w+)')
_PUNCTUATION_RE = re.compile('[%s]' % re.escape(string.punctuation))
_NUMBER_RE = re.compile(r'(\d+)')
_WORD_RE = re.compile(r'^[a-z0-9_.]+$')


def remove_features(data_str):
    '''Strip hyperlinks, HTML line breaks, @mentions, punctuation and digits.

    Args:
        data_str: Text to clean, or ``None``.

    Returns:
        The lower-cased text reduced to the words that are longer than one
        character and consist only of ``a-z``, ``0-9``, ``_`` and ``.``,
        joined by single spaces; ``None`` when ``data_str`` is ``None``.
    '''
    # Propagate a missing value instead of failing on None.lower().
    if data_str is None:
        return None
    data_str = data_str.lower()
    data_str = _URL_RE.sub(' ', data_str)
    # "<br />" has to go before punctuation is stripped: once "<", "/" and
    # ">" are replaced the pattern can no longer match and a stray "br"
    # token is left in the text.
    data_str = _HTML_BREAK_RE.sub(' ', data_str)
    data_str = _MENTION_RE.sub(' ', data_str)
    data_str = _PUNCTUATION_RE.sub(' ', data_str)
    data_str = _NUMBER_RE.sub(' ', data_str)
    # split() already discards repeated whitespace, so joining the
    # surviving words yields a normalised single-spaced string.
    kept_words = [
        word for word in data_str.split()
        if len(word) > 1 and _WORD_RE.match(word)
    ]
    return ' '.join(kept_words)

# Expose remove_features as a Spark UDF returning a string column.
remove_features_udf = udf(remove_features, returnType=StringType())

In [None]:
# Add 'text_feature_removed': the abbreviation-expanded text run through
# remove_features.
df = df.withColumn(
    'text_feature_removed',
    remove_features_udf(df.text_fixed_abbrev),
)

#### Group together the different inflected forms of a word 

- convert past tense and future tense into simple present tense
- convert plural form into singular form

In [None]:
# Keep only the rows whose 'text_feature_removed' value is not null.
df = df.filter(df['text_feature_removed'].isNotNull())

In [None]:
# Group together the different inflected forms of a word
def lemmatize(data_str):
    '''Replace every whitespace-separated word of ``data_str`` by its lemma.

    The words are POS-tagged with ``pos_tag``. A word whose tag contains the
    letter "v" (case-insensitive) is lemmatized as a verb, every other word
    as a noun.

    Args:
        data_str: A string of whitespace-separated words.

    Returns:
        The lemmas joined by single spaces ('' when there are no words).
    '''
    lemmatizer = WordNetLemmatizer()
    lemmas = []
    for token, tag in pos_tag(data_str.split()):
        wordnet_pos = 'v' if 'v' in tag.lower() else 'n'
        lemmas.append(lemmatizer.lemmatize(token, pos=wordnet_pos))
    return ' '.join(lemmas)

# Expose lemmatize as a Spark UDF returning a string column.
lemmatize_udf = udf(lemmatize, returnType=StringType())

In [None]:
# Build lemm_df by adding 'lemm_text', the lemmatized form of
# 'text_feature_removed'.
lemm_df = df.withColumn(
    "lemm_text",
    lemmatize_udf(df.text_feature_removed),
)

#### Mark up a word in a text as corresponding to a particular part of speech, based on both its definition and its context 

- Identify the different parts of speech
- Combine patterns such as "noun + noun" and "adjective + noun"

In [None]:
# Keep only the rows whose 'lemm_text' value is not null.
lemm_df = lemm_df.filter(lemm_df['lemm_text'].isNotNull())

In [None]:
def tag_and_remove(data_str):
    '''Keep nouns, adjectives and verbs and fuse compound patterns with "_".

    The whitespace-separated words are tagged with ``pos_tag``; words whose
    tag is not a noun, adjective or verb tag are dropped. A kept noun or
    adjective that is immediately followed by a noun is emitted with a
    trailing "_" so that the pair reads as one token (e.g. "battery_life");
    every other kept word is emitted with a trailing space.

    Args:
        data_str: A string of whitespace-separated words, or ``None``.

    Returns:
        A string that starts with a single space followed by the kept words,
        each with its "_" or " " suffix; ``None`` when ``data_str`` is
        ``None``.
    '''
    # Propagate a missing value instead of failing on None.split().
    if data_str is None:
        return None

    nn_tags = ('NN', 'NNS', 'NNP', 'NNPS')
    jj_tags = ('JJ', 'JJR', 'JJS')
    vb_tags = ('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ')
    kept_tags = set(nn_tags + jj_tags + vb_tags)

    words = data_str.split()
    if not words:
        # nothing to tag; same result the loop below would produce
        return ' '

    tagged_text = pos_tag(words)
    last_index = len(tagged_text) - 1
    pieces = [' ']
    for i, (word, tag) in enumerate(tagged_text):
        if tag not in kept_tags:
            continue
        # The final word has no successor, so it can never open a compound,
        # but it must still be emitted: previously it was silently dropped.
        followed_by_noun = i < last_index and tagged_text[i + 1][1] in nn_tags
        if followed_by_noun and (tag in nn_tags or tag in jj_tags):
            pieces.append(word + '_')
        else:
            pieces.append(word + ' ')
    return ''.join(pieces)

# Expose tag_and_remove as a Spark UDF returning a string column.
tag_and_remove_udf = udf(tag_and_remove, returnType=StringType())

In [None]:
# Build tagged_df by adding 'tag_text', the POS-filtered form of 'lemm_text'.
tagged_df = lemm_df.withColumn(
    "tag_text",
    tag_and_remove_udf(lemm_df['lemm_text']),
)

#### Remove stopwords

In [None]:
# Keep only the rows whose 'tag_text' value is not null.
tagged_df = tagged_df.filter(tagged_df['tag_text'].isNotNull())

In [None]:
from nltk.corpus import stopwords

# Download the NLTK stop-word corpus, then extend the English list with two
# additional tokens to discard.
download('stopwords')
stop_words = stopwords.words('english')
stop_words.extend(['br', 'would'])

In [None]:
# remove stop words
def remove_stops(data_str):
    '''Drop the words of ``data_str`` that appear in ``stop_words``.

    Args:
        data_str: A string of whitespace-separated words.

    Returns:
        The remaining words, in their original order, joined by single
        spaces ('' when nothing remains).
    '''
    kept_words = [token for token in data_str.split() if token not in stop_words]
    return ' '.join(kept_words)

# Expose remove_stops as a Spark UDF returning a string column.
remove_stops_udf = udf(remove_stops, returnType=StringType())

In [None]:
# Build stop_df by adding 'stop_text', i.e. 'tag_text' without stop words.
stop_df = tagged_df.withColumn(
    "stop_text",
    remove_stops_udf(tagged_df.tag_text),
)

#### Tokenize the reviews into words

In [None]:
# Expose NLTK's word_tokenize as a Spark UDF returning an array of strings.
tokenize_udf = udf(word_tokenize, returnType=ArrayType(StringType()))

# Build token_df by adding 'token_text', the tokenized form of 'stop_text'.
token_df = stop_df.withColumn(
    "token_text",
    tokenize_udf(stop_df.stop_text),
)

#### Set the reviews from 2015 as testing dataset and others as training datasets
#### Group reviews by ratings and years

In [None]:
from pyspark.sql.functions import collect_list

In [None]:
# Keep only the rows whose 'token_text' value is not null.
token_df = token_df.filter(token_df['token_text'].isNotNull())

In [None]:
# Training split: rows matching "year != 2015", with the token lists of each
# star rating collected into one 'review_clean' column.
df_combine_train = (
    token_df
    .filter("year != 2015")
    .groupBy('star_rating')
    .agg(collect_list('token_text').alias("review_clean"))
)

In [None]:
# Test split: rows matching "year = 2015", with the token lists of each star
# rating collected into one 'review_clean' column.
df_combine_test = (
    token_df
    .filter("year = 2015")
    .groupBy('star_rating')
    .agg(collect_list('token_text').alias("review_clean"))
)

In [None]:
# Keep only the groups whose 'review_clean' value is not null.
df_combine_train = df_combine_train.filter(
    df_combine_train['review_clean'].isNotNull()
)
df_combine_test = df_combine_test.filter(
    df_combine_test['review_clean'].isNotNull()
)

In [None]:
import itertools

In [None]:
# Flatten the nested lists
def flatten_nested_list(nested_list):
    '''Concatenate the inner sequences of ``nested_list`` into one flat list.

    Only one level of nesting is removed; the elements keep their order.
    '''
    return [item for inner in nested_list for item in inner]

In [None]:
# Expose flatten_nested_list as a Spark UDF returning an array of strings.
flatten_udf = udf(flatten_nested_list, returnType=ArrayType(StringType()))

In [None]:
# Add 'review_cleaned': the per-rating list of token lists flattened into a
# single list of tokens.
df_combine_train = df_combine_train.withColumn(
    'review_cleaned',
    flatten_udf(df_combine_train['review_clean']),
)
df_combine_test = df_combine_test.withColumn(
    'review_cleaned',
    flatten_udf(df_combine_test['review_clean']),
)

In [None]:
# Collect the rating and its flattened tokens to the driver, ordered by
# ascending star rating.
texts_train = (
    df_combine_train
    .orderBy("star_rating", ascending=True)
    .select('star_rating', 'review_cleaned')
    .collect()
)
texts_test = (
    df_combine_test
    .orderBy("star_rating", ascending=True)
    .select('star_rating', 'review_cleaned')
    .collect()
)

#### Create the documents for LDA

In [None]:
# One document per collected training row: its 'review_cleaned' token list.
documents_train = [row.review_cleaned for row in texts_train]

In [None]:
# One document per collected test row: its 'review_cleaned' token list.
documents_test = [row.review_cleaned for row in texts_test]

#### Filtering out the most frequent words

In [None]:
# Count how often each token occurs across all training documents.
dict_train = {}
for doc in documents_train:
    for token in doc:
        dict_train[token] = dict_train.get(token, 0) + 1

In [None]:
# Count how often each token occurs across all test documents.
dict_test = {}
for doc in documents_test:
    for token in doc:
        dict_test[token] = dict_test.get(token, 0) + 1

In [None]:
# Training tokens that occur more than 90 times.
n_frequent_words_train = [
    token for token, count in dict_train.items() if count > 90
]

In [None]:
# Test tokens that occur more than 75 times.
n_frequent_words_test = [
    token for token, count in dict_test.items() if count > 75
]

In [None]:
# Remove the high-frequency words from every training document.
# Each document is rebuilt as a new list. The previous loop deleted from the
# very list it was iterating over (the "copy" was only an alias), which skips
# the element after each deletion, removes the first equal element rather
# than the current one, and modified documents_train as a side effect.
frequent_words_train = set(n_frequent_words_train)  # constant-time lookups
documents_train_filter = [
    [word for word in doc if word not in frequent_words_train]
    for doc in documents_train
]

In [None]:
# Remove the high-frequency words from every test document.
# Each document is rebuilt as a new list. The previous loop deleted from the
# very list it was iterating over (the "copy" was only an alias), which skips
# the element after each deletion, removes the first equal element rather
# than the current one, and modified documents_test as a side effect.
frequent_words_test = set(n_frequent_words_test)  # constant-time lookups
documents_test_filter = [
    [word for word in doc if word not in frequent_words_test]
    for doc in documents_test
]