In [101]:
import pandas as pd
import numpy as np
import re
from math import log
from math import exp
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from sklearn.feature_extraction.text import CountVectorizer

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
import tensorflow.keras.layers as L
import spacy

import matplotlib.pyplot as plt
import seaborn as sn

In [102]:
# Load the review dataset and pull out the text column and the rating column.
df = pd.read_csv('tripadvisor_hotel_reviews.csv')
X, y = df['Review'], df['Rating']


# hyperparameters
V_size = 10000      # tokenizer `num_words` and embedding input dimension
pad_len = 200       # every encoded review is padded / truncated to this length
embd_dim = 64       # width of the embedding layer when it is trained from scratch
unit = 64           # LSTM units per direction
batch_size = 32     # mini-batch size passed to `model.fit`
num_epoch = 3       # training epochs passed to `model.fit`

In [103]:
def data_cleansing(review):
    """
    Clean one raw review string.

    The body calls ``str`` methods on ``review``, so it is meant to be applied
    element-wise (e.g. ``Series.apply(data_cleansing)``), not to a whole Series.

    Steps, in order: lower-case; rewrite the "n't" suffix as " not"; blank out
    every whitespace-delimited token that contains a digit and every character
    that is not a-z or whitespace; collapse whitespace; lemmatize each token
    as a verb with WordNet.

    :param review: one raw review (str)
    :return: the cleaned review (str), tokens separated by single spaces
    """
    # lower case
    review = review.lower()

    # fix negations
    # NOTE: the rule is purely textual, so "can't" becomes "ca not" and
    # "won't" becomes "wo not".
    review = re.sub("n't", ' not', review)

    # remove digits & punctuations
    # Raw strings: '\S', '\d' and '\s' are not valid Python string escapes and
    # trigger a warning on current Python versions when written in a plain literal.
    review = re.sub(r'(\S*\d+\S*)|([^a-z\s])', ' ', review)

    # remove extra white spaces
    review = re.sub(r'\s+', ' ', review).strip()

    # lemmatization
    lemmatizer = WordNetLemmatizer()
    return " ".join(lemmatizer.lemmatize(word, pos='v') for word in review.split())

In [104]:
def rate_shifting(y):
    """
    Re-encode star ratings so that the class labels start at 0.

    The input Series is left untouched; values outside 1..5 pass through
    unchanged.

    :param y: a Series of ratings ranging from 1 to 5
    :return: a new Series of ratings ranging from 0 to 4
    """
    shift_by_one = {rating: rating - 1 for rating in range(1, 6)}
    return y.replace(shift_by_one)

In [105]:
def tokenization(X_train, X_test):
    """
    Fit a tokenizer on the training reviews and encode both splits as
    fixed-length integer sequences.

    The vocabulary is learned from ``X_train`` only. Each review is padded or
    truncated at its end to ``pad_len`` entries.

    :param X_train: a Series of reviews for training
    :param X_test: a Series of reviews for test
    :return X_seq_train_pad: padded sequences of reviews for training
    :return X_seq_test_pad: padded sequences of reviews for test
    :return V_list: vocabulary list (first ``V_size`` keys of the tokenizer's word index)
    """
    tokenizer = Tokenizer(num_words=V_size)
    tokenizer.fit_on_texts(X_train)

    def encode(texts):
        # texts -> lists of word ids -> post-padded / post-truncated array
        return pad_sequences(tokenizer.texts_to_sequences(texts),
                             maxlen=pad_len, padding='post', truncating='post')

    X_seq_train_pad = encode(X_train)
    X_seq_test_pad = encode(X_test)

    # Iterating the word index yields its keys in insertion order.
    V_list = list(tokenizer.word_index)[:V_size]

    return X_seq_train_pad, X_seq_test_pad, V_list


In [106]:
def cnt_matrix_gen(X, V):
    """
    Build a dense document-by-word count matrix over a fixed vocabulary.

    :param X: a Series of reviews
    :param V: vocabulary list; it fixes the columns and their order
    :return: doc-word count matrix (dense array, one row per review,
             one column per entry of V)
    """
    # NOTE(review): CountVectorizer runs with its default token pattern, which
    # only matches tokens of two or more characters, so single-letter entries
    # of V would always get an all-zero column -- confirm this is intended.
    counts_sparse = CountVectorizer(vocabulary=V).fit_transform(X)
    return counts_sparse.toarray()

In [107]:
def likelihood_gen(dw_cnt, y, num_classes=5):
    """
    generate the log-likelihood of every word in each class

    For class c the word counts of all documents labelled c are summed,
    add-one smoothed, and normalised by the class total; the natural log of
    each ratio is stored. A class with no documents gets a uniform
    distribution (every smoothed count is 1).

    :param dw_cnt: doc-word count matrix (one row per document)
    :param y: ratings aligned row-by-row with dw_cnt (Series or array),
              encoded as 0 .. num_classes - 1
    :param num_classes: number of classes; defaults to 5
    :return: likelihood matrix as a list of num_classes lists of log-probabilities
    """
    dw_cnt = np.asarray(dw_cnt)
    # Plain array so the boolean mask is applied positionally.
    labels = np.asarray(y)

    likelihood = []
    for c in range(num_classes):
        counts = (dw_cnt[labels == c].sum(axis=0) + 1).tolist()    # add-one smoothing
        # The class total is the same for every word: compute it once per class
        # instead of once per word.
        total = sum(counts)
        likelihood.append([log(cnt / total) for cnt in counts])

    return likelihood

In [108]:
def multinomialNB_train(dw_cnt_train, y_train):
    """
    train multinomial naive Bayes model

    The prior of class c is log(#documents of class c / #documents). A class
    that does not occur in y_train gets a prior of -inf instead of raising
    a KeyError.

    :param dw_cnt_train: doc-word count matrix for training
    :param y_train: a Series of ratings for training (encoded 0 to 4)
    :return prior: class log-prior list
    :return likelihood: likelihood matrix
    :raises ValueError: if y_train is empty
    """
    num_doc = len(y_train)
    if num_doc == 0:
        raise ValueError("y_train is empty; cannot estimate class priors")

    # Count the classes once rather than once per class.
    class_counts = y_train.value_counts()

    prior = []
    for c in range(5):
        num_class = class_counts.get(c, 0)
        if num_class > 0:
            prior.append(log(num_class / num_doc))
        else:
            # log(0): the class is absent from the training data
            prior.append(float('-inf'))

    likelihood = likelihood_gen(dw_cnt_train, y_train)

    return prior, likelihood

In [109]:
def multinomialNB_test(dw_cnt_test, y_test, prior, likelihood):
    """
    test multinomial naive Bayes model

    Every document is scored against every class as
    log-prior + sum over words of (count * log-likelihood); the class with the
    highest score is predicted. Accuracy, macro precision / recall / F1 and
    the confusion matrix are printed.

    :param dw_cnt_test: doc-word count matrix for test
    :param y_test: a Series of ratings for test
    :param prior: class prior list (log-probabilities)
    :param likelihood: likelihood matrix (log-probabilities, one row per class)
    :return: None
    """
    counts = np.asarray(dw_cnt_test)
    log_prior = np.asarray(prior, dtype=float)
    log_likelihood = np.asarray(likelihood, dtype=float)

    # One matrix product replaces the Python loops over documents, classes and words:
    # (docs, words) @ (words, classes) + (classes,) -> (docs, classes)
    class_scores = counts @ log_likelihood.T + log_prior
    # The prior is part of the score, so this is a maximum a posteriori decision.
    y_pred = np.argmax(class_scores, axis=1).tolist()

    y_test = y_test.values.tolist()

    print("Accuracy: ", accuracy_score(y_test, y_pred))
    print("Precision: ", precision_score(y_test, y_pred, average='macro'))
    print("Recall: ", recall_score(y_test, y_pred, average='macro'))
    print("F1 score: ", f1_score(y_test, y_pred, average='macro'))

    print("Confusion matrix: ", confusion_matrix(y_test, y_pred))
    #conf_matrix_plot(confusion_matrix(y_test, y_pred))
    print("\n")
    

In [110]:
def review_gen(likelihood, prior, V, num_reviews=4000, mean_len=200):
    """
    generate review data using multinomial NB

    The number of reviews per class is int(num_reviews * P(class)), so the
    total can fall slightly short of num_reviews. Each review has a
    Poisson(mean_len) number of words, drawn independently from the word
    distribution of its class.

    :param likelihood: likelihood matrix (log-probabilities, one row per class)
    :param prior: class prior list (log-probabilities)
    :param V: vocabulary list; V[i] is the word for column i of likelihood
    :param num_reviews: approximate total number of reviews; defaults to 4000
    :param mean_len: mean review length in words; defaults to 200
    :return X_gen: a Series of generated reviews
    :return y_gen: a Series of generated ratings
    """
    X_gen = []
    y_gen = []
    num_class = [int(num_reviews * exp(p)) for p in prior]

    for c, n_reviews in enumerate(num_class):
        if n_reviews == 0:
            continue

        word_prob = np.exp(np.asarray(likelihood[c], dtype=float))
        # exp() of stored logs may not sum to exactly 1; the sampler requires it.
        word_prob = word_prob / word_prob.sum()

        for _ in range(n_reviews):
            review_len = np.random.poisson(mean_len)    # determine the review length
            # Draw all words of the review in one call instead of one
            # multinomial draw (plus a list scan) per word.
            word_idx = np.random.choice(len(word_prob), size=review_len, p=word_prob)
            X_gen.append(' '.join(V[idx] for idx in word_idx))
            y_gen.append(c)

    X_gen = pd.Series(X_gen)
    y_gen = pd.Series(y_gen)

    return X_gen, y_gen

In [111]:
def embd_matrix_gen(V):
    """
    generate a pretrained embedding matrix

    Each vocabulary word is looked up directly in the spaCy vocabulary. The
    previous approach read an undefined ``V_str`` and indexed the tokens of a
    parsed string, which can drift out of step with V whenever spaCy splits a
    word into several tokens.

    Row layout: assumes V[i] is the word with tokenizer index i + 1, as
    produced by ``tokenization`` (Keras word indices start at 1, 0 is the
    padding id). Row 0 and rows of words without a vector stay all-zero.

    :param V: vocabulary list
    :return: pretrained embedding matrix of shape (V_size, 300)
    """
    # load spaCy language model
    nlp = spacy.load('en_core_web_lg')

    embd_matrix = np.zeros((V_size, 300))
    for i, word in enumerate(V):
        row = i + 1
        if row >= V_size:
            # indices >= V_size are never fed to the embedding layer
            break
        lexeme = nlp.vocab[word]
        if lexeme.has_vector:
            embd_matrix[row] = lexeme.vector

    return embd_matrix

In [112]:
def BiLSTM_train(X_seq_train_pad, y_train, embd_matrix, pretrained=False):
    """
    Build a bidirectional-LSTM classifier and fit it on the training data.

    Architecture: embedding -> Bidirectional(LSTM) -> 5-way softmax. The last
    15% of the training data is held out by Keras for validation.

    :param X_seq_train_pad: padded sequences of reviews for training
    :param y_train: a Series of ratings for training
    :param embd_matrix: pretrained embedding matrix (only read when pretrained is True)
    :param pretrained: train embeddings if False, use frozen pretrained embeddings if True
    :return: trained BiLSTM network
    """
    if pretrained:
        # frozen 300-d embeddings initialised from the supplied matrix
        embedding = L.Embedding(V_size, 300, weights=[embd_matrix], trainable=False)
    else:
        # embeddings learned from scratch
        embedding = L.Embedding(V_size, embd_dim)

    model = Sequential()
    for layer in (
        embedding,
        L.Bidirectional(L.LSTM(unit, dropout=0.2, recurrent_dropout=0.2)),
        L.Dense(5, activation='softmax'),
    ):
        model.add(layer)

    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    model.fit(
        X_seq_train_pad,
        y_train,
        batch_size=batch_size,
        epochs=num_epoch,
        validation_split=0.15,
        verbose=2,
    )

    return model

In [113]:
def BiLSTM_test(model, X_seq_test_pad, y_test):
    """
    test the BiLSTM network

    Prints accuracy, macro precision / recall / F1 and the confusion matrix,
    and returns the accuracy as the docstring has always promised (the
    function previously returned None).

    :param model: trained BiLSTM network
    :param X_seq_test_pad: padded sequences of reviews for test
    :param y_test: a Series of ratings for test
    :return: accuracy score of test
    """
    # predicted class = index of the largest softmax output
    y_pred = np.argmax(model.predict(X_seq_test_pad), axis=-1)

    accuracy = accuracy_score(y_test, y_pred)
    print("Accuracy: ", accuracy)
    print("Precision: ", precision_score(y_test, y_pred, average='macro'))
    print("Recall: ", recall_score(y_test, y_pred, average='macro'))
    print("F1 score: ", f1_score(y_test, y_pred, average='macro'))

    print("Confusion matrix: ", confusion_matrix(y_test, y_pred))
 #   conf_matrix_plot(confusion_matrix(y_test, y_pred))
    print("\n")

    return accuracy


In [114]:
def conf_matrix_plot(matrix):
    """
    plot the confusion matrix as an annotated heatmap

    :param matrix: confusion matrix
    :return: None
    """
    # seaborn's default annotation format is '.2g', which prints integer
    # counts of 100 or more in scientific notation. Use plain integers when
    # the matrix holds integers and keep the default otherwise.
    is_integer = np.issubdtype(np.asarray(matrix).dtype, np.integer)
    fmt = 'd' if is_integer else '.2g'

    sn.heatmap(matrix, annot=True, fmt=fmt, annot_kws={"size": 10})
    plt.show()

In [None]:
def main():
    """
    process data, train & test models

    Pipeline: clean the reviews and shift the ratings, split 80/20, tokenize,
    build the pretrained embedding matrix, then evaluate multinomial NB and
    two Bi-LSTM variants on the real test split and on reviews sampled from
    the fitted NB model.
    """
    # data cleansing
    X_clean = X.apply(data_cleansing)
    y_new = rate_shifting(y)

    # train-test spliting
    X_train, X_test, y_train, y_test = train_test_split(X_clean, y_new, test_size=0.2, random_state=42)

    # tokenization & padding
    X_seq_train_pad, X_seq_test_pad, V_list = tokenization(X_train, X_test)

    # pretrained embedding matrix
    # Look every word up directly: parsing the joined vocabulary and indexing
    # its tokens can drift out of step with V_list when spaCy splits a word.
    # Keras word indices start at 1 (0 is the padding id), so V_list[i]
    # belongs in row i + 1; row 0 and words without a vector stay all-zero.
    nlp = spacy.load('en_core_web_lg')
    embd_matrix = np.zeros((V_size, 300))
    for i, word in enumerate(V_list):
        row = i + 1
        if row >= V_size:
            break
        lexeme = nlp.vocab[word]
        if lexeme.has_vector:
            embd_matrix[row] = lexeme.vector

    print("Test on real-world data *****************************")
    print("##### Multinomial NB #####")
    dw_cnt_train = cnt_matrix_gen(X_train, V_list)
    dw_cnt_test = cnt_matrix_gen(X_test, V_list)
    prior, likelihood = multinomialNB_train(dw_cnt_train, y_train)
    multinomialNB_test(dw_cnt_test, y_test, prior, likelihood)

    print("##### Bi-LSTM without pretraiend embeddings #####")
    model1 = BiLSTM_train(X_seq_train_pad, y_train, embd_matrix, pretrained = False)
    BiLSTM_test(model1, X_seq_test_pad, y_test)

    print("##### Bi-LSTM with pretraiend embeddings #####")
    model2 = BiLSTM_train(X_seq_train_pad, y_train, embd_matrix, pretrained = True)
    BiLSTM_test(model2, X_seq_test_pad, y_test)

    # data generation
    X_gen, y_gen = review_gen(likelihood, prior, V_list)
    # `tokenization` does not return its tokenizer, so rebuild it from the same
    # training text to get the same word ids. It must NOT be fitted on the
    # generated text: that would change the ids the models were trained on.
    tokenizer = Tokenizer(num_words=V_size)
    tokenizer.fit_on_texts(X_train)
    X_gen_seq = tokenizer.texts_to_sequences(X_gen)
    X_gen_seq_pad = pad_sequences(X_gen_seq, maxlen=pad_len, padding='post', truncating='post')

    print("Test on generated data *****************************")
    print('\n')
    print("##### Multinomial NB #####")
    dw_cnt_gen = cnt_matrix_gen(X_gen, V_list)
    multinomialNB_test(dw_cnt_gen, y_gen, prior, likelihood)

    # The networks are trained on the real training split in both experiments,
    # so the models fitted above are reused instead of being retrained.
    print("##### Bi-LSTM without pretraiend embeddings #####")
    BiLSTM_test(model1, X_gen_seq_pad, y_gen)

    print("##### Bi-LSTM with pretraiend embeddings #####")
    BiLSTM_test(model2, X_gen_seq_pad, y_gen)
    
    
    
    

In [None]:
# Entry point: run the whole pipeline only when this code is executed directly
# (not when it is imported as a module).
if __name__ == "__main__":
    main()