# Part 0. Data Prepraration

In [None]:
from datasets import load_dataset
dataset = load_dataset("rotten_tomatoes")
train_dataset = dataset ['train']
validation_dataset = dataset ['validation']
test_dataset = dataset ['test']

# Part 3. Enhancement


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, SimpleRNN, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
import gensim.downloader as api
import numpy as np
import nltk
import random

In [None]:
model = api.load("glove-wiki-gigaword-100")
vocab_size = len(model.index_to_key) + 1
embedding_dim = model.vector_size
word_index = {word: index+1 for index, word in enumerate(model.index_to_key)} # index 0 is reserved for padding
embedding_matrix = np.zeros((vocab_size, embedding_dim))

vocab_size = len(model.index_to_key) + 1
embedding_dim = model.vector_size
word_index = {word: index+1 for index, word in enumerate(model.index_to_key)} # index 0 is reserved for padding
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for word, idx in word_index.items():
    if word in model:
        embedding_matrix[idx] = model[word]

def tokenize(text, word_index):
    ls = nltk.word_tokenize(text)
    return [word_index[word] for word in ls if word in word_index]

X_train = [tokenize(text, word_index) for text in train_dataset['text']]
X_val = [tokenize(text, word_index) for text in validation_dataset['text']]
X_test = [tokenize(text, word_index) for text in test_dataset['text']]
max_length = max(len(seq) for seq in X_train)

X_train = pad_sequences(X_train, maxlen=max_length, padding='pre', truncating='pre')
X_val = pad_sequences(X_val, maxlen=max_length, padding='pre', truncating='pre')
X_test = pad_sequences(X_test, maxlen=max_length, padding='pre', truncating='pre')

y_train = np.array(train_dataset['label'])
y_val = np.array(validation_dataset['label'])
y_test = np.array(test_dataset['label'])

#### 1. Instead of keeping the word embeddings fixed, now update the word embeddings (the same way as model parameters) during the training process.


In [None]:
tf.random.set_seed(0)
np.random.seed(0)
random.seed(0)
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=10,
    restore_best_weights=True
)
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath="model_3_1.keras", 
    monitor='val_accuracy',            
    save_best_only=True,           
    mode='max',                 
    save_weights_only=False,       
    verbose=1
)
model = Sequential([
    Embedding(input_dim=vocab_size,
              output_dim=embedding_dim,
              weights=[embedding_matrix],
              trainable=True),
    SimpleRNN(16, return_sequences=False),
    Dense(1, activation='sigmoid')
])
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.01)
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=64,
    callbacks=[checkpoint_callback, early_stopping]
)

In [None]:
best_model = tf.keras.models.load_model("model_3_1.keras")
accuracy = best_model.evaluate(X_test, y_test)
print(f"Test accuracy: {accuracy[1] * 100:.2f}%")

#### 2. As discussed in Question 1(c), apply your solution in mitigating the influence of OOV words and train your model again.

In [None]:
model = api.load("glove-wiki-gigaword-100")
vocab_size = len(model.index_to_key) + 2 # 0 is reserved for padding, 1 is reserved for OOV
embedding_dim = model.vector_size
word_index = {word: index+2 for index, word in enumerate(model.index_to_key)} # index 0 is reserved for padding
embedding_matrix = np.zeros((vocab_size, embedding_dim))

In [None]:
for word, idx in word_index.items():
    if word in model:
        embedding_matrix[idx] = model[word]

In [None]:
def tokenize(text, word_index):
    ls = nltk.word_tokenize(text)
    return [word_index[word] if word in word_index else 1 for word in ls]

X_train = [tokenize(text, word_index) for text in train_dataset['text']]
X_val = [tokenize(text, word_index) for text in validation_dataset['text']]
X_test = [tokenize(text, word_index) for text in test_dataset['text']]
max_length = max(len(seq) for seq in X_train)

In [None]:
X_train = pad_sequences(X_train, maxlen=max_length)
X_val = pad_sequences(X_val, maxlen=max_length)
X_test = pad_sequences(X_test, maxlen=max_length)

In [None]:
y_train = np.array(train_dataset['label'])
y_val = np.array(validation_dataset['label'])
y_test = np.array(test_dataset['label'])

In [None]:
def train_model(optimizer, epochs, batch_size, lr):
    tf.random.set_seed(0)
    np.random.seed(0)
    random.seed(0)
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath="model_oov.keras", 
        monitor='val_accuracy',            
        save_best_only=True,           
        mode='max',                 
        save_weights_only=False,       
        verbose=1
    )
    model = Sequential([
        Embedding(input_dim=vocab_size,
                  output_dim=embedding_dim,
                  weights=[embedding_matrix],
                  trainable=True), 
        SimpleRNN(16, return_sequences=False),
        Dense(1, activation='sigmoid')
    ])
    if optimizer == 'adam': optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)
    else: optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[checkpoint_callback, early_stopping]
    )
    return model, history

In [None]:
model, history = train_model("adagrad", 100, 64, 0.01)

In [None]:
best_model = tf.keras.models.load_model("model_oov.keras")
accuracy = best_model.evaluate(X_test, y_test)
print(f"Test accuracy: {accuracy[1] * 100:.2f}%")

#### 3. Keeping the above two adjustments, replace your simple RNN model in Part 2 with a biLSTM model and a biGRU model, incorporating recurrent computations in both directions and stacking multiple layers if possible

BiLSTM

In [None]:
from tensorflow.keras.layers import Bidirectional, LSTM
def train_model(optimizer, epochs, batch_size, lr):
    tf.random.set_seed(0)
    np.random.seed(0)
    random.seed(0)
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=3,
        restore_best_weights=True
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath="model_bilstm.keras", 
        monitor='val_accuracy',            
        save_best_only=True,           
        mode='max',                 
        save_weights_only=False,       
        verbose=1
    )
    model = Sequential([
        Embedding(input_dim=vocab_size,
                  output_dim=embedding_dim,
                  weights=[embedding_matrix],
                  trainable=True),
        Bidirectional(LSTM(16, return_sequences=True)),
        Bidirectional(LSTM(16, return_sequences=True)),
        Bidirectional(LSTM(16, return_sequences=False)),
        Dense(1, activation='sigmoid')
    ])
    if optimizer == 'adam': optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)
    else: optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[checkpoint_callback, early_stopping]
    )
    return model, history

In [None]:
model, history = train_model("adagrad", 100, 64, 0.01)

In [None]:
model.summary()

In [None]:
best_model = tf.keras.models.load_model("model_bilstm.keras")
accuracy = best_model.evaluate(X_test, y_test)
print("Test accuracy:", accuracy[1])

BiGRU

In [None]:
from tensorflow.keras.layers import Bidirectional, GRU
def train_model(optimizer, epochs, batch_size, lr):
    tf.random.set_seed(0)
    np.random.seed(0)
    random.seed(0)
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=3,
        restore_best_weights=True
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath="model_bigru.keras", 
        monitor='val_accuracy',            
        save_best_only=True,           
        mode='max',                 
        save_weights_only=False,       
        verbose=1
    )
    model = Sequential([
        Embedding(input_dim=vocab_size,
                  output_dim=embedding_dim,
                  weights=[embedding_matrix],
                  trainable=True), 
        Bidirectional(GRU(16, return_sequences=True)),
        Bidirectional(GRU(16, return_sequences=True)),
        Bidirectional(GRU(16, return_sequences=False)),
        Dense(1, activation='sigmoid')
    ])
    if optimizer == 'adam': optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)
    else: optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[checkpoint_callback, early_stopping]
    )
    return model, history

In [None]:
train_model("adagrad", 100, 64, 0.01)

In [None]:
best_model = tf.keras.models.load_model("model_bigru.keras")
accuracy = best_model.evaluate(X_test, y_test)
print("Test accuracy:", accuracy[1])

#### 4. Keeping the above two adjustments, replace your simple RNN model in Part 2 with a Convolutional Neural Network (CNN) to produce sentence representations and perform sentiment classification

In [None]:
from tensorflow.keras.layers import Convolution1D, Flatten

def train_model(optimizer, epochs, batch_size, lr):
    tf.random.set_seed(0)
    np.random.seed(0)
    random.seed(0)
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath="model_cnn.keras", 
        monitor='val_accuracy',            
        save_best_only=True,           
        mode='max',                 
        save_weights_only=False,       
        verbose=1
    )
    model = Sequential([
        Embedding(input_dim=vocab_size,
                  output_dim=embedding_dim,
                  weights=[embedding_matrix],
                  trainable=True), 
        Convolution1D(16, kernel_size=3, activation='relu'),
        Flatten(),
        Dense(1, activation='sigmoid')
    ])
    if optimizer == 'adam': optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)
    else: optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[checkpoint_callback, early_stopping]
    )
    return model, history

In [None]:
model, history = train_model("adagrad", 100, 64, 0.01)

In [None]:
best_model = tf.keras.models.load_model("model_cnn.keras")
accuracy = best_model.evaluate(X_test, y_test)
print("Test accuracy:", accuracy[1])

#### 5. Further improve your model. You are free to use any strategy other than the above mentioned solutions. Changing hyper-parameters or stacking more layers is not counted towards a meaningful improvement.

In [None]:
from tensorflow.keras.layers import MultiHeadAttention, Input, Bidirectional, Dropout, Dense, GlobalMaxPooling1D, GRU
from tensorflow.keras.models import Model

def train_model(optimizer, epochs, batch_size, lr):
    tf.random.set_seed(0)
    np.random.seed(0)
    random.seed(0)
    
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=10,
        restore_best_weights=True
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath="model_combined.keras", 
        monitor='val_accuracy',            
        save_best_only=True,           
        mode='max',                 
        save_weights_only=False,       
        verbose=1
    )

    input_layer = Input(shape=(max_length,))
    embedding_layer = Embedding(input_dim=vocab_size, output_dim=embedding_dim, weights=[embedding_matrix], trainable=True)(input_layer)
    
    gru_output = GRU(32, return_sequences=True)(embedding_layer)
    gru_output = Dropout(0.5)(gru_output)
    attention_output = MultiHeadAttention(num_heads=1, key_dim=16)(gru_output, gru_output)
    gru_output = Bidirectional(GRU(16, return_sequences=True))(attention_output)
    max_output = GlobalMaxPooling1D()(gru_output)
    gru_output = gru_output[:, -1, :]
    concat_output = tf.keras.layers.Concatenate(axis=1)([max_output, gru_output])
    output_layer = Dense(1, activation='sigmoid')(concat_output) 

    # Create the model
    model = Model(inputs=input_layer, outputs=output_layer)

    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',    
        factor=0.75,             
        patience=3,            
        min_lr=1e-6             
    )
    # Set optimizer
    if optimizer == 'adam': optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    elif optimizer == 'sgd': optimizer = tf.keras.optimizers.SGD(learning_rate=lr)
    elif optimizer == 'rmsprop': optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)
    else: optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr)

    # Compile the model
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

    # Train the model
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[checkpoint_callback, reduce_lr, early_stopping]
    )

    return model, history

In [None]:
model, history = train_model("adagrad", 100, 64, 0.01)

In [None]:
model = tf.keras.models.load_model("model_combined.keras")
accuracy = model.evaluate(X_test, y_test)
print("Test accuracy:", accuracy[1])