In [13]:
import numpy as np
import keras
import os
import string
import tensorflow as tf
from typing import Any
import matplotlib.pyplot as plt
import keras_tuner as kt
import pandas as pd
import re
import gensim
from keras.preprocessing.text import Tokenizer
from keras.utils import pad_sequences
from sklearn.base import OneToOneFeatureMixin
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, precision_recall_curve
from sklearn.model_selection import train_test_split

from sklearn.preprocessing import OrdinalEncoder

# Preprocessing

## Dataframe Validation

When loading datasets, we want to ensure that each frame only consists of two columns, namely a review and sentiment columns, before concatenating all of the data into a singular frame.

In [14]:
def validate_dataframe(dataframe: pd.DataFrame) -> None:
    """
    Ensure the dataframe has a 'review' and 'sentiment' column, and no other
    column. If any of these is violated, throw an exception.

    :param dataframe: The dataframe to validate
    """
    if 'review' not in dataframe.columns:
        raise Exception(
            'Malformed dataframe',
            dataframe,
            'doesnt contain "review" column'
        )
    elif 'sentiment' not in dataframe.columns:
        raise Exception(
            'Malformed dataframe',
            dataframe,
            'doesnt contain "sentiment" column'
        )
    elif len(dataframe.columns) != 2:
        raise Exception(
            'Malformed dataframe',
            dataframe,
            'contains more than 2 columns'
        )

## Dataframe Loading

We want to be able to pass a list of tuple, where the first element is the dataframe to process, and the second is an encoder to apply to the sentiment column.

In [15]:
def load_data(
        dataframe_encoder_pairs: list[(pd.DataFrame, OneToOneFeatureMixin)],
        max_sequence_length: int,
):
    """
    Create a dataframe containing the reviews and sentiment scores of every
    provided dataframe by applying each encoder to its corresponding dataset
    and concatenating them along the horizontal axis.

    :param dataframe_encoder_pairs: The datasets and their corresponding
    encoders
    """
    full_frame = pd.DataFrame({
        'review': np.array([], dtype=str),
        'sentiment': np.array([], dtype=np.int32)
    })

    for dataframe, encoder in dataframe_encoder_pairs:
        # incase the dataframe uses object or some other data type
        dataframe['review'] = dataframe['review'].astype('str')

        # validate dataframe, clean the data and encode the sentiment scores
        validate_dataframe(dataframe)
        encoded_sentiment = encode_sentiment(
            dataframe['sentiment'].to_numpy(),
            encoder
        )
        cleaned_reviews = clean_reviews(dataframe['review'].to_numpy())

        # join the data into a single frame, and tack it on the full frame
        new_data = np.concatenate(
            [cleaned_reviews, encoded_sentiment],
            axis=1
        )
        full_frame = np.concatenate([full_frame, new_data])

    full_frame = pd.DataFrame(
        full_frame,
        columns=['review', 'sentiment']
    )
    
    return full_frame

## String Embedding

Use Google's Word2Vec model to create an embedding matrix for the final model. Also pads reviews with too few characters, encodes sentiment values, and returns thesize of the vocabulary associated with the embedding matrix.

In [16]:
def get_encoded_dataset(full_frame) -> (np.ndarray, np.ndarray, np.ndarray, int):
    # 2d list of sentence, where each element is a string, use this to
    # train a word to vector model
    if not os.path.isfile('./saved_models/w2vmodel.kvmodel'):
        documents = [_text.split() for _text in full_frame.review]
        w2v_model = gensim.models.word2vec.Word2Vec(vector_size=300,
                                                    window=7,
                                                    min_count=10,
                                                    workers=8)
        w2v_model.build_vocab(documents)
        w2v_model.train(documents, total_examples=len(documents), epochs=32)
        w2v_model.save('./saved_models/w2vmodel.kvmodel')
    else:
        w2v_model = gensim.models.word2vec.Word2Vec.load('./saved_models/w2vmodel.kvmodel')

    # fit a tokenizer to the reviews
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(full_frame.review)

    vocab_size = len(tokenizer.word_index) + 1

    # pad the reviews
    padded_review = pad_sequences(
        tokenizer.texts_to_sequences(full_frame.review),
        maxlen=300
    )

    # encode sentiments
    encoder = LabelEncoder()
    encoder.fit(full_frame.sentiment.tolist())

    encoded_sentiment = encoder.transform(full_frame.sentiment.tolist())

    # create embedding matrix
    embedding_matrix = np.zeros((vocab_size, 300))
    for word, i in tokenizer.word_index.items():
        if word in w2v_model.wv:
            embedding_matrix[i] = w2v_model.wv[word]

    return padded_review, encoded_sentiment, embedding_matrix, vocab_size

## Cleaning

Remove unncessary white space, @, hyperlinks and non-alphanumeric characters from any strings. Also make every thing lower case.

In [17]:
def clean_review(review: str) -> str:
    """
    Clean a review

    :param review: string to be cleaned
    :return: Cleaned review
    """

    # remove punctuation from X
    strip_punct = str.maketrans('', '', string.punctuation)
    review = review.translate(strip_punct)

    # replace double spaces with single spaces and remove spaces at the end of
    # sentences
    review = re.sub(' +', ' ', review)
    review = re.sub(' $', '', review)
    review = re.sub('^ ', '', review)

    # remove @, links, and non-alphanumeric characters
    review = re.sub('@\\S+|https?:\\S+|http?:\\S|[^A-Za-z0-9]+', ' ', review)

    # lower case everything
    review = review.lower()

    return review

In [18]:
def clean_reviews(reviews: np.ndarray) -> np.ndarray:
    return np.array(list(map(clean_review, list(reviews)))).reshape((-1, 1))


## Sentiment Scaling

Scale sentiment scores so that they range from 0 to 1.

In [19]:
def encode_sentiment(sentiment: np.ndarray,
                     encoder: Any) -> np.ndarray:
    """
    Encode sentiment scores and scale them such that they have a max of 1,
    and min of 0

    :param sentiment: Sentiment scores
    :param encoder: Encoder
    :return: Sentiment scores with the encoder applied to them, also scaled
    to be within 0 to 1
    """
    if encoder is not None:
        encoded_sentiment = encoder.fit_transform(
            sentiment.reshape(-1, 1)
        ).astype(np.int64)
    else:
        encoded_sentiment = sentiment.reshape((-1, 1))

    encoded_sentiment = MinMaxScaler().fit_transform(encoded_sentiment)
    return encoded_sentiment

# Model Creation

## Base Model

Create a model that combines CNNs and RNNs to perform sentiment analysis, as originally proposed by [Xingyou Wang, Weijie Jiang, Zhiyong Luo](https://aclanthology.org/C16-1229.pdf) in 2016.

In [None]:
def create_model(embedding_matrix: np.ndarray, vocab_size: int):
    input_layer = keras.layers.Input(shape=(300,))
    my_embed_layer = keras.layers.Embedding(vocab_size, 300, weights=[embedding_matrix], input_length=300, trainable=False)(input_layer)
    dropout_layer_1 = keras.layers.Dropout(0.5)(my_embed_layer)

    conv_11 = keras.layers.Conv1D(50, kernel_size=3, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_1 = keras.layers.MaxPool1D(padding='same')(conv_11)

    conv_21 = keras.layers.Conv1D(50, kernel_size=3, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_2 = keras.layers.MaxPool1D(padding='same')(conv_21)

    concat = keras.layers.concatenate([max_pool_1, max_pool_2], axis=1)
    dropout_layer_2 = keras.layers.Dropout(0.15)(concat)

    gru = tf.compat.v1.keras.layers.GRU(128)(dropout_layer_2)
    dense = keras.layers.Dense(400)(gru)
    dropout_layer_3 = keras.layers.Dropout(0.1)(dense)
    out = keras.layers.Dense(1, activation='sigmoid')(dropout_layer_3)

    return keras.models.Model(inputs=input_layer, outputs=out)

# Training

## Load Data

In [20]:
imdb_data = pd.read_csv(
    './data/imdb_utf8.csv'
)

In [21]:
full_data = load_data([
    (imdb_data, OrdinalEncoder(categories=[['negative', 'positive']]))
], 300)

x, y, embedding_matrix, vocab_size = get_encoded_dataset(full_data)
x_train, x_test, y_train, y_test = train_test_split(
    x, y, test_size=0.2
)

## Train The Model

In [11]:
model = create_model(embedding_matrix, vocab_size)
model.compile(loss="binary_crossentropy", optimizer="adam",
                   metrics=["accuracy"])


model.fit(x_train, y_train, epochs=5, batch_size=1000, validation_split=0.1)
model.save('./saved_models/sentiment.keras')

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


## Generate Predictions And Metrics

In [12]:
predictions = model.predict(x_test, batch_size=100)



In [13]:
discrete_preds = np.zeros((len(list(predictions)), 1))
discrete_preds[predictions <= 1/2] = 0
discrete_preds[predictions > 1/2] = 1
print('Accuracy :', accuracy_score(discrete_preds, y_test))
print('Precision :', precision_score(discrete_preds, y_test))
print('Recall :', recall_score(discrete_preds, y_test))

Accuracy : 0.8917
Precision : 0.9150905432595573
Recall : 0.8731042426569399


# Finetuning

## Create Model With Variables In Place Of Hyperparameters

In [14]:
def create_bidirectional_model(hp):
    hp_conv_units = hp.Int('conv_units', min_value=50, max_value=200, step=75)
    hp_kernel_size = hp.Int('kernel_size', min_value=2, max_value=4, step=1)
    hp_dense_units = hp.Int('dense_units', min_value=100, max_value=200, step=100)
    
    input_layer = keras.layers.Input(shape=(300,))
    my_embed_layer = keras.layers.Embedding(vocab_size, 300, weights=[embedding_matrix], input_length=300, trainable=False)(input_layer)
    dropout_layer_1 = keras.layers.Dropout(0.5)(my_embed_layer)

    conv_11 = keras.layers.Conv1D(hp_conv_units, kernel_size=hp_kernel_size, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_1 = keras.layers.MaxPool1D(padding='same')(conv_11)

    conv_21 = keras.layers.Conv1D(hp_conv_units, kernel_size=hp_kernel_size, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_2 = keras.layers.MaxPool1D(padding='same')(conv_21)

    concat = keras.layers.concatenate([max_pool_1, max_pool_2], axis=1)
    dropout_layer_2 = keras.layers.Dropout(0.15)(concat)

    gru = keras.layers.Bidirectional(tf.compat.v1.keras.layers.GRU(128))(dropout_layer_2)
    dense = keras.layers.Dense(hp_dense_units)(gru)
    dropout_layer_3 = keras.layers.Dropout(0.1)(dense)
    out = keras.layers.Dense(1, activation='sigmoid')(dropout_layer_3)

    model = keras.models.Model(inputs=input_layer, outputs=out)
    model.compile(loss="binary_crossentropy", optimizer="adam",
                   metrics=["accuracy"])
    return model

## Search For Optimal Hyperparameters

In [22]:
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0.05, patience=3)
tuner = kt.Hyperband(create_bidirectional_model,
                     objective='val_loss',
                     max_epochs=5)
tuner.search(x_train[:5000], y_train[:5000], epochs=3, batch_size=75, validation_split=0.2, callbacks=[stop_early])

Trial 10 Complete [00h 08m 30s]
val_loss: 0.36807867884635925

Best val_loss So Far: 0.32960665225982666
Total elapsed time: 08h 39m 20s


In [23]:
best_hp = tuner.get_best_hyperparameters()[0]

In [29]:
best_hp.values

{'conv_units': 125,
 'kernel_size': 3,
 'dense_units': 200,
 'tuner/epochs': 5,
 'tuner/initial_epoch': 2,
 'tuner/bracket': 1,
 'tuner/round': 1,
 'tuner/trial_id': '0003'}

## Create The Final Model Based on New Hyperparameters

In [41]:
def create_final_model():
    input_layer = keras.layers.Input(shape=(300,))
    my_embed_layer = keras.layers.Embedding(vocab_size, 300, weights=[embedding_matrix], input_length=300, trainable=False)(input_layer)
    dropout_layer_1 = keras.layers.Dropout(0.5)(my_embed_layer)

    conv_11 = keras.layers.Conv1D(125, kernel_size=3, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_1 = keras.layers.MaxPool1D(padding='same')(conv_11)

    conv_21 = keras.layers.Conv1D(125, kernel_size=3, padding='same', kernel_initializer='he_uniform')(dropout_layer_1)
    max_pool_2 = keras.layers.MaxPool1D(padding='same')(conv_21)

    concat = keras.layers.concatenate([max_pool_1, max_pool_2], axis=1)
    dropout_layer_2 = keras.layers.Dropout(0.15)(concat)

    gru = tf.compat.v1.keras.layers.GRU(128)(dropout_layer_2)
    dense = keras.layers.Dense(200)(gru)
    dropout_layer_3 = keras.layers.Dropout(0.1)(dense)
    out = keras.layers.Dense(1, activation='sigmoid')(dropout_layer_3)

    model = keras.models.Model(inputs=input_layer, outputs=out)
    model.compile(loss="binary_crossentropy", optimizer="adam",
                   metrics=["accuracy"])
    return model

## Train Final Model On Full Train Set And Evaluate Metrics

In [42]:
final_model = create_final_model()

In [46]:
final_model.fit(x_train, y_train, epochs=4, batch_size=1000, validation_split=0.1)
final_model.save('./final_models/final.keras')

Epoch 1/4
Epoch 2/4
Epoch 3/4
Epoch 4/4


In [47]:
predictions_final = final_model.predict(x_test, batch_size=50)
discrete_preds_final = np.zeros((len(list(predictions_final)), 1))
discrete_preds_final[predictions_final <= 1/2] = 0
discrete_preds_final[predictions_final > 1/2] = 1
print('Accuracy :', accuracy_score(discrete_preds_final, y_test))
print('Precision :', precision_score(discrete_preds_final, y_test))
print('Recall :', recall_score(discrete_preds_final, y_test))

Accuracy : 0.9015
Precision : 0.9114688128772636
Recall : 0.8926108374384236
