In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import re
import nltk
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from sklearn.metrics import accuracy_score

In [None]:
# Fetch the NLTK resources this notebook relies on (one download call each).
nltk.download('omw-1.4')  # presumably multilingual WordNet data for the lemmatizer - TODO confirm
nltk.download('stopwords')  # corpus read later through nltk.corpus.stopwords
nltk.download('wordnet')  # presumably backs WordNetLemmatizer - TODO confirm
nltk.download('averaged_perceptron_tagger')  # presumably the model behind nltk.pos_tag - TODO confirm
nltk.download('punkt')  # presumably the model behind word_tokenize - TODO confirm

In [None]:
# for reproducibility
# One seed for everything. The previous sequence called np.random.seed(seed0)
# and then tf.keras.utils.set_random_seed(1), which re-seeded NumPy (and
# Python's `random`) with 1, so the advertised seed was not the one in effect.
seed0 = 1337
# set_random_seed seeds Python's `random`, NumPy and TensorFlow in one call.
tf.keras.utils.set_random_seed(seed0)
# Show full review text instead of truncating long cells when frames are displayed.
pd.set_option('display.max_colwidth', None)

In [None]:
# Build a smaller working copy of the full review dump.

df = pd.read_csv('Reviews.csv')

# Number of rows drawn per star rating.
# NOTE(review): score 3 draws 20000 rows while every other score draws 10000;
# confirm this asymmetry is intentional.
rows_per_score = {1: 10000, 2: 10000, 3: 20000, 4: 10000, 5: 10000}
df_1, df_2, df_3, df_4, df_5 = [
    df[df['Score'] == score].sample(n=n_rows, random_state=1)
    for score, n_rows in rows_per_score.items()
]

# Merge all sampled parts into one new DataFrame.
df_extract = pd.concat([df_1, df_2, df_3, df_4, df_5])

# Save the sampled data to a new CSV file.
df_extract.to_csv('Reviews_10000.csv', index=False)

In [None]:
# Load the down-sampled review file and preview its first 10 rows.
df = pd.read_csv("Reviews_10000.csv")
df.head(10)

In [None]:
# Keep only the review text and its star rating.
# `.copy()` makes this an independent frame: later cells assign new columns to
# it, which on a plain slice of `df` raises SettingWithCopyWarning.
df_processed = df[['Text', 'Score']].copy()
df_processed.head()

In [None]:
# Per-column count of non-null values among the rows whose Score equals 1.
df_processed[df_processed['Score']==1].count()

In [None]:
# Helper function to classify a review based on its score
def score_converter(score):
    """Map a numeric score to a sentiment class name.

    Returns 'satisfied' for scores of 4 or more, 'unsatisfied' for scores of
    2 or less, and None for anything in between (or not comparable as either).
    """
    if score >= 4:
        return 'satisfied'
    if score <= 2:
        return 'unsatisfied'
    # Neither bound matched: no class is assigned.
    return None

# Helper function to clean the text
def remove_tags(string):
    """Normalise one raw review into lowercase alphanumeric text.

    Steps, in order:
      1. ``<br>`` / ``<br/>`` / ``<br />`` tags become a space.
      2. ``http://`` and ``https://`` URLs are removed (only the URL itself).
      3. Runs of whitespace (newlines, tabs, ...) collapse to a single space.
      4. Every character that is not a letter, digit or space is dropped.
      5. The result is lower-cased.

    Parameters
    ----------
    string : str
        Raw review text.

    Returns
    -------
    str
        Cleaned text; words stay separated by spaces.
    """
    # A line break separates words, so replace it with a space; replacing it
    # with '' would glue the neighbouring words together.
    result = re.sub(r'<br\s*/?>', ' ', string)
    # Match just the URL token. The old pattern 'https://.*' was greedy and
    # erased the rest of the line after a URL, and it ignored plain http.
    result = re.sub(r'https?://[^\s"<>]+', ' ', result)
    # Turn newlines/tabs into spaces before the character filter below, which
    # would otherwise delete them and merge the words on either side.
    result = re.sub(r'\s+', ' ', result)
    result = re.sub('[^a-zA-Z0-9 ]', '', result)    # remove non-alphanumeric characters
    return result.lower()

# Derive the sentiment label and clean the review text. `assign` returns a new
# frame instead of writing into what may be a slice of `df`, which avoids
# SettingWithCopyWarning.
df_processed = df_processed.assign(
    Category=df_processed['Score'].apply(score_converter),
    Text=df_processed['Text'].apply(remove_tags),
)

# Shuffle the rows. The explicit seed keeps the order identical when this cell
# is re-run, rather than depending on the current global NumPy RNG state.
df_processed = df_processed.sample(frac=1, random_state=seed0)


In [None]:
# Strip English stop words so the model sees fewer uninformative tokens.
# NOTE(review): NLTK's English list is believed to contain negations such as
# "not" / "no"; confirm that dropping them is acceptable for sentiment labels.

from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))


def _without_stop_words(text):
    """Return `text` with every whitespace-separated stop word removed."""
    return ' '.join(token for token in text.split() if token not in stop_words)


df_processed['Text'] = df_processed['Text'].apply(_without_stop_words)

In [None]:
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from nltk import pos_tag
from nltk.tokenize import word_tokenize


def get_wordnet_pos(treebank_tag):
    """Translate a treebank POS tag into the matching WordNet POS constant.

    Only the first character of the tag is inspected: 'J' -> adjective,
    'V' -> verb, 'N' -> noun, 'R' -> adverb. Any other tag (including an
    empty one) yields None.
    """
    first_letter_to_pos = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }
    # Slicing (rather than indexing) keeps an empty tag from raising.
    return first_letter_to_pos.get(treebank_tag[:1])

def lemmatize_text(text):
    """Lemmatize every token of `text` and return the tokens joined by spaces.

    The text is tokenized, each token is POS-tagged, and tokens whose tag maps
    to a WordNet part of speech (via get_wordnet_pos) are lemmatized with that
    part of speech. Tokens with no mapping are passed through unchanged.
    """
    wn_lemmatizer = WordNetLemmatizer()

    def _lemma(token, treebank_tag):
        # Tags without a WordNet equivalent leave the token untouched.
        wn_pos = get_wordnet_pos(treebank_tag)
        if wn_pos is None:
            return token
        return wn_lemmatizer.lemmatize(token, pos=wn_pos)

    tagged_tokens = pos_tag(word_tokenize(text))
    return ' '.join(_lemma(token, tag) for token, tag in tagged_tokens)
# Replace each review with its lemmatized form (one lemmatize_text call per row).
df_processed['Text'] = df_processed['Text'].apply(lemmatize_text)

In [None]:
# Optional shortcut kept from development: reload a previously saved frame.
# df_processed = pd.read_csv("processed.csv")
# Drop every row that has a missing value in any column. Rows for which
# score_converter returned None (scores between 2 and 4) have no Category
# and are therefore removed here.
df_processed  = df_processed.dropna()

In [None]:
# Distinct class names left in the Category column.
df_processed["Category"].unique()

In [None]:
# Corpus statistics, computed on whole columns instead of Python-level loops.
# The previous row-by-row `df_processed.iloc[i]['Category']` access built a
# Series per row; the printed values are unchanged.
n_reviews = df_processed.shape[0]

# Total number of whitespace-separated tokens across all reviews.
s = float(df_processed['Text'].str.split().str.len().sum())
print("Average length of each review : ",s/n_reviews)

# Number of reviews in each sentiment class.
pos = int((df_processed['Category'] == 'satisfied').sum())
neg = int((df_processed['Category'] == 'unsatisfied').sum())
print("Percentage of reviews with positive sentiment is "+str(pos/n_reviews*100)+"%")
print("Percentage of reviews with negative sentiment is "+str(neg/n_reviews*100)+"%")

In [None]:
reviews = df_processed['Text'].values
labels = df_processed['Category'].values

# Turn the class names into integer codes; inspect `encoder.classes_` to see
# which name received which code before interpreting model outputs.
encoder = LabelEncoder()
encoded_labels = encoder.fit_transform(labels)

# Stratified split keeps the class balance equal in both partitions. The
# explicit random_state makes the split identical when this cell is re-run,
# so it cannot drift away from the tokenizer fitted on the training part.
train_sentences, test_sentences, train_labels, test_labels = train_test_split(
    reviews, encoded_labels, stratify=encoded_labels, random_state=seed0)

In [None]:
def plot_acc(his,title):
    """Plot training and validation accuracy per epoch and show the figure.

    `his` must expose a `history` mapping with 'accuracy' and 'val_accuracy'
    entries; `title` is prefixed to ' Model Accuracy' for the plot title.
    """
    curves = (
        ('accuracy', 'Training Accuracy'),
        ('val_accuracy', 'Validation Accuracy'),
    )
    for metric_key, legend_text in curves:
        plt.plot(his.history[metric_key], label=legend_text)

    # axis labels and title
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title(title+' Model Accuracy')

    plt.legend()
    plt.show()

In [None]:
# Hyperparameters of the model
vocab_size = 6000       # tokenizer keeps only the most frequent words (num_words)
# NOTE(review): an empty OOV token looks like a stripped placeholder such as
# '<OOV>'; confirm the intended value.
oov_tok = ''
embedding_dim = 100     # width of each word vector
max_length = 200        # every sequence is padded / truncated to this length
padding_type='post'     # pad at the end of short sequences
trunc_type='post'       # cut the end of long sequences

# tokenize sentences (vocabulary is learned from the training split only)
tokenizer = Tokenizer(num_words = vocab_size, oov_token=oov_tok)
tokenizer.fit_on_texts(train_sentences)
word_index = tokenizer.word_index

# convert train dataset to sequence and pad sequences
# padding_type / trunc_type are now actually passed: before, `truncating`
# was omitted, so long reviews were cut with the library default instead of
# the 'post' setting declared above.
train_sequences = tokenizer.texts_to_sequences(train_sentences)
train_padded = pad_sequences(train_sequences, maxlen=max_length,
                             padding=padding_type, truncating=trunc_type)

# convert Test dataset to sequence and pad sequences (same settings as train)
test_sequences = tokenizer.texts_to_sequences(test_sentences)
test_padded = pad_sequences(test_sequences, maxlen=max_length,
                            padding=padding_type, truncating=trunc_type)

In [None]:
# Construct simple NN network: embedding -> average over time -> sigmoid unit.
model = tf.keras.Sequential()
# The embedding table must cover every index the tokenizer can emit
# (num_words = vocab_size). The previous hard-coded 3000 was smaller than
# vocab_size, so larger token ids fell outside the table.
model.add(tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length))
model.add(tf.keras.layers.GlobalAveragePooling1D())
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))


optimizer = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)

model.compile(optimizer=optimizer,
              loss='binary_crossentropy',
              metrics=['accuracy'])


# Hold out the last 10% of the training data for validation during fitting.
history = model.fit(train_padded, train_labels,
                    epochs=15, verbose=1,
                    validation_split=0.1)

In [None]:
# Score the simple NN on the held-out test set.
prediction = model.predict(test_padded)
# Sigmoid outputs above 0.5 become class 1, everything else class 0.
class_predictions = (prediction > 0.5).astype(int)

accuracy = accuracy_score(test_labels, class_predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

plot_acc(history, "Simple NN")

In [None]:


# 1-D CNN classifier: embedding -> conv -> global max pool -> dense -> sigmoid.
model_cnn = tf.keras.Sequential()
# Size the embedding from the shared hyperparameters so it covers every index
# the tokenizer can emit; the previous hard-coded 3000 was below vocab_size.
model_cnn.add(tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim, input_length=max_length))
model_cnn.add(tf.keras.layers.Conv1D(filters=128, kernel_size=5, activation='relu'))
model_cnn.add(tf.keras.layers.GlobalMaxPooling1D())
model_cnn.add(tf.keras.layers.Dense(10, activation='relu'))
model_cnn.add(tf.keras.layers.Dense(1, activation='sigmoid'))


# Give this model its own optimizer instance (same settings as the simple NN)
# instead of reusing the one already attached to another model.
optimizer_cnn = tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9)
model_cnn.compile(optimizer=optimizer_cnn, loss='binary_crossentropy', metrics=['accuracy'])

# Train the CNN itself. The previous code called `model.fit`, which trained
# the simple NN again and left `model_cnn` untrained when it was evaluated.
history = model_cnn.fit(train_padded, train_labels,
                        epochs=15, verbose=1,
                        validation_split=0.1)




prediction = model_cnn.predict(test_padded)
# Sigmoid outputs above 0.5 become class 1, everything else class 0.
class_predictions = np.where(prediction > 0.5, 1, 0)

accuracy = accuracy_score(test_labels, class_predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

plot_acc(history,"CNN")

In [None]:
#attention layer

from tensorflow.keras.layers import Layer
import tensorflow.keras.backend as K

class Attention(Layer):
    """Attention pooling over axis 1 of the input.

    Assumes a 3-D input of shape (batch, timesteps, features) - TODO confirm
    against callers. A score is computed per timestep as
    ``tanh(x . W + b)``, normalised with a softmax along axis 1, and used to
    weight the input.

    Parameters
    ----------
    return_sequences : bool, default True
        If True, return the weighted input with the same shape as the input.
        If False, return the weighted input summed over axis 1.
    **kwargs
        Standard Keras ``Layer`` arguments (e.g. ``name``), forwarded to the
        base class.
    """

    def __init__(self, return_sequences=True, **kwargs):
        # Initialise the base Layer first so attribute tracking is set up
        # before any attribute is assigned on the instance.
        super(Attention, self).__init__(**kwargs)
        self.return_sequences = return_sequences

    def build(self, input_shape):
        """Create the score weights: W is (features, 1), b is (timesteps, 1)."""
        self.W = self.add_weight(name="att_weight", shape=(input_shape[-1], 1),
                                 initializer="normal")
        # The bias has one entry per position on axis 1, so that axis must
        # have a fixed, known length when the layer is built.
        self.b = self.add_weight(name="att_bias", shape=(input_shape[1], 1),
                                 initializer="zeros")

        super(Attention, self).build(input_shape)

    def call(self, x):
        """Apply the attention weights to `x` (see class docstring)."""
        e = K.tanh(K.dot(x, self.W) + self.b)
        # Normalise the scores across axis 1 so the weights sum to 1 there.
        a = K.softmax(e, axis=1)
        output = x * a

        if self.return_sequences:
            return output

        return K.sum(output, axis=1)

    def get_config(self):
        """Return the layer config so the constructor argument survives save/load."""
        config = super(Attention, self).get_config()
        config.update({"return_sequences": self.return_sequences})
        return config

In [None]:
# Bi-LSTM classifier with attention pooling, assembled layer by layer.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length))
# return_sequences=True keeps the per-timestep outputs for the attention layer.
model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)))
model.add(Attention(return_sequences=False))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Compile with SGD + momentum and track accuracy.
bilstm_optimizer = tf.keras.optimizers.SGD(learning_rate=0.03, momentum=0.8)
model.compile(optimizer=bilstm_optimizer,
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Print the layer / parameter overview.
model.summary()

In [None]:
# Train for 70 epochs, holding out 10% of the training data for validation.
history = model.fit(
    x=train_padded,
    y=train_labels,
    validation_split=0.1,
    epochs=70,
    verbose=1,
)

In [None]:
# Raw model outputs for the padded test sequences; displayed as the cell result.
prediction = model.predict(test_padded)
prediction

In [None]:
# Threshold the outputs at 0.5: above -> class 1, otherwise class 0.
class_predictions = (prediction > 0.5).astype(int)

# Fraction of test reviews whose predicted class matches the true label.
accuracy = accuracy_score(test_labels, class_predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

plot_acc(history, "Bi-LSTM")

In [None]:
from tensorflow.keras import regularizers
from tensorflow.keras.layers import LeakyReLU

In [None]:
# Shared settings for the combined CNN + Bi-LSTM model.
# NOTE(review): drop_out is not referenced by any of the visible cells.
drop_out=0.2
# Leaky ReLU layer instance passed as the `activation` of other layers.
activation=LeakyReLU(alpha = 0.01)
# L2 weight penalty passed as a kernel_regularizer.
regularizer=regularizers.l2(2e-4)


In [None]:
# Combined model: embedding -> causal Conv1D -> Bi-LSTM -> attention -> dense head.
model_cb = tf.keras.Sequential()
model_cb.add(tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length))
model_cb.add(tf.keras.layers.Conv1D(128, 8,
                                    strides=1,
                                    padding='causal',
                                    activation=activation))
# return_sequences=True keeps the per-timestep outputs for the attention layer.
model_cb.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True)))
model_cb.add(Attention(return_sequences=False))
model_cb.add(tf.keras.layers.Dense(32, kernel_regularizer=regularizer, activation=activation))
model_cb.add(tf.keras.layers.Dense(1, activation='sigmoid'))

# Compile with Nadam and track accuracy.
cb_optimizer = tf.keras.optimizers.Nadam(learning_rate=0.001)
model_cb.compile(optimizer=cb_optimizer,
                 loss='binary_crossentropy',
                 metrics=['accuracy'])

# Print the layer / parameter overview.
model_cb.summary()

In [None]:
# Train for 2 epochs, holding out 10% of the training data for validation.
history = model_cb.fit(
    x=train_padded,
    y=train_labels,
    validation_split=0.1,
    epochs=2,
    verbose=1,
)

In [None]:
# Score the combined model on the held-out test set.
prediction = model_cb.predict(test_padded)
# Threshold the outputs at 0.5: above -> class 1, otherwise class 0.
class_predictions = (prediction > 0.5).astype(int)

accuracy = accuracy_score(test_labels, class_predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

In [None]:
# Training vs. validation accuracy curves for the most recent `history`.
plot_acc(history,"CNN-LSTM Combined")